// Kafka consumer in Go (golang).
package main
import (
"flag"
"github.com/optiopay/kafka"
"log"
"net/http"
"net/http/pprof"
"strings"
"time"
"ooxx/config"
"ooxx/lib"
"ooxx/model"
)
const (
	// LOG_CHANNEL_COUNT is the capacity of the queue channel.
	LOG_CHANNEL_COUNT = 200
	// LOG_BUFFER_COUNT is the initial capacity of buffer.
	LOG_BUFFER_COUNT = 100
)

var (
	// debug is the value of the -debug command-line flag.
	debug = flag.String("debug", "false", "debug mode")

	// queue carries raw message payloads from the consumers to push_message.
	queue = make(chan []byte, LOG_CHANNEL_COUNT)

	// buffer accumulates messages between flushes. It starts empty (length 0)
	// with room for LOG_BUFFER_COUNT entries, so appended messages are not
	// preceded by a run of empty placeholder strings.
	buffer = make([]string, 0, LOG_BUFFER_COUNT)

	// ticker fires every four seconds to trigger a flush of buffer.
	ticker = time.NewTicker(4 * time.Second)
)
// save_message flushes the buffered messages to the stats log file.
//
// Each non-empty entry of buffer is emitted on its own line, and the joined
// text is handed to lib.FilePutContents2 together with the path
// config.Config.StatsLogDir + "stats_play_" + lib.TimeFormat(). When there is
// no non-empty entry, nothing is written. In every case buffer is emptied
// afterwards, keeping its backing array for reuse.
//
// NOTE(review): lib.FilePutContents2 is defined elsewhere; whether it appends
// or overwrites, and whether it reports errors, cannot be seen from here.
func save_message() {
	if len(buffer) == 0 {
		return
	}

	// strings.Builder is used because this file imports "strings" but not
	// "bytes"; the result is needed as a string anyway.
	var sb strings.Builder
	for _, line := range buffer {
		if line == "" {
			continue
		}
		sb.WriteString(line)
		sb.WriteString("\n")
	}

	if sb.Len() > 0 {
		// Named fileName rather than log so the log package is not shadowed.
		fileName := "stats_play_" + lib.TimeFormat()
		lib.FilePutContents2(config.Config.StatsLogDir+fileName, sb.String())
	}

	// Always drop the processed entries, including empty ones, so they are
	// not re-scanned on every tick.
	buffer = buffer[:0]
}
func push_message() {
for {
select {
case c := <-queue:
buffer = append(buffer, string(c))
case <-ticker.C:
save_message()
}
}
}
// consume_flow_message reads messages from one partition of a Kafka topic
// until the consumer returns an error.
//
// The consumer is created with kafka.StartOffsetNewest as its start offset.
// Messages read from the partitions configured as KafkaPartitionFlay,
// KafkaPartitionShow or KafkaPartitionFlow are logged; non-empty payloads
// from the KafkaPartitionFlow partition are additionally sent to queue (the
// send blocks while queue is full). Messages from any other partition are
// consumed and ignored.
//
// If the consumer cannot be created the process exits through log.Fatalf.
// The loop stops on the first error from Consume: kafka.ErrNoData stops it
// silently, any other error is logged first.
func consume_flow_message(broker kafka.Client, topic string, partition int) {
	conf := kafka.NewConsumerConf(topic, int32(partition))
	conf.StartOffset = kafka.StartOffsetNewest

	consumer, err := broker.Consumer(conf)
	if err != nil {
		log.Fatalf("cannot create kafka consumer for %s:%d: %s", topic, partition, err)
	}

	for {
		msg, err := consumer.Consume()
		if err != nil {
			if err != kafka.ErrNoData {
				log.Printf("cannot consume %s:%d message: %s", topic, partition, err)
			}
			break
		}

		// Case order matters when two configured partitions share a value:
		// the first matching case wins, exactly as in a chain of comparisons.
		switch partition {
		case config.Config.KafkaPartitionFlay, config.Config.KafkaPartitionShow:
			log.Printf("%s:%d, %d: %s", topic, partition, msg.Offset, msg.Value)
		case config.Config.KafkaPartitionFlow:
			log.Printf("%s:%d, %d: %s", topic, partition, msg.Offset, msg.Value)
			if len(msg.Value) > 0 {
				queue <- msg.Value
			}
		}
	}

	// Printf, not Print: the message contains formatting verbs.
	log.Printf("consume_flow_message, consumer quit, %s:%d", topic, partition)
}
// main starts the service: it parses flags, serves pprof endpoints on :9527
// in a background goroutine, connects to the Kafka brokers listed in
// config.Config.KafkaBrokers, starts the buffering goroutine and then
// consumes three partitions of config.Config.KafkaTopicFlow. Two partitions
// are consumed in background goroutines; the KafkaPartitionFlow partition is
// consumed on the main goroutine, so main returns when that consumer quits.
func main() {
	// Report a panic raised on the main goroutine. Panics in other
	// goroutines (such as the pprof server below) are not caught here.
	defer func() {
		if r := recover(); r != nil {
			lib.P("panic:", r, "\nstack:"+lib.Stack(false))
		}
	}()
	defer model.Db.Close()

	flag.Parse()

	// Profiling endpoints on a dedicated mux.
	go func() {
		mux := http.NewServeMux()
		routes := []struct {
			pattern string
			handler func(http.ResponseWriter, *http.Request)
		}{
			{"/debug/pprof/", pprof.Index},
			{"/debug/pprof/cmdline", pprof.Cmdline},
			{"/debug/pprof/profile", pprof.Profile},
			{"/debug/pprof/symbol", pprof.Symbol},
		}
		for _, r := range routes {
			mux.HandleFunc(r.pattern, r.handler)
		}
		if err := http.ListenAndServe(":9527", mux); err != nil {
			panic(err)
		}
	}()

	addrs := strings.Split(config.Config.KafkaBrokers, ",")
	brokerConf := kafka.NewBrokerConf("xktv")
	brokerConf.DialTimeout = time.Second
	brokerConf.DialRetryLimit = 1

	broker, err := kafka.Dial(addrs, brokerConf)
	if err != nil {
		log.Fatalf("cannot connect to kafka cluster: %s", err)
	}
	defer broker.Close()

	go push_message()

	topic := config.Config.KafkaTopicFlow
	go consume_flow_message(broker, topic, config.Config.KafkaPartitionFlay)
	go consume_flow_message(broker, topic, config.Config.KafkaPartitionShow)
	// Blocks until this consumer stops.
	consume_flow_message(broker, topic, config.Config.KafkaPartitionFlow)
}