消息队列,我们一般称它为 MQ(Message Queue),两个单词的结合,这两个英文单词想必大家都应该知道吧,其实最熟悉的还是 Queue 吧,即队列。队列是一种先进先出的数据结构,队列的使用还是比较普遍的。在现代软件开发中,消息队列是实现应用解耦、提高系统扩展性和健壮性的关键组件。Go 语言以其并发处理能力著称,是构建高性能消息队列的理想选择。本文将介绍一个使用 Go 语言和 Redis 实现的简单但功能完备的消息队列系统。
概述 消息队列系统通常由生产者(Producer)和消费者(Consumer)组成。生产者负责发送消息,而消费者则负责接收和处理这些消息。我们的消息队列系统使用 Redis 作为后端存储,利用其高性能和持久化特性来保证消息的可靠性。
程序结构 我们的系统由以下几个核心组件构成:
Queue:队列管理器,负责创建队列、发布消息和启动消费者。
Producer:消息生产者,用于向队列发送消息。
Consumer:消息消费者,用于从队列接收并处理消息。
Message:消息实体,定义了消息的结构和序列化方法。
核心代码解析 Queue 管理器 queue.go 文件定义了 Queue 结构和相关方法,它封装了一个用于消息队列处理的 redis.Client 实例,。Queue 结构也包含了 Redis 客户端、主题(topic)以及生产者和消费者实例。其中 NewQueue 函数用于创建一个新的消息队列实例,包括生产者和消费者。Start 方法用于启动消费者的监听,Publish 方法用于发布消息。完整代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package queueimport ( "context" "github.com/go-redis/redis/v8" "sync" )const ( HashSuffix = ":hash" SetSuffix = ":set" )var once sync.Once type Queue struct { ctx context.Context redis *redis.Client topic string producer *producer consumer *consumer }func NewQueue (ctx context.Context, redis *redis.Client, opts ...Option) *Queue { var queue *Queue once.Do(func () { defaultOptions := Options{ topic: "topic" , handler: defaultHander, } for _, apply := range opts { apply(&defaultOptions) } queue = &Queue{ ctx: ctx, redis: redis, topic: defaultOptions.topic, producer: NewProducer(ctx), consumer: NewConsumer(ctx, defaultOptions.handler), } }) return queue }func (q *Queue) Start () { go q.consumer.listen(q.redis, q.topic) }func (q *Queue) Publish (msg *Message) (int64 , error) { return q.producer.publish(q.redis, q.topic, msg) }
你也发现了其中 NewQueue 函数接受一个上下文、Redis 客户端和一个可变数量的选项,用于创建一个新的队列实例。Option 定义了一个使用函数闭包实现的选项模式,用于生成具有不同选项的 Options 结构,其中包含了可配置的字段 topic 和 handler。WithTopic 和 WithHandler 是两个函数,它们接受相应的参数并返回对应的选项函数。当这些选项函数被应用到 Options 结构时,会设置结构体中相应的字段值。queue_option.go 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package queuetype Option func (*Options) type Options struct { topic string handler handlerFunc }func WithTopic (topic string ) Option { return func (opts *Options) { opts.topic = topic } }func WithHandler (handler handlerFunc) Option { return func (opts *Options) { opts.handler = handler } }
消息实体 message.go 文件定义了 Message 结构,用于表示消息实体。它包含了消息的唯一标识符(ID)、创建时间、消费时间以及消息体(body)。NewMessage 函数用于创建消息实例。GetScore 函数返回消息的分数,GetId 函数返回消息的 ID,MarshalBinary 和 UnmarshalBinary 函数用于消息的序列化和反序列化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package queueimport ( "encoding/json" "github.com/satori/go.uuid" "time" )type Message struct { Id string `json:"id"` CreateTime time.Time `json:"createTime"` ConsumeTime time.Time `json:"consumeTime"` Body interface {} `json:"body"` }func NewMessage (id string , consumeTime time.Time, body interface {}) *Message { if id == "" { id = uuid.NewV4().String() } return &Message{ Id: id, CreateTime: time.Now(), ConsumeTime: consumeTime, Body: body, } }func (m *Message) GetScore () float64 { return float64 (m.ConsumeTime.Unix()) }func (m *Message) GetId () string { return m.Id }func (m *Message) MarshalBinary () ([]byte , error) { return json.Marshal(m) }func (m *Message) UnmarshalBinary (data []byte ) error { return json.Unmarshal(data, m) }
生产者 producer.go 文件定义了 producer 结构和 publish 方法。生产者负责将消息发布到 Redis 的有序集合(sorted set)和哈希表(hash)中。代码创建了一个生产者类型并实现了发布消息到 Redis 的功能。在发布消息时,它首先将消息写入有序集合,然后将消息写入哈希表。这样,通过使用 NewProducer 函数创建生产者实例后,可以使用其 publish 方法将消息发布到 Redis 中保存。 producer.go 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package queueimport ( "context" "github.com/go-redis/redis/v8" )type producer struct { ctx context.Context }func NewProducer (ctx context.Context) *producer { return &producer{ ctx: ctx, } }func (p *producer) publish (redisClient *redis.Client, topic string , msg *Message) (int64 , error) { z := &redis.Z{ Score: msg.GetScore(), Member: msg.GetId(), } setKey := topic + SetSuffix n, err := redisClient.ZAdd(p.ctx, setKey, z).Result() if err != nil { return n, err } hashKey := topic + HashSuffix return redisClient.HSet(p.ctx, hashKey, msg.GetId(), msg).Result() }
消费者 consumer.go 文件定义了 consumer 结构和 listen 方法。消费者通过监听 Redis 的有序集合来获取消息,并从哈希表中检索消息详情,然后通过提供的处理器(handler)函数进行处理。建了一个消费者类型,并实现了监听消息队列中的消息并处理的功能。在 NewConsumer 方法中,创建了一个消费者实例并初始化了相关参数。在 listen 方法中,通过定时器定时从有序集合获取消息,并通过通道将数据送至处理函数进行最终逻辑处理。这样,通过使用 NewConsumer 函数创建消费者实例后,可以调用其 listen 方法实现对消息队列的监听和处理。consumer.go 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 package queueimport ( "context" "encoding/json" "fmt" "github.com/go-redis/redis/v8" "log" "strconv" "time" )type handlerFunc func (msg Message) func defaultHandler (msg Message) { fmt.Println(msg) }type consumer struct { ctx context.Context duration time.Duration ch chan []string handler handlerFunc }func NewConsumer (ctx context.Context, handler handlerFunc) *consumer { return &consumer{ ctx: ctx, duration: time.Second, ch: make (chan []string , 1000 ), handler: handler, } }func (c *consumer) listen (redisClient *redis.Client, topic string ) { go func () { for { select { case ret := <-c.ch: key := topic + HashSuffix result, err := redisClient.HMGet(c.ctx, key, ret...).Result() if err != nil { log.Println(err) } if len (result) > 0 { redisClient.HDel(c.ctx, key, ret...) } msg := Message{} for _, v := range result { if v == nil { continue } str := v.(string ) json.Unmarshal([]byte (str), &msg) go c.handler(msg) } } } }() ticker := time.NewTicker(c.duration) defer ticker.Stop() for { select { case <-c.ctx.Done(): log.Println("consumer quit:" , c.ctx.Err()) return case <-ticker.C: min := strconv.Itoa(0 ) max := strconv.Itoa(int (time.Now().Unix())) opt := &redis.ZRangeBy{ Min: min, Max: max, } key := topic + SetSuffix result, err := redisClient.ZRangeByScore(c.ctx, key, opt).Result() if err != nil { log.Fatal(err) return } if len (result) > 0 { redisClient.ZRemRangeByScore(c.ctx, key, min, max) c.ch <- result } } } }
使用示例 假设我们有一个简单的场景,需要发送和处理用户行为日志。我们可以定义一个日志消息的生产者和消费者,main.go 如下所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 var RedisQueue *queue.Queuevar Redis *redis.Clientfunc mian () { client := redis.NewClient(option) _, err := client.Ping(context.Background()).Result() if err != nil { log.Fatal(fmt.Sprintf("Connect to redis: %v" , err)) } Redis = client InitRedisQueue() }func InitRedisQueue () { RedisQueue = queue.NewQueue(context.Background(), Redis, queue.WithTopic("send-message" ), queue.WithHandler(bot.CreateAndSendMessages)) RedisQueue.Start() }func CreateAndSendMessages () { id := uuid.NewV4().String() logMsg := queue.NewMessage(id, time.Now(), map [string ]interface {}{"user_id" : 123 , "action" : "login" }) queue.Publish(logMsg) if err != nil { fmt.Println("send " , err) } }
总结 本文介绍了一个基于 Go 语言和 Redis 实现的消息队列系统。该系统简单、高效,并且易于扩展。无论是在微服务架构中还是在需要解耦生产者和消费者的场景中,它都能提供强大的支持。请注意,这是一篇技术文章,您可以根据需要进行调整和补充。如果您有更多的细节或特定的要求,以便进一步用在生产环境中,请您完善更多的细节,比如:性能调优,特别是考虑到网络延迟和 Redis 实例的性能。
关注我获取更新