kafka_connect_put
/**
 * Converts the value of every record in the batch to a string and hands the
 * resulting list to {@code elasticService.process}.
 *
 * <p>Each value is rendered with {@link String#valueOf(Object)}, so a record
 * whose value is {@code null} is forwarded as the literal string {@code "null"}.
 *
 * <p>Any {@link Exception} raised while converting or processing the batch is
 * caught and logged; it is not propagated to the caller.
 * NOTE(review): because the failure is swallowed, the caller cannot tell that
 * the batch was not processed. If the framework invoking this method relies on
 * an exception to retry or stop, confirm whether the error should be rethrown.
 *
 * @param collection the batch of sink records to process
 */
@Override
public void put(Collection<SinkRecord> collection) {
    try {
        Collection<String> recordsAsString = collection.stream()
                .map(r -> String.valueOf(r.value()))
                .collect(Collectors.toList());
        elasticService.process(recordsAsString);
    } catch (Exception e) {
        // Pass the exception itself so the stack trace and cause chain are
        // logged; e.toString() alone only records the class name and message.
        log.error("Error while processing records", e);
    }
}