package main
import (
"log"
"os"
"time"
"github.com/Shopify/sarama"
)
// main publishes a single test message to the "bla" topic on the Kafka broker
// at localhost:9092 through a sarama AsyncProducer and logs every delivery
// error the producer reports before the program exits.
func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Retry.Max = 10
	// Original author's note: Snappy or GZIP compression reportedly does not
	// work in combination with the message Timestamp set below.
	// NOTE(review): this may depend on config.Version, which is left at the
	// library default here — confirm against the sarama/broker versions in use.
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	config.ClientID = "test"

	// Route sarama's internal diagnostics to stdout.
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("[ERROR] Failed to start Sarama producer:", err)
	}

	// This goroutine is the only reader of Errors(). It signals errorsDrained
	// once the channel has been closed, so main can wait until every reported
	// error has actually been logged.
	errorsDrained := make(chan struct{})
	go func() {
		defer close(errorsDrained)
		for perr := range producer.Errors() {
			log.Printf("[ERROR]: %s\n", perr.Error())
		}
	}()

	producer.Input() <- &sarama.ProducerMessage{
		Timestamp: time.Now(),
		Topic:     "bla",
		Value:     sarama.ByteEncoder("test this message"),
	}

	// Use AsyncClose instead of a deferred Close: Close would drain Errors()
	// itself, racing with the goroutine above, and its returned error was
	// being discarded. Waiting on errorsDrained keeps the process alive until
	// the shutdown has finished and all errors have been printed.
	producer.AsyncClose()
	<-errorsDrained
}