[js: Server-Sent Events] Note of implementation Server-Sent Events as Long-polling. #php #js
いわゆるポーリング型のサーバプッシュ実装方式の一つ。純粋ポーリングや Comet と違い W3C で策定され JavaScript の API も存在する。websocket が出てきてからはいらない子扱い。
class ServerSentEventsController {
public function fetch() {
$this->printFetchHeader();
$messages = ['hoge', 'fuga', 'piyo', 'foo', 'bar', 'baz'];
$array = ['data is...' => 'finish !!'];
$cnt = 0;
$started = new \DateTimeImmutable();
while (strtotime('now') <= $started->modify('+'.LOOP_TIME_LIMIT.' seconds')->format('U')) {
if ($cnt === count($messages)) {
$this->printFetchEvent($array, 'close');
}
$this->printFetchEvent($messages[$cnt]);
$cnt++;
}
exit;
}
/**
* Print event method for Server-Sent Events.
* @param mixed $data
* @param ?string $event
* @param ?int $sleep
* @param ?int $retry
* @see https://gist.github.com/a08b20aea47f509d8947e6bb63dcb9e3
*/
protected function printFetchEvent($data, string $event='message', int $sleep=1, int $retry=0) {
$print = 'event: '.$event.LF;
if (is_array($data) || is_object($data)) {
$print .= 'data: '.json_encode($data).LF;
} else {
$print .= 'data: '.$data.LF;
}
if ($retry) {
$print .= 'retry: '.($retry * 1000).LF; // Milliseconds.
}
$print .= LF;
echo $print;
ob_flush();
flush();
session_write_close();
sleep($sleep);
}
/**
* Flush method for Server-Sent Events.
* @param int $sleep
* @return void
*/
protected function printFetchFlush(int $sleep=1) {
ob_flush();
flush();
session_write_close();
sleep($sleep);
}
}
/* in riot tag */
const self = this
const api = '/admin/api/queues/fetch'
this.es = null
this.on('before-mount', () => {
window.onbeforeunload = () => { this.es.close() }
})
this.on('mount', () => {
this.es = new EventSource(api)
this.es.addEventListener('message', (e) => {
console.log(`${e.data} (state: ${e.target.readyState})`)
// [JSON] console.dir(JSON.parse(e.data))
})
this.es.addEventListener('close', (e) => {
console.log(e.data)
this.es.close()
})
this.es.addEventListener('error', (e) => {
this.es.close()
console.log('Failed to Server Sent Events connection.')
})
})
/* results on browser console
hoge (state: 1)
fuga (state: 1)
piyo (state: 1)
foo (state: 1)
bar (state: 1)
baz (state: 1)
{"data is...":"finish !!"}
*/
EventSource.readyState
が 1 でない ( 異常時 ) にブラウザ上の JS メモリが生きていれば同一セッションで再度コネクションを張ろうと試みるみたい。
Content-Type: text/event-stream
につき、PHP の header()
を伴うアクションは全てエラーとなる。
SSE のコンテントタイプが Content-Type: text/event-stream
のためか while
によるループ中だからかまで検証していないが、 セッションを参照することはできるが、中身のデータが別のプロセスで更新されても更新されない ことに注意。
this.es.close()
が出来ない状態、具体的には window.location.href
など JS 側で別ページへリダイレクトした際に、正常に EventSource を close 出来ずにサーバサイドの永久ループプロセスが残存してしまった。
SSE のループは while (一定時間後に false になる条件) {}
みたいな感じに実装しないと、何らかの原因でループを殺せないとサーバ上に不要な SSE ループを残したまんまになってしまう ... 。
Chromeだけかもわからないケド、プッシュの間隔が数秒間あろうがなかろうが SSE はセッションを張り続ける。 var es = new EventSource()
した瞬間から「(低コストではあるが)サーバプッシュを受け付けるために常時通信している」ことになる。他ページに移ったりするタイミングでこの接続を適切に es.close()
で切断すべし。
// 移動を感知して切断
window.onbeforeunload = (e) => { if (this.es) this.es.close() }
window.onunload = (e) => { if (this.es) this.es.close() }
window.onclose = (e) => { if (this.es) this.es.close() }
よくある SSE サンプルコードではサーバ側で sleep(1)
でループかましてるが、セッション展開時(CakePHPなどFW導入してると常に展開しているので注意)はそのセッションを一度クローズしないと セッションがロックされて同一セッションからのリクエストをサーバが捌かなくなる。 これによりクライアント側では「返答なし」と判断してAjax不渡り→ Provisional headers are shown
になる。これについてはループで sleep()
に入る前に session_write_close()
してやること。
$user = $this->Auth->user(); // auth-session
header("Content-Type: text/event-stream\n");
header("Cache-Control: no-cache\n");
while (1) {
echo "event: message\n";
echo "data: ".json_encode($user)."\n\n";
ob_flush();
flush();
session_write_close(); // !important
sleep(1);
}
Riot.js で実装時、マウント直後に this.es = new EventSource()
で貼っており、それとは別に this.on('mount')
のコールバックで Ajax を走らせていた。Riotの内部的なコンストラクタでこの辺の処理が競合して「非同期通信乱立」みたいになった。らめぇこんなの制御できないよぉ。
this.es = new EventSource(`${fetchApi}`)
this.es.addEventListener(){/* サーバプッシュ待ち処理 */} // こいつが走らない
this.on('mount', () => {
request.get(`${pullApi}`)
.end(/* なんか Ajax 的な処理 */)
})
そもそも Riot.js 等ビューフレームワークはグローバルな空間で副作用を伴うもんをポコポコ書くような設計になってない。各ライフサイクルイベントに適切に割り振るか、イニシャライズで順次進行的に非同期通信をポコポコだすなら Promise()
などで制御しないと動作が死ぬほど不安定になる。この辺ちょっとテキトーに考えてた。もういい加減このあたり setTimeout()
とかで誤魔化さないんだよ?わかった?
const request = require('superagent')
this.on('mount', () => { /* Init routine. */
this.pullByAjax() // Promise を return してくれるので...
.then((res) => { // からの~?
this.fetchInit() // ここで SSE はる...みたいな感じ
})
.catch((err) => { // エラーはんどら
console.warn('Error has occurred.')
console.dir(err)
})
})
/**
* Pull init data on Ajax.
* 最初にやりたい PULL な Ajax
*/
this.pullByAjax = () => {
return new Promise((resolve, reject) => { /* Wrapping by Promise. */
request.get(`${api}/pull`)
.set('X-Requested-With', 'XMLHttpRequest')
.type('form')
.end((err, res) => {
if (err) { /* Send to error handler. */
reject('Failed to Ajax.')
} else {
// do something fxxk !!
resolve(res.body.response)
}
})
})
}
/**
* Initialize fetch by Server-Sent Events
* からの~?でやりたい SSE のコネクション
*/
this.fetchInit = () => { /* Startup fetch by SSE. */
this.es = new EventSource(`${api}/fetch`)
this.es.onopen = () => {
console.log('コネクションを開始しました')
}
this.es.addEventListener('push', (e) => {
JSON.parse(e.data).map((queue) => {
// do something fxxk !!
})
})
this.es.addEventListener('ping', (e) => {
console.log(`${e.data} (state: ${e.target.readyState})`)
})
this.es.addEventListener('close', (e) => {
console.log(`${e.data} (state: ${e.target.readyState})`)
this.es.close()
})
this.es.addEventListener('error', (e) => {
alert('サーバーとの接続が切断されました。画面を再読み込みします。')
console.log(`${e.data} (state: ${e.target.readyState})`)
this.es.close()
window.location.reload()
})
window.onbeforeunload = (e) => { if (this.es) this.es.close() }
window.onunload = (e) => { if (this.es) this.es.close() }
window.onclose = (e) => { if (this.es) this.es.close() }
}