ngkonstantinidis
10/25/2018 - 8:43 PM

[Kafka Producer] A sample Kafka producer #kafka #sourcecode

[Kafka Producer] A sample Kafka producer #kafka #sourcecode

/**
* Creates a new Kafka producer from the given configuration.
*
* The producer is intended to be created once per application. In a
* Spring project this method can be exposed with @Bean; in a Java EE
* application it can be exposed with @Produces or @ApplicationScoped.
*
* @param kafkaConfiguration source of the producer settings; its
*        asMap() result is handed to the producer constructor as-is
* @return a newly constructed producer (a new instance on every call)
*/
public KafkaProducer getProducer(KafkaConfiguration kafkaConfiguration) {
	final Map<String, ? extends Serializable> producerSettings = kafkaConfiguration.asMap();
	return new KafkaProducer(producerSettings);
}
/**
* Kafka producer configuration.
*
* The fields are bound to properties under the "custom.kafka" prefix
* (for example from an application.properties file); other binding
* mechanisms may be used as well. Property binding goes through the
* JavaBean setters declared below, so every bindable field needs one.
*/
@Configuration
@ConfigurationProperties("custom.kafka")
public static class KafkaConfiguration {

    /** Value passed to the producer as ProducerConfig.BOOTSTRAP_SERVERS_CONFIG. */
    private String bootstrapServers;

    /** Value passed to the producer as ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG. */
    private String keySerializerClass;

    /** Value passed to the producer as ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG. */
    private String valueSerializerClass;

    /** Value passed to the producer as ProducerConfig.RETRIES_CONFIG. */
    private int retries;

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }

    public String getKeySerializerClass() {
        return keySerializerClass;
    }

    public void setKeySerializerClass(String keySerializerClass) {
        this.keySerializerClass = keySerializerClass;
    }

    public String getValueSerializerClass() {
        return valueSerializerClass;
    }

    public void setValueSerializerClass(String valueSerializerClass) {
        this.valueSerializerClass = valueSerializerClass;
    }

    public int getRetries() {
        return retries;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }

    /**
     * Collects the configured values into a java.util.Map keyed by the
     * corresponding ProducerConfig constants.
     *
     * NOTE(review): HashMap here is presumably the Vavr collection, since
     * the result is converted with toJavaMap() - confirm against the imports.
     *
     * @return the four producer settings held by this object
     */
    Map<String, ? extends Serializable> asMap() {
        return HashMap.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass,
            ProducerConfig.RETRIES_CONFIG, retries
        ).toJavaMap();
    }
}
/**
* This shows how to use the Kafka producer. In this example a
* text message, "text message", is sent to a topic, "testTopic".
* The producer sends the message using the fire-and-forget
* approach: the result returned by send() is not waited on.
*/
@Autowired
private KafkaProducer kafkaProducer;

...

// Fire-and-forget send: the value returned by send() is not inspected, so
// onSuccess only means the send() call itself returned without throwing.
// Failures are reported through the same logger as the success path
// instead of being printed to stderr.
Try.of(() -> kafkaProducer.send(new ProducerRecord("testTopic", "text message")))
                    .onSuccess(future -> log.info("Message sent"))
                    .onFailure(error -> log.error("Failed to send message to testTopic", error));