const http = require("http");
const r = require("rethinkdbdash")();
/**
 * Waits for the next row of a cursor, giving up if cancellation fires first.
 *
 * @param {object} cursor - Object exposing a Node-style `next(callback)` method.
 * @param {Promise} cancelled - Promise whose fulfilment signals cancellation.
 * @returns {Promise<*>} Fulfils with the row handed to the callback. Rejects
 *   with the callback's error, or with `Error("Cancelled")` when `cancelled`
 *   fulfils before the callback has fired.
 */
function nextRow(cursor, cancelled) {
  return new Promise((fulfil, fail) => {
    const onRow = (error, value) => {
      if (error) {
        fail(error);
        return;
      }
      fulfil(value);
    };
    const onCancel = () => {
      fail(new Error("Cancelled"));
    };

    cursor.next(onRow);
    // A promise can only settle once, so whichever callback runs first wins
    // and the later one is a no-op.
    cancelled.then(onCancel);
  });
}
/**
 * Async generator that yields rows from `.changes()` on the `stats` table of
 * the `rethinkdb` database, one row per iteration.
 *
 * The cursor is closed when `cancelled` fulfils and also whenever the
 * generator itself finishes (the consumer stopped iterating, or fetching a
 * row failed), so the cursor is not left open after the stream has ended.
 *
 * @param {Promise} cancelled - Promise whose fulfilment signals that the
 *   consumer is no longer interested in the stream.
 * @yields {*} The next row produced by the cursor.
 * @throws {Error} `Error("Cancelled")` (via `nextRow`) once `cancelled`
 *   fulfils while a row is being awaited, or any error reported by the cursor.
 */
async function* query(cancelled) {
  const cursor = await r.db("rethinkdb").table("stats").changes();

  // Close at most once; the close may be requested both by cancellation and
  // by the `finally` below.
  let closing = null;
  const closeCursor = () => {
    if (closing === null) {
      closing = (async () => {
        try {
          await cursor.close();
        } catch (err) {
          // Best-effort cleanup: report the failure rather than letting it
          // surface as an unhandled rejection.
          console.log("Failed to close cursor:", err);
        }
      })();
    }
    return closing;
  };

  cancelled.then(closeCursor);

  try {
    while (true) {
      yield nextRow(cursor, cancelled);
    }
  } finally {
    // Deliberately not awaited: `closeCursor` never rejects, and waiting for
    // the close would delay delivery of the error that ended the loop.
    closeCursor();
  }
}
/**
 * Writes every item of an async iterable to an HTTP response as
 * Server-Sent Events (`event: item`, JSON-encoded `data`).
 *
 * @param {object} res - HTTP response; must provide `writeHead`, `write`,
 *   `end` and `on`.
 * @param {AsyncIterable<*>} iter - Source of items to send.
 * @param {Function} cancel - Called with no arguments when the response
 *   emits `close`.
 * @returns {Promise<void>} Settles once the iterable is exhausted or throws;
 *   errors from the iterable (or from serialising an item) are rethrown.
 */
async function iterStream(res, iter, cancel) {
  res.writeHead(200, {"Content-Type": "text/event-stream"});
  // Listen on the response itself rather than the deprecated
  // `res.connection` alias.
  res.on("close", () => cancel());
  try {
    for await (const item of iter) {
      res.write(`event: item\ndata: ${JSON.stringify(item)}\n\n`);
    }
  } finally {
    // Always finish the response so the client is not left waiting on a
    // stream that will never produce another event.
    if (!res.writableEnded) {
      res.end();
    }
  }
}
/**
 * HTTP request handler: streams the rows produced by `query` to the client
 * through `iterStream`, and logs how the stream ended.
 *
 * A fresh cancellation promise is created per request; its resolver is
 * handed to `iterStream` and the promise itself to `query`.
 *
 * @param {object} req - Incoming request (not inspected).
 * @param {object} res - Response the event stream is written to.
 * @returns {Promise<void>} Settles after the stream has ended; stream errors
 *   are logged here rather than rethrown.
 */
async function handler(req, res) {
  let cancel;
  const cancelled = new Promise((resolve) => {
    cancel = resolve;
  });

  try {
    await iterStream(res, query(cancelled), cancel);
  } catch (err) {
    // Guard the property reads so a non-object throw cannot itself throw here.
    if (err && err.message === "Cancelled") {
      console.log("Connection was closed");
    } else {
      console.log("Unexpected error:", err && err.stack ? err.stack : err);
    }
  } finally {
    // However the stream ended, signal cancellation (resolving again is a
    // no-op) and finish the response so the client is not left hanging
    // after an unexpected error.
    cancel();
    if (!res.writableEnded) {
      res.end();
    }
  }
}
// Serve every incoming request with `handler`, listening on port 8000.
http
  .createServer(handler)
  .listen(8000);