skynyrd
12/16/2017 - 12:39 PM

kafka_connect_put

kafka_connect_put

@Override
public void put(Collection<SinkRecord> collection) {
        try {
            Collection<String> recordsAsString = collection.stream().map(r -> String.valueOf(r.value())).collect(Collectors.toList());
            elasticService.process(recordsAsString);
        }
        catch (Exception e) {
            log.error("Error while processing records");
            log.error(e.toString());
        }
}