hivefans
10/29/2016 - 1:21 AM

sarama_produce_test_with_timestamp.go

package main

import (
	"log"
	"os"
	"time"

	"github.com/Shopify/sarama"
)

// main publishes a single timestamped test message to the "bla" topic on a
// local Kafka broker through a sarama AsyncProducer, logging every delivery
// error before the process exits.
func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Errors = true
	config.Producer.Retry.Max = 10
	// NOTE(original author): Snappy (or GZIP) compression reportedly does not
	// work in combination with Timestamp — this program exists to reproduce that.
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	config.ClientID = "test"
	// Message timestamps are only put on the wire for protocol version 0.10.0
	// and later; raise the version if the library default is older, but never
	// lower a newer default.
	if !config.Version.IsAtLeast(sarama.V0_10_0_0) {
		config.Version = sarama.V0_10_0_0
	}
	sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln("[ERROR] Failed to start Sarama producer:", err)
	}

	// drained is closed once the error-logging goroutine has consumed
	// everything from producer.Errors() and returned.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		for err := range producer.Errors() {
			log.Printf("[ERROR]: %s\n", err.Error())
		}
	}()

	defer func() {
		// Close flushes buffered messages and also reads from the errors
		// channel itself, so any delivery error it picks up is only visible
		// through its return value — log those instead of discarding them.
		if err := producer.Close(); err != nil {
			if failures, ok := err.(sarama.ProducerErrors); ok {
				for _, failure := range failures {
					log.Printf("[ERROR]: %s\n", failure.Error())
				}
			} else {
				log.Printf("[ERROR]: %s\n", err.Error())
			}
		}
		// Wait for the goroutine so an error it already received is printed
		// before the process exits.
		<-drained
	}()

	producer.Input() <- &sarama.ProducerMessage{
		Timestamp: time.Now(),
		Topic:     "bla",
		Value:     sarama.ByteEncoder("test this message"),
	}
}