73nko
3/13/2018 - 2:25 PM

Cancellable async

const http = require("http");
const r = require("rethinkdbdash")();

// Cancellation bookkeeping, keyed by the `cancelled` promise itself. A WeakMap
// lets the entry be collected together with its promise.
const cancelRegistry = new WeakMap();

/**
 * Runs `onCancel` once `cancelled` fulfils, while attaching only a single
 * `.then()` reaction to `cancelled` regardless of how many callers subscribe.
 *
 * Reactions added with `.then()` cannot be removed from a pending promise, so
 * subscribing once and fanning out through a Set is what allows individual
 * listeners to be dropped again.
 *
 * @param {Promise<*>} cancelled - Fulfils when the work should be abandoned.
 * @param {Function} onCancel - Invoked with no arguments on cancellation.
 * @returns {Function} Call to unsubscribe `onCancel`.
 */
function onceCancelled(cancelled, onCancel) {
  let state = cancelRegistry.get(cancelled);

  if (state === undefined) {
    const created = {isCancelled: false, listeners: new Set()};
    cancelled.then(() => {
      created.isCancelled = true;
      for (const listener of created.listeners) listener();
      created.listeners.clear();
    });
    cancelRegistry.set(cancelled, created);
    state = created;
  }

  if (state.isCancelled) {
    // The shared reaction has already fired; nothing would call us later.
    onCancel();
    return () => {};
  }

  state.listeners.add(onCancel);
  return () => state.listeners.delete(onCancel);
}

/**
 * Fetches the next row from `cursor`, giving up as soon as `cancelled` fulfils.
 *
 * @param {{next: Function}} cursor - Object whose `next(callback)` reports
 *   `(err, row)` through a Node-style callback.
 * @param {Promise<*>} cancelled - Fulfils when the caller no longer wants rows.
 * @returns {Promise<*>} Resolves with the row; rejects with the cursor's error,
 *   or with `Error("Cancelled")` if cancellation happens first.
 */
function nextRow(cursor, cancelled) {
  return new Promise((resolve, reject) => {
    let hasAnswered = false;
    let unsubscribe = null;

    cursor.next((err, row) => {
      hasAnswered = true;
      // Drop our cancellation listener so it (and this row) is not retained
      // for the remaining lifetime of the `cancelled` promise.
      if (unsubscribe !== null) unsubscribe();

      if (err) {
        reject(err);
      } else {
        resolve(row);
      }
    });

    // A cursor that answered synchronously has already settled the promise.
    if (!hasAnswered) {
      unsubscribe = onceCancelled(cancelled, () => reject(new Error("Cancelled")));
    }
  });
}

/**
 * Async generator that yields every change reported for the `stats` table of
 * the `rethinkdb` database until it is cancelled or its consumer stops.
 *
 * The cursor is closed exactly once, whichever of these happens first:
 * `cancelled` fulfils, the consumer stops iterating, or fetching a row fails.
 *
 * @param {Promise<*>} cancelled - Fulfils when the stream should be torn down.
 * @yields {*} The next row delivered by the changefeed cursor.
 * @throws Whatever `nextRow` rejects with (including `Error("Cancelled")`).
 */
async function* query(cancelled) {
  const cursor = await r.db("rethinkdb").table("stats").changes();

  let isCursorClosed = false;
  const closeCursor = () => {
    if (isCursorClosed) return;
    isCursorClosed = true;

    // The executor runs synchronously, so close() is still invoked right away;
    // wrapping it turns both a synchronous throw and a rejected promise into
    // one rejection that is reported here rather than left unhandled.
    new Promise(resolve => resolve(cursor.close()))
      .catch(err => console.log("Failed to close cursor:", err));
  };

  cancelled.then(closeCursor);

  try {
    while (true) {
      // `yield` awaits the promise, so a rejection surfaces at this line.
      yield nextRow(cursor, cancelled);
    }
  } finally {
    // Also reached when the consumer abandons the iterator early.
    closeCursor();
  }
}

/**
 * Sends every item produced by `iter` to the client as a Server-Sent Event
 * named `item`, with the item JSON-encoded in the `data` field.
 *
 * @param {object} res - HTTP response to write to; must expose `writeHead`,
 *   `write`, `on`/`once` and `removeListener`.
 * @param {AsyncIterable<*>|Iterable<*>} iter - Source of items to stream.
 * @param {Function} cancel - Called (with no arguments) when the response
 *   closes, so the producer can stop.
 * @returns {Promise<void>} Settles when `iter` is exhausted; rejects with
 *   whatever `iter` throws.
 */
async function iterStream(res, iter, cancel) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});

  // Listen on the response itself rather than on its socket: the listener then
  // lives and dies with this response and never touches a possibly-null socket.
  let isClosed = false;
  res.once("close", () => {
    isClosed = true;
    cancel();
  });

  for await (const item of iter) {
    const hasRoom = res.write(`event: item\ndata: ${JSON.stringify(item)}\n\n`);

    // write() returning false means the data was queued in memory. Pause until
    // the buffer drains so a slow client cannot make it grow without bound.
    // Skip the wait once closed: neither event would ever fire again.
    if (hasRoom === false && !isClosed) {
      await new Promise(resolve => {
        const resume = () => {
          res.removeListener("drain", resume);
          res.removeListener("close", resume);
          resolve();
        };
        res.on("drain", resume);
        res.on("close", resume);
      });
    }
  }
}

/**
 * HTTP request handler: streams the changefeed produced by `query` to the
 * client through `iterStream` and logs why the stream stopped.
 *
 * @param {object} req - Incoming HTTP request (unused).
 * @param {object} res - HTTP response the stream is written to.
 * @returns {Promise<void>} Settles once streaming has stopped; never rejects
 *   because of a streaming error, which is logged instead.
 */
async function handler(req, res) {
  // `cancel` fulfils `cancelled`; handing both out lets the transport side
  // tell the producer side to stop.
  let cancel;
  const cancelled = new Promise(resolve => {
    cancel = resolve;
  });

  try {
    await iterStream(res, query(cancelled), cancel);
  } catch (err) {
    // Guard the property reads: anything can be thrown, not only Errors.
    if (err && err.message === "Cancelled") {
      console.log("Connection was closed");
    } else {
      console.log("Unexpected error:", err && err.stack ? err.stack : err);
    }
  } finally {
    // However streaming stopped, signal cancellation (fulfilling again is a
    // no-op) and finish the response so the client is not left waiting on a
    // stream that will never produce another event.
    cancel();
    res.end();
  }
}

// Serve the event stream to every incoming request on port 8000.
http
  .createServer(handler)
  .listen(8000);