yano3nora
8/17/2017 - 6:40 AM

[php+js: Server-Sent Events] Note of implementation Server-Sent Events as Long-polling. #php #js #cakephp

[js: Server-Sent Events] Note of implementation Server-Sent Events as Long-polling. #php #js

OVERVIEW

いわゆるポーリング型のサーバプッシュ実装方式の一つ。純粋ポーリングや Comet と違い W3C で策定され JavaScript の API も存在する。websocket が出てきてからはいらない子扱い。

refs

Backend

class ServerSentEventsController {

  public function fetch() {
    $this->printFetchHeader();
    $messages = ['hoge', 'fuga', 'piyo', 'foo', 'bar', 'baz'];
    $array    = ['data is...' => 'finish !!'];
    $cnt      = 0;
    $started  = new \DateTimeImmutable();
    while (strtotime('now') <= $started->modify('+'.LOOP_TIME_LIMIT.' seconds')->format('U')) {
      if ($cnt === count($messages)) {
        $this->printFetchEvent($array, 'close');
      }
      $this->printFetchEvent($messages[$cnt]);
      $cnt++;
    }
    exit;
  }

  /**
   * Print event method for Server-Sent Events.
   * @param mixed   $data
   * @param ?string $event
   * @param ?int    $sleep
   * @param ?int    $retry
   * @see https://gist.github.com/a08b20aea47f509d8947e6bb63dcb9e3
   */
  protected function printFetchEvent($data, string $event='message', int $sleep=1, int $retry=0) {
    $print = 'event: '.$event.LF;
    if (is_array($data) || is_object($data)) {
      $print .= 'data: '.json_encode($data).LF;
    } else {
      $print .= 'data: '.$data.LF;
    }
    if ($retry) {
      $print .= 'retry: '.($retry * 1000).LF;  // Milliseconds.
    }
    $print .= LF;
    echo $print;
    ob_flush();
    flush();
    session_write_close();
    sleep($sleep);
  }


  /** 
   * Flush method for Server-Sent Events.
   * @param  int $sleep
   * @return void
   */
  protected function printFetchFlush(int $sleep=1) {
    ob_flush();
    flush();
    session_write_close();
    sleep($sleep);
  }

}

Frontend

/* in riot tag */
const self    = this
const api     = '/admin/api/queues/fetch'
this.es       = null

this.on('before-mount', () => {
  window.onbeforeunload = () => { this.es.close() }
})

this.on('mount', () => {
  this.es = new EventSource(api)
  this.es.addEventListener('message', (e) => {
    console.log(`${e.data} (state: ${e.target.readyState})`)
    // [JSON] console.dir(JSON.parse(e.data))
  })
  this.es.addEventListener('close', (e) => {
    console.log(e.data)
    this.es.close()
  })
  this.es.addEventListener('error', (e) => {
    this.es.close()
    console.log('Failed to Server Sent Events connection.')
  })
})

/* results on browser console
  hoge (state: 1)
  fuga (state: 1)
  piyo (state: 1)
  foo (state: 1)
  bar (state: 1)
  baz (state: 1)
  {"data is...":"finish !!"}
*/

TIPS & REFERENCES

勝手に再接続しようとする

EventSource.readyState が 1 でない ( 異常時 ) にブラウザ上の JS メモリが生きていれば同一セッションで再度コネクションを張ろうと試みるみたい。

クッキー・セッション・リダイレクトなど NG

Content-Type: text/event-stream につき、PHP の header() を伴うアクションは全てエラーとなる。

セッション情報の更新も反映されない?

SSE のコンテントタイプが Content-Type: text/event-stream のためか while によるループ中だからかまで検証していないが、 セッションを参照することはできるが、中身のデータが別のプロセスで更新されても更新されない ことに注意。

解放し損ねた SSE プロセスへの安全装置を用意

this.es.close() が出来ない状態、具体的には window.location.href など JS 側で別ページへリダイレクトした際に、正常に EventSource を close 出来ずにサーバサイドの永久ループプロセスが残存してしまった。

SSE のループは while (一定時間後に false になる条件) {} みたいな感じに実装しないと、何らかの原因でループを殺せないとサーバ上に不要な SSE ループを残したまんまになってしまう ... 。

HTTP の常時接続はちゃんと close() しないとブラウザ動かねえ

Chromeだけかもわからないケド、プッシュの間隔が数秒間あろうがなかろうが SSE はセッションを張り続ける。 var es = new EventSource() した瞬間から「(低コストではあるが)サーバプッシュを受け付けるために常時通信している」ことになる。他ページに移ったりするタイミングでこの接続を適切に es.close() で切断すべし。

Sample

// 移動を感知して切断
window.onbeforeunload = (e) => { if (this.es) this.es.close() }        
window.onunload       = (e) => { if (this.es) this.es.close() }
window.onclose        = (e) => { if (this.es) this.es.close() }

セッションを2個同時にさばけない

よくある SSE サンプルコードではサーバ側で sleep(1) でループかましてるが、セッション展開時(CakePHPなどFW導入してると常に展開しているので注意)はそのセッションを一度クローズしないと セッションがロックされて同一セッションからのリクエストをサーバが捌かなくなる。 これによりクライアント側では「返答なし」と判断してAjax不渡り→ Provisional headers are shown になる。これについてはループで sleep() に入る前に session_write_close() してやること。

Refs

Sample

$user = $this->Auth->user();  // auth-session
header("Content-Type: text/event-stream\n");
header("Cache-Control: no-cache\n");

while (1) {
  echo "event: message\n";
  echo "data: ".json_encode($user)."\n\n";

  ob_flush();
  flush();
  session_write_close();  // !important
  sleep(1);
}

非同期通信の発火タイミングがばらけるとエラー地獄に

Riot.js で実装時、マウント直後に this.es = new EventSource() で貼っており、それとは別に this.on('mount') のコールバックで Ajax を走らせていた。Riotの内部的なコンストラクタでこの辺の処理が競合して「非同期通信乱立」みたいになった。らめぇこんなの制御できないよぉ。

this.es = new EventSource(`${fetchApi}`)
this.es.addEventListener(){/* サーバプッシュ待ち処理 */}  // こいつが走らない

this.on('mount', () => {
  request.get(`${pullApi}`)
    .end(/* なんか Ajax 的な処理 */)
})

Promise でこう来たらこうっ!をちゃんと実装

そもそも Riot.js 等ビューフレームワークはグローバルな空間で副作用を伴うもんをポコポコ書くような設計になってない。各ライフサイクルイベントに適切に割り振るか、イニシャライズで順次進行的に非同期通信をポコポコだすなら Promise() などで制御しないと動作が死ぬほど不安定になる。この辺ちょっとテキトーに考えてた。もういい加減このあたり setTimeout() とかで誤魔化さないんだよ?わかった?

Refs

Sample

const request = require('superagent')

this.on('mount', () => { /* Init routine. */
  this.pullByAjax()     // Promise を return してくれるので...
    .then((res) => {    // からの~?
      this.fetchInit()  // ここで SSE はる...みたいな感じ
    })
    .catch((err) => {   // エラーはんどら
      console.warn('Error has occurred.')
      console.dir(err)
    })
})

/**
 * Pull init data on Ajax.
 * 最初にやりたい PULL な Ajax
 */
this.pullByAjax = () => { 
  return new Promise((resolve, reject) => { /* Wrapping by Promise. */
    request.get(`${api}/pull`)
      .set('X-Requested-With', 'XMLHttpRequest')
      .type('form')
      .end((err, res) => {
        if (err) { /* Send to error handler. */
          reject('Failed to Ajax.')
        } else {
          // do something fxxk !!
          resolve(res.body.response)
        }
    })
  })
}

/**
 * Initialize fetch by Server-Sent Events
 * からの~?でやりたい SSE のコネクション
 */
this.fetchInit = () => { /* Startup fetch by SSE. */
  this.es = new EventSource(`${api}/fetch`)
  this.es.onopen = () => {
    console.log('コネクションを開始しました')
  }
  this.es.addEventListener('push', (e) => { 
    JSON.parse(e.data).map((queue) => { 
      // do something fxxk !!
    })
  })
  this.es.addEventListener('ping', (e) => { 
    console.log(`${e.data} (state: ${e.target.readyState})`)
  })
  this.es.addEventListener('close', (e) => { 
    console.log(`${e.data} (state: ${e.target.readyState})`)
    this.es.close()
  })
  this.es.addEventListener('error', (e) => {
    alert('サーバーとの接続が切断されました。画面を再読み込みします。')
    console.log(`${e.data} (state: ${e.target.readyState})`)
    this.es.close()
    window.location.reload()
  })
  window.onbeforeunload = (e) => { if (this.es) this.es.close() }        
  window.onunload       = (e) => { if (this.es) this.es.close() }
  window.onclose        = (e) => { if (this.es) this.es.close() }
}