erikpalla
4/11/2017 - 12:56 PM

Node.js stream pipe for passing pino logs to stdout and other services (Slack)

Node.js stream pipe for passing pino logs to stdout and other services (Slack)

/* eslint-disable no-underscore-dangle */

// node app.js | node send-logs.js "${CW_LOG_GROUP}" "${CW_LOG_STREAM}"
const { Transform, Writable } = require('stream');
const https = require('https');
const logger = require('pino')();

const { slackWebhookPath } = require('./config/env');

// The log stream name is taken from the last command-line argument.
// Note: `pop()` also removes that argument from `process.argv`.
const logStreamName = process.argv.pop();
// Minimum log `level` that is forwarded to Slack (50 is pino's default `error` level).
const alertLevel = 50;
// Report the effective configuration once at startup; only whether the webhook is set is logged,
// never its value.
logger.info(`Log stream name is ${logStreamName}, alerts are set for level ${alertLevel}. Slack webhook is ${slackWebhookPath ? 'defined' : 'undefined'}`);
// The source process emits each log entry on a single line -> make a splitter for it. stdin can
// emit 'data' events which do not contain the whole line if the payload is larger than its max
// buffer size.
// Also, it would have been trivial to just do chunk.toString().split('\n'), but that could be much
// slower. Besides, even if we emit a string, it will be converted to a buffer again, so it seems
// wasteful.

// Transform stream that re-chunks its input so that every chunk pushed to the readable side is
// exactly one newline-terminated line. Bytes after the last newline of a chunk are kept in
// `this._cache` and prepended to the next incoming chunk.
const splitter = new Transform({
  /**
   * @param {Buffer} chunk - Raw bytes read from the upstream source.
   * @param {string} encoding - Unused; the data is handled as a Buffer.
   * @param {Function} next - Called once the chunk has been fully processed.
   */
  transform(chunk, encoding, next) {
    const data = this._cache ? Buffer.concat([this._cache, chunk]) : chunk;
    let start = 0;
    let nlIndex = data.indexOf('\n', start);

    // As long as we keep finding newlines, push each complete line (newline included) to the
    // readable side and continue scanning right after it
    while (nlIndex !== -1) {
      // The `end` parameter is non-inclusive, so add one to include the newline we found
      const end = nlIndex + 1;
      this.push(data.subarray(start, end));
      start = end;
      nlIndex = data.indexOf('\n', start);
    }

    // Keep the unterminated tail for the next chunk. The cache MUST be reset when nothing is
    // left over, otherwise the previous partial line would be prepended again later on.
    this._cache = start < data.length ? data.subarray(start) : null;

    return next();
  },

  /**
   * Emits whatever is still cached when the input ends, so that a final line without a trailing
   * newline is not lost.
   *
   * @param {Function} done - Called once the remaining data has been pushed.
   */
  flush(done) {
    if (this._cache) {
      this.push(this._cache);
      this._cache = null;
    }
    done();
  },
});


/**
 * Writable stream that expects one serialized JSON log entry per chunk and posts every entry
 * whose `level` is at or above the configured alert level to a Slack incoming webhook.
 */
class Slack extends Writable {
  /**
   * @param {Object} opt
   * @param {string} [opt.slackWebhookPath] - Path part of the webhook URL on hooks.slack.com.
   *   When it is not set, no notifications are sent.
   * @param {string} opt.logStreamName - Label prepended to every Slack message.
   * @param {number} opt.alertLevel - Minimum `level` value that triggers a notification.
   */
  constructor(opt) {
    super();
    this.slackWebhookPath = opt.slackWebhookPath;
    this.logStreamName = opt.logStreamName;
    this.alertLevel = opt.alertLevel;
  }

  /**
   * Inspects a single log line and notifies Slack when its level is high enough.
   *
   * @param {Buffer} chunk - One log line.
   * @param {string} encoding - Unused; the chunk is always decoded as UTF-8.
   * @param {Function} next - Signals that the chunk has been handled.
   */
  _write(chunk, encoding, next) {
    const log = chunk.toString('utf8');
    let entry;
    try {
      entry = JSON.parse(log);
    } catch (err) {
      // The line is not JSON (e.g. plain-text output or a stack trace of the upstream process).
      // It carries no level to compare against, so it cannot trigger an alert. Skip it instead
      // of letting the exception take the whole pipe down.
      return next();
    }
    // `JSON.parse('null')` yields null, which would throw on property access
    if (entry !== null && entry.level >= this.alertLevel) {
      this.notifySlack(log);
    }
    return next();
  }

  /**
   * Posts the raw log line, prefixed with the log stream name, to the Slack webhook.
   * The request is fire-and-forget: the outcome is only logged.
   *
   * @param {string} log - The serialized log entry to send.
   */
  notifySlack(log) {
    if (!this.slackWebhookPath) {
      // No webhook configured -> there is nowhere to send the alert to
      return;
    }
    const payload = JSON.stringify({ text: `${this.logStreamName}: ${log}` });
    const req = https.request({
      hostname: 'hooks.slack.com',
      port: 443,
      path: this.slackWebhookPath,
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
      },
    }, (res) => {
      res.setEncoding('utf8');
      res.on('data', (chunk) => {
        logger.info(`Slack msg sent. Response is: ${chunk}`);
      });
      res.on('error', (err) => {
        logger.error(err);
      });
    });
    req.on('error', (err) => {
      logger.error(err);
    });
    req.write(payload);
    req.end();
  }
}
// Sink that raises Slack alerts for log lines at or above the alert level
const toSlack = new Slack({ slackWebhookPath, logStreamName, alertLevel });

// Branch 1: stdin -> line splitter -> Slack alerting
const lineStream = process.stdin.pipe(splitter);
lineStream.pipe(toSlack);
// Branch 2: stdin is echoed unchanged to stdout
process.stdin.pipe(process.stdout);

// Register an empty one-time handler for the terminating signals so that the signal does not
// end this process; instead it keeps running until stdin emits 'end' and all pending log writes
// are flushed to the server, resulting in a clean exit
const ignoreSignal = () => { };
['SIGINT', 'SIGTERM'].forEach((signal) => {
  process.once(signal, ignoreSignal);
});