'use strict';
const winston = require('winston');
const Kafka = require('node-rdkafka');
/**
 * Logs at debug level through winston, prefixing the first argument with
 * the `[TaskRunner] ` tag. Remaining arguments are forwarded untouched.
 * When called with no arguments, winston is invoked with no arguments.
 */
function _debug(...logArgs) {
  if (logArgs.length === 0) {
    winston.debug();
    return;
  }
  const [message, ...details] = logArgs;
  const tagged = '[TaskRunner] ' + message;
  winston.debug(tagged, ...details);
}
/**
 * Logs at error level through winston, prefixing the first argument with
 * the `[TaskRunner] ` tag. Remaining arguments are forwarded untouched.
 * When called with no arguments, winston is invoked with no arguments.
 */
function _error(...logArgs) {
  if (logArgs.length === 0) {
    winston.error();
    return;
  }
  const [message, ...details] = logArgs;
  const tagged = '[TaskRunner] ' + message;
  winston.error(tagged, ...details);
}
/**
 * Logs at info level through winston, prefixing the first argument with
 * the `[TaskRunner] ` tag. Remaining arguments are forwarded untouched.
 * When called with no arguments, winston is invoked with no arguments.
 */
function _info(...logArgs) {
  if (logArgs.length === 0) {
    winston.info();
    return;
  }
  const [message, ...details] = logArgs;
  const tagged = '[TaskRunner] ' + message;
  winston.info(tagged, ...details);
}
/**
 * Task runner that owns one Kafka producer and one Kafka consumer.
 *
 * Reads its settings from `options.xapp.appBuilder` (`kafka`, `kafkaTopic`)
 * and `options.kafka` (`bootstrap.servers`, `enable.auto.commit`,
 * `default.topic.config`), then immediately initializes both clients.
 *
 * @param {object} options - Application options; must contain `xapp.appBuilder` and `kafka`.
 */
function TaskRunner(options) {
  this.options = options;

  // Shallow merge into a fresh object. `undefined` values are skipped so an
  // unset override never clobbers a value supplied by an earlier source.
  // Replaces the former `_.merge` call: `_` is not required in this file.
  // Assumes the Kafka config maps are flat key/value objects — TODO confirm.
  const mergeDefined = (...sources) => {
    const merged = {};
    for (const source of sources) {
      for (const [key, value] of Object.entries(source || {})) {
        if (value !== undefined) {
          merged[key] = value;
        }
      }
    }
    return merged;
  };

  //config
  const appBuilder = this.options['xapp']['appBuilder'];
  const kafkaOptions = this.options['kafka'];

  this.producerConfig = mergeDefined(appBuilder.kafka, {
    'bootstrap.servers': kafkaOptions['bootstrap.servers'],
  });
  // The consumer reuses the producer settings plus its auto-commit flag.
  this.consumerConfig = mergeDefined(this.producerConfig, {
    'enable.auto.commit': kafkaOptions['enable.auto.commit'],
  });
  this.topicConfig = kafkaOptions['default.topic.config'];
  this.kafkaTopic = appBuilder['kafkaTopic'];

  this.producer = null;
  this.consumer = null;
  // True while a consumed message is being handled; pauses polling.
  this.processFlag = false;

  this.initializeProducer();
  this.initializeConsumer();
}
/**
 * Starts message handling by attaching the consumer `data` listener.
 *
 * @returns {Promise<void>} Settles once `_receiveKafkaMessage` has settled.
 */
TaskRunner.prototype.initialize = async function () {
  const listenerAttached = this._receiveKafkaMessage();
  await listenerAttached;
};
/**
 * Event handler for client failures: logs the received value at error
 * level, then disconnects the producer.
 *
 * @param {*} err - Value passed by the emitting client.
 */
TaskRunner.prototype._onConnectionError = function (err) {
  _error('_onConnectionError', err);
  const { producer } = this;
  producer.disconnect();
};
/**
 * Creates the Kafka producer from `producerConfig` / `topicConfig`, starts
 * connecting and registers the failure handlers.
 */
TaskRunner.prototype.initializeProducer = function () {
  _info('Initialize Kafka producer');
  this.producer = new Kafka.Producer(this.producerConfig, this.topicConfig);
  this.producer.connect({}, (err) => {
    if (err) {
      // A failed connect leaves the runner unable to produce, so report it
      // at error level and keep the error object instead of stringifying it.
      _error('Kafka producer connection failed', err);
    }
  });

  const onFailure = this._onConnectionError.bind(this);
  // NOTE(review): node-rdkafka documents librdkafka errors under
  // 'event.error'; confirm that 'error' is the intended event name.
  this.producer.on('error', onFailure);
  // NOTE(review): 'SIGTERM' is a process signal, not a producer event, so
  // this listener presumably never fires — confirm whether
  // process.on('SIGTERM', ...) was intended.
  this.producer.on('SIGTERM', onFailure);
};
/**
 * Creates the Kafka consumer from `consumerConfig` / `topicConfig`, starts
 * connecting, and once ready subscribes to `kafkaTopic` and polls for one
 * message per second while no message is being processed.
 *
 * The polling timer handle is kept on `this.consumeTimer` and cleared when
 * the consumer emits `disconnected`.
 */
TaskRunner.prototype.initializeConsumer = function () {
  _info('Initialize Kafka consumer');
  this.consumer = new Kafka.KafkaConsumer(this.consumerConfig, this.topicConfig);
  this.consumer.connect({}, (err) => {
    if (err) {
      // A failed connect means no messages will ever arrive; report it at
      // error level and keep the error object instead of stringifying it.
      _error('Kafka consumer connection failed', err);
    }
  });

  // Stops the polling timer; safe to call when no timer is running.
  const stopPolling = () => {
    clearInterval(this.consumeTimer);
    this.consumeTimer = null;
  };

  this.consumer
    .on('ready', () => {
      _info('Kafka consumer ready');
      this.consumer.subscribe([this.kafkaTopic]);
      // Guard against stacked timers if 'ready' is emitted more than once.
      stopPolling();
      this.consumeTimer = setInterval(() => {
        // Skip the poll while the previous message is still being handled.
        if (!this.processFlag) {
          this.consumer.consume(1);
        }
      }, 1000);
    })
    // Without this the timer would keep calling consume() on a
    // disconnected client.
    .on('disconnected', stopPolling);

  const onFailure = this._onConnectionError.bind(this);
  // NOTE(review): node-rdkafka documents librdkafka errors under
  // 'event.error'; confirm that 'error' is the intended event name.
  this.consumer.on('error', onFailure);
  // NOTE(review): 'SIGTERM' is a process signal, not a consumer event, so
  // this listener presumably never fires — confirm the intent.
  this.consumer.on('SIGTERM', onFailure);
};
/**
 * Attaches the consumer `data` listener. For each message it raises
 * `processFlag`, logs the payload, commits, and lowers the flag again.
 *
 * @returns {Promise<void>} Resolves once the listener is registered.
 */
TaskRunner.prototype._receiveKafkaMessage = async function () {
  _info('Receive kafka message');
  this.consumer.on('data', async (data) => {
    this.processFlag = true;
    try {
      // A message value may be null; guard it so the listener does not
      // throw outside the try and reject unhandled.
      const payload = data.value == null ? null : data.value.toString();
      _info('Receive kafka message: %s', payload);
      //TODO: handler processKafkaMessage
      this.consumer.commit();
    } catch (ex) {
      _error('Failed to process kafka message', ex);
    } finally {
      // Always resume polling, whether handling succeeded or failed.
      this.processFlag = false;
    }
  });
};