lqshow
10/14/2017 - 3:21 AM

Node.js 和 Kafka 交互

'use strict';

const winston = require('winston');
const Kafka = require('node-rdkafka');

/**
 * Writes a debug-level entry through winston, tagging the message with the
 * "[TaskRunner] " prefix.
 *
 * The first argument is string-concatenated onto the prefix; all remaining
 * arguments are handed to winston unchanged. A call without any argument is
 * forwarded as an argument-less winston call.
 */
function _debug(...logArgs) {
    if (logArgs.length === 0) {
        winston.debug();
        return;
    }

    const [message, ...extras] = logArgs;
    winston.debug('[TaskRunner] ' + message, ...extras);
}

/**
 * Writes an error-level entry through winston, tagging the message with the
 * "[TaskRunner] " prefix.
 *
 * The first argument is string-concatenated onto the prefix; all remaining
 * arguments are handed to winston unchanged. A call without any argument is
 * forwarded as an argument-less winston call.
 */
function _error(...logArgs) {
    if (logArgs.length === 0) {
        winston.error();
        return;
    }

    const [message, ...extras] = logArgs;
    winston.error('[TaskRunner] ' + message, ...extras);
}

/**
 * Writes an info-level entry through winston, tagging the message with the
 * "[TaskRunner] " prefix.
 *
 * The first argument is string-concatenated onto the prefix; all remaining
 * arguments are handed to winston unchanged. A call without any argument is
 * forwarded as an argument-less winston call.
 */
function _info(...logArgs) {
    if (logArgs.length === 0) {
        winston.info();
        return;
    }

    const [message, ...extras] = logArgs;
    winston.info('[TaskRunner] ' + message, ...extras);
}

/**
 * Task runner backed by Kafka: derives the client configurations from
 * `options` and immediately initializes a producer and a consumer.
 *
 * @param {Object} options - Application options. The constructor reads
 *   `options.xapp.appBuilder.kafka` (base client config),
 *   `options.xapp.appBuilder.kafkaTopic`,
 *   `options.kafka['bootstrap.servers']`,
 *   `options.kafka['enable.auto.commit']` and
 *   `options.kafka['default.topic.config']`.
 */
function TaskRunner(options) {
    this.options = options;

    const appBuilder = this.options['xapp']['appBuilder'];
    const kafkaOptions = this.options['kafka'];

    // Configs are built with Object.assign instead of lodash's `_.merge`:
    // this file never requires lodash, so referencing `_` threw a
    // ReferenceError on construction. The copy is shallow, and an override
    // that is undefined is skipped so a value already present in the base
    // config is kept.
    const withOverride = (base, key, value) => {
        const config = Object.assign({}, base);
        if (value !== undefined) {
            config[key] = value;
        }
        return config;
    };

    this.producerConfig = withOverride(
        appBuilder.kafka, 'bootstrap.servers', kafkaOptions['bootstrap.servers']);
    // The consumer shares the producer settings plus its auto-commit flag.
    this.consumerConfig = withOverride(
        this.producerConfig, 'enable.auto.commit', kafkaOptions['enable.auto.commit']);
    this.topicConfig = kafkaOptions['default.topic.config'];
    this.kafkaTopic = appBuilder['kafkaTopic'];

    this.producer = null;
    this.consumer = null;
    // True while a received message is being handled; the consumer's poll
    // timer skips fetching while it is set.
    this.processFlag = false;
    this.initializeProducer();
    this.initializeConsumer();
}

/**
 * Starts message handling by delegating to `_receiveKafkaMessage()`.
 *
 * @returns {Promise<void>} Settles once `_receiveKafkaMessage()` has settled;
 *   a rejection from it is propagated to the caller.
 */
TaskRunner.prototype.initialize = async function () {
    const listenerSetup = this._receiveKafkaMessage();
    await listenerSetup;
};

/**
 * Event handler for client failures: logs the error under the
 * '_onConnectionError' label, then disconnects the producer.
 *
 * NOTE(review): initializeConsumer registers this handler for consumer events
 * as well, yet only `this.producer` is disconnected here - confirm that this
 * is intended.
 *
 * @param {*} err - Value delivered with the event; passed to the error log.
 */
TaskRunner.prototype._onConnectionError = function (err) {
    _error('_onConnectionError', err);

    const kafkaProducer = this.producer;
    kafkaProducer.disconnect();
};

/**
 * Creates the Kafka producer, starts connecting it and wires its handlers.
 *
 * The new `Kafka.Producer` is built from `this.producerConfig` and
 * `this.topicConfig` and stored on `this.producer`. A connect failure is only
 * reported through the debug log. `_onConnectionError`, bound to this
 * instance, is registered for the producer's 'error' and 'SIGTERM' events.
 */
TaskRunner.prototype.initializeProducer = function () {
    _info('Initialize Kafka producer');

    const producer = new Kafka.Producer(this.producerConfig, this.topicConfig);
    this.producer = producer;

    producer.connect({}, (err) => {
        if (!err) {
            return;
        }
        _debug(err);
    });

    // NOTE(review): 'SIGTERM' is ordinarily a process signal; confirm the
    // Kafka client really emits an event with this name.
    for (const eventName of ['error', 'SIGTERM']) {
        producer.on(eventName, this._onConnectionError.bind(this));
    }
};

/**
 * Creates the Kafka consumer, starts connecting it and wires its handlers.
 *
 * The new `Kafka.KafkaConsumer` is built from `this.consumerConfig` and
 * `this.topicConfig` and stored on `this.consumer`. A connect failure is only
 * reported through the debug log.
 *
 * On 'ready' the consumer subscribes to `this.kafkaTopic` and a one-second
 * poll timer is started; each tick requests a single message unless
 * `this.processFlag` marks a message as still being handled. The timer handle
 * is kept so polling stops on 'disconnected' and a repeated 'ready' replaces
 * the timer instead of stacking a second one. Previously the handle was
 * discarded, so the timer kept firing (and kept the event loop alive) after
 * the client had disconnected.
 *
 * `_onConnectionError`, bound to this instance, is registered for the
 * consumer's 'error' and 'SIGTERM' events.
 */
TaskRunner.prototype.initializeConsumer = function () {
    _info('Initialize Kafka consumer');

    this.consumer = new Kafka.KafkaConsumer(this.consumerConfig, this.topicConfig);

    this.consumer.connect({}, function (err) {
        if (err) {
            _debug(err);
        }
    });

    // Handle of the active poll timer, or null while not polling.
    let pollTimer = null;
    const stopPolling = () => {
        if (pollTimer !== null) {
            clearInterval(pollTimer);
            pollTimer = null;
        }
    };

    this.consumer
        .on('ready', () => {
            _info('Kafka consumer ready');
            this.consumer.subscribe([this.kafkaTopic]);

            // Drop any timer left over from an earlier 'ready'.
            stopPolling();
            pollTimer = setInterval(() => {
                if (!this.processFlag) {
                    this.consumer.consume(1);
                }
            }, 1000);
        });

    // Assumes the client emits 'disconnected' once a disconnect completes
    // (node-rdkafka's client event) - verify against the library version in use.
    this.consumer.on('disconnected', stopPolling);

    // NOTE(review): 'SIGTERM' is ordinarily a process signal; confirm the
    // Kafka client really emits an event with this name.
    this.consumer.on('error', this._onConnectionError.bind(this));
    this.consumer.on('SIGTERM', this._onConnectionError.bind(this));
};

/**
 * Attaches the 'data' listener that handles each message delivered by
 * `this.consumer`.
 *
 * For every message the listener raises `this.processFlag`, logs the payload,
 * commits through `this.consumer.commit()` and finally lowers the flag again.
 * Any failure while handling a message is logged with `_error` instead of
 * escaping the listener as a rejected promise.
 *
 * @returns {Promise<void>} Resolves as soon as the listener is registered.
 */
TaskRunner.prototype._receiveKafkaMessage = async function () {
    _info('Receive kafka message');

    this.consumer.on('data', async (data) => {
        this.processFlag = true;
        try {
            // A message may arrive without a value (presumably e.g. a
            // tombstone record); calling toString() on it would throw, so
            // the empty value is logged as-is.
            const payload = data.value == null ? data.value : data.value.toString();
            _info('Receive kafka message: %s', payload);

            //TODO: handler processKafkaMessage
            this.consumer.commit();
        } catch (ex) {
            _error(ex);
        } finally {
            // Always release the flag so the poll timer can fetch again.
            this.processFlag = false;
        }
    });
};