majianyu
5/14/2019 - 10:46 AM

redis_stream_go

package main

import (
	"mytest/stream/consumer"
	"mytest/stream/publisher"
	"mytest/stream/redis"
	"time"
)

// main runs the stream demo: it initialises the shared Redis client, starts
// one consumer goroutine for each of the groups "1", "2" and "3", and then
// publishes ten entries, two seconds apart.
func main() {
	redis.Init()
	defer redis.Client.Close()

	// Arguments of a go statement are evaluated immediately, so each
	// goroutine receives its own group name.
	for _, group := range []string{"1", "2", "3"} {
		go consumer.Consum(group)
	}

	for remaining := 10; remaining > 0; remaining-- {
		publisher.Publish()
		time.Sleep(2 * time.Second)
	}

	// Presumably a grace period so the consumers can print the last entry
	// before the process exits; the consumer goroutines are never stopped.
	time.Sleep(3 * time.Second)
}
package publisher

import (
	"log"

	"github.com/go-redis/redis"
	r2 "mytest/stream/redis"
)

// Publish appends one fixed demo entry ("name"/"age") to the "codehole"
// stream through the shared client.
//
// Publish has no error return, so a failed XADD is logged instead of being
// silently dropped.
func Publish() {
	args := redis.XAddArgs{
		Stream: "codehole",
		ID:     "*", // "*" asks the server to generate the entry ID
		Values: map[string]interface{}{"name": "laoqian", "age": 30},
	}
	if err := r2.Client.XAdd(&args).Err(); err != nil {
		log.Printf("publish to stream %q: %v", args.Stream, err)
	}
}
package consumer

import (
	"fmt"
	"github.com/go-redis/redis"
	r2 "mytest/stream/redis"
	"time"
)

func Consum(group string) {

	//s, err := r2.Client.XGroupCreate("codehole", group, "$").Result()
	//fmt.Println(s, err)
	for {
		streams, err := r2.Client.XReadGroup(&redis.XReadGroupArgs{
			Group:    group,
			Consumer: "c1",
			Streams:  []string{"codehole", ">"},
			Count:    1,
			Block:    time.Second * 1,
			NoAck:    true,
		}).Result()
		fmt.Println("group:"+group, streams, err)

	}
}
package redis

import (
	"fmt"
	"github.com/go-redis/redis"
)

// Client is the package-level Redis client shared by the other packages.
// It is nil until Init assigns it.
var Client *redis.Client

// Init creates the shared Client for the Redis server at localhost:6379 and
// prints the result of an initial PING.
func Init() {
	opts := &redis.Options{Addr: "localhost:6379"}
	Client = redis.NewClient(opts)

	pong := Client.Ping()
	fmt.Println("Redis----------Ping()----------", pong)
}