Node.js stream pipe for passing pino logs to stdoutput and other services (Slack)
/* eslint-disable no-underscore-dangle */
// node app.js | node send-logs.js "${CW_LOG_GROUP}" "${CW_LOG_STREAM}"
const { Transform, Writable } = require('stream');
const https = require('https');
const logger = require('pino')();
const { slackWebhookPath } = require('./config/env');
const logStreamName = process.argv.pop();
const alertLevel = 50; // Level of msg that will be sent to Slack
logger.info(`Log stream name is ${logStreamName}, alerts are set for level ${alertLevel}. Slack webhook is ${slackWebhookPath ? 'defined' : 'undefiend'}`);
// The source process emits each log entry on a single line -> make a splitter for it, stdin can
// emit 'data' events which do not contain the whole line if the payload is larget than its max
// buffer size
// Also, it would have been trivial to just do chunk.toString().split('\n'), but that could be much
// slower. Besides, even if we emit a string, it will be converted to buffer again, so it seems
// wasteful
const splitter = new Transform({
transform(chunk, encoding, next) {
let rest = this._cache ? Buffer.concat([this._cache, chunk]) : chunk;
let nlIndex;
// As long as we keep finding new lines, keep making slices of the buffer and push them to the
// readable side of the transform stream
while ((nlIndex = rest.indexOf('\n')) !== -1) {
// The `end` parameter is non-inclusive, so increase it to include the newline we found
const line = rest.slice(0, ++nlIndex);
// start is inclusive, but we are already one char ahead of the newline -> all good
rest = rest.slice(nlIndex);
this.push(line);
}
// We split by newlines, but there is still some remaining data -> save for later
if (rest.length) {
this._cache = rest;
}
return next();
},
});
class Slack extends Writable {
constructor(opt) {
super();
this.slackWebhookPath = opt.slackWebhookPath;
this.logStreamName = opt.logStreamName;
this.alertLevel = opt.alertLevel;
}
_write(chunk, encoding, next) {
const log = chunk.toString('utf8');
if (JSON.parse(log).level >= this.alertLevel) {
this.notifySlack(log);
}
next();
}
notifySlack(log) {
const payload = JSON.stringify({ text: `${this.logStreamName}: ${log}` });
const req = https.request({
hostname: 'hooks.slack.com',
port: 443,
path: this.slackWebhookPath,
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
}, (res) => {
res.setEncoding('utf8');
res.on('data', (chunk) => {
logger.info(`Slack msg sent. Response is: ${chunk}`);
});
res.on('error', (err) => {
logger.error(err);
});
});
req.on('error', (err) => {
logger.error(err);
});
req.write(payload);
req.end();
}
}
const toSlack = new Slack({ slackWebhookPath, logStreamName, alertLevel });
process.stdin.pipe(splitter).pipe(toSlack);
process.stdin.pipe(process.stdout);
// Do not exit on terminating signals - wait until stdin emits 'end' event
// and all pending log writes are flushed to server, resultin in clean exit
process.once('SIGINT', () => { });
process.once('SIGTERM', () => { });