手把手教你用 Go 写一个基于 Redis 的消息队列

本文最后更新于:3 分钟前

消息队列,我们一般称它为 MQ(Message Queue),两个单词的结合,这两个英文单词想必大家都应该知道吧,其实最熟悉的还是 Queue 吧,即队列。队列是一种先进先出的数据结构,队列的使用还是比较普遍的。在现代软件开发中,消息队列是实现应用解耦、提高系统扩展性和健壮性的关键组件。Go 语言以其并发处理能力著称,是构建高性能消息队列的理想选择。本文将介绍一个使用 Go 语言和 Redis 实现的简单但功能完备的消息队列系统。

概述

消息队列系统通常由生产者(Producer)和消费者(Consumer)组成。生产者负责发送消息,而消费者则负责接收和处理这些消息。我们的消息队列系统使用 Redis 作为后端存储,利用其高性能和持久化特性来保证消息的可靠性。

程序结构

我们的系统由以下几个核心组件构成:

  • Queue:队列管理器,负责创建队列、发布消息和启动消费者。
  • Producer:消息生产者,用于向队列发送消息。
  • Consumer:消息消费者,用于从队列接收并处理消息。
  • Message:消息实体,定义了消息的结构和序列化方法。

核心代码解析

Queue 管理器

queue.go 文件定义了 Queue 结构和相关方法,它封装了一个用于消息队列处理的 redis.Client 实例,。Queue 结构也包含了 Redis 客户端、主题(topic)以及生产者和消费者实例。其中 NewQueue 函数用于创建一个新的消息队列实例,包括生产者和消费者。Start 方法用于启动消费者的监听,Publish 方法用于发布消息。完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

package queue

import (
"context"
"github.com/go-redis/redis/v8"
"sync"
)

const (
HashSuffix = ":hash" // Redis 键后缀,用于哈希表
SetSuffix = ":set" // Redis 键后缀,用于集合
)

var once sync.Once // 用于确保某些初始化代码只执行一次的全局变量

type Queue struct {
ctx context.Context // 上下文

// redis
redis *redis.Client // Redis客户端
topic string // 主题

// producer and consumer
producer *producer // 生产者
consumer *consumer // 消费者

}

func NewQueue(ctx context.Context, redis *redis.Client, opts ...Option) *Queue {
var queue *Queue

// 使用sync.Once确保以下代码只执行一次
once.Do(func() {
// 定义默认的选项
defaultOptions := Options{
topic: "topic",
handler: defaultHander, // 默认处理函数
}

// 应用传入的选项(opts)
for _, apply := range opts {
apply(&defaultOptions)
}

// 创建Queue实例
queue = &Queue{
ctx: ctx,
redis: redis,
topic: defaultOptions.topic,
producer: NewProducer(ctx), // 创建生产者
consumer: NewConsumer(ctx, defaultOptions.handler), // 创建消费者,使用处理函数
}
})

// 返回Queue实例
return queue

}

func (q *Queue) Start() {
// 启动消费者的监听
go q.consumer.listen(q.redis, q.topic)
}

func (q *Queue) Publish(msg *Message) (int64, error) {
// 发布消息
return q.producer.publish(q.redis, q.topic, msg)
}

  • 你也发现了其中 NewQueue 函数接受一个上下文、Redis 客户端和一个可变数量的选项,用于创建一个新的队列实例。Option 定义了一个使用函数闭包实现的选项模式,用于生成具有不同选项的 Options 结构,其中包含了可配置的字段 topic 和 handler。WithTopic 和 WithHandler 是两个函数,它们接受相应的参数并返回对应的选项函数。当这些选项函数被应用到 Options 结构时,会设置结构体中相应的字段值。queue_option.go 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package queue

type Option func(*Options)

type Options struct {
topic string
handler handlerFunc
}

// WithTopic 用于设置选项中的 topic 字段
func WithTopic(topic string) Option {
return func(opts *Options) {
opts.topic = topic
}
}

// WithHandler 用于设置选项中的 handler 字段
func WithHandler(handler handlerFunc) Option {
return func(opts *Options) {
opts.handler = handler
}
}

消息实体

message.go 文件定义了 Message 结构,用于表示消息实体。它包含了消息的唯一标识符(ID)、创建时间、消费时间以及消息体(body)。NewMessage 函数用于创建消息实例。GetScore 函数返回消息的分数,GetId 函数返回消息的 ID,MarshalBinary 和 UnmarshalBinary 函数用于消息的序列化和反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package queue

import (
"encoding/json"
"github.com/satori/go.uuid"
"time"
)

// Message 定义消息结构
type Message struct {
Id string `json:"id"`
CreateTime time.Time `json:"createTime"`
ConsumeTime time.Time `json:"consumeTime"`
Body interface{} `json:"body"`
}

// NewMessage 用于创建消息实体
func NewMessage(id string, consumeTime time.Time, body interface{}) *Message {
if id == "" {
id = uuid.NewV4().String()
}
return &Message{
Id: id,
CreateTime: time.Now(),
ConsumeTime: consumeTime,
Body: body,
}
}

// GetScore 用于返回消息的分数
func (m *Message) GetScore() float64 {
return float64(m.ConsumeTime.Unix())
}

// GetId 用于返回消息的ID
func (m *Message) GetId() string {
return m.Id
}

// MarshalBinary 用于将消息结构体序列化为二进制数据
func (m *Message) MarshalBinary() ([]byte, error) {
return json.Marshal(m)
}

// UnmarshalBinary 用于将二进制数据反序列化为消息结构体
func (m *Message) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, m)
}

生产者

producer.go 文件定义了 producer 结构和 publish 方法。生产者负责将消息发布到 Redis 的有序集合(sorted set)和哈希表(hash)中。代码创建了一个生产者类型并实现了发布消息到 Redis 的功能。在发布消息时,它首先将消息写入有序集合,然后将消息写入哈希表。这样,通过使用 NewProducer 函数创建生产者实例后,可以使用其 publish 方法将消息发布到 Redis 中保存。 producer.go 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package queue

import (
"context"
"github.com/go-redis/redis/v8"
)

type producer struct {
ctx context.Context
}

func NewProducer(ctx context.Context) *producer {
return &producer{
ctx: ctx,
}
}

// Publish方法用于将消息发布到Redis中
func (p *producer) publish(redisClient *redis.Client, topic string, msg *Message) (int64, error) {
z := &redis.Z{
Score: msg.GetScore(),
Member: msg.GetId(),
}

// 将消息写入有序集合
setKey := topic + SetSuffix
n, err := redisClient.ZAdd(p.ctx, setKey, z).Result()
if err != nil {
return n, err
}

// 将消息写入哈希表
hashKey := topic + HashSuffix
return redisClient.HSet(p.ctx, hashKey, msg.GetId(), msg).Result()
}

消费者

consumer.go 文件定义了 consumer 结构和 listen 方法。消费者通过监听 Redis 的有序集合来获取消息,并从哈希表中检索消息详情,然后通过提供的处理器(handler)函数进行处理。建了一个消费者类型,并实现了监听消息队列中的消息并处理的功能。在 NewConsumer 方法中,创建了一个消费者实例并初始化了相关参数。在 listen 方法中,通过定时器定时从有序集合获取消息,并通过通道将数据送至处理函数进行最终逻辑处理。这样,通过使用 NewConsumer 函数创建消费者实例后,可以调用其 listen 方法实现对消息队列的监听和处理。consumer.go 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package queue

import (
"context"
"encoding/json"
"fmt"
"github.com/go-redis/redis/v8"
"log"
"strconv"
"time"
)

// handlerFunc 是用于处理消息的函数类型
type handlerFunc func(msg Message)

// 默认处理函数,打印消息
func defaultHandler(msg Message) {
fmt.Println(msg)
}

// consumer 类型包含了消费者相关的字段和方法
type consumer struct {
ctx context.Context
duration time.Duration
ch chan []string
handler handlerFunc
}

// NewConsumer 用于创建一个消费者实例
func NewConsumer(ctx context.Context, handler handlerFunc) *consumer {
return &consumer{
ctx: ctx,
duration: time.Second,
ch: make(chan []string, 1000),
handler: handler,
}
}

// listen 方法用于监听消息队列中的消息并处理
func (c *consumer) listen(redisClient *redis.Client, topic string) {
// 从哈希表中获取数据并处理
go func() {
for {
select {
case ret := <-c.ch:
// 从哈希表中批量获取数据信息
key := topic + HashSuffix
result, err := redisClient.HMGet(c.ctx, key, ret...).Result()
if err != nil {
log.Println(err)
}

if len(result) > 0 {
redisClient.HDel(c.ctx, key, ret...)
}

msg := Message{}
for _, v := range result {
// 由于哈希表和有序集合操作不是原子操作,可能会出现删除了集合中的数据但哈希表中数据未删除的情况
if v == nil {
continue
}
str := v.(string)
json.Unmarshal([]byte(str), &msg)

// 处理逻辑
go c.handler(msg)
}

}
}
}()

// 定时器用于定时获取消息并处理
ticker := time.NewTicker(c.duration)
defer ticker.Stop()
for {
select {
case <-c.ctx.Done(): // 上下文取消,退出监听
log.Println("consumer quit:", c.ctx.Err())
return
case <-ticker.C: // 定时获取消息
// 从 Redis 中读取数据
min := strconv.Itoa(0)
max := strconv.Itoa(int(time.Now().Unix()))
opt := &redis.ZRangeBy{
Min: min,
Max: max,
}

key := topic + SetSuffix
result, err := redisClient.ZRangeByScore(c.ctx, key, opt).Result()
if err != nil {
log.Fatal(err)
return
}

// 获取到数据
if len(result) > 0 {
// 从有序集合中移除数据
redisClient.ZRemRangeByScore(c.ctx, key, min, max)

// 写入通道,进行哈希表处理
c.ch <- result
}
}
}
}

使用示例

假设我们有一个简单的场景,需要发送和处理用户行为日志。我们可以定义一个日志消息的生产者和消费者,main.go 如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42


var RedisQueue *queue.Queue
var Redis *redis.Client

func mian() {
// 这里 redis 的链接自己写吧
client := redis.NewClient(option)

_, err := client.Ping(context.Background()).Result()

if err != nil {
log.Fatal(fmt.Sprintf("Connect to redis: %v", err))
}

Redis = client

// 初始化 redis 消费队列
InitRedisQueue()
}

// InitRedisQueue 创建延时队列
func InitRedisQueue() {
// 创建队列
RedisQueue = queue.NewQueue(context.Background(), Redis,
queue.WithTopic("send-message"),
queue.WithHandler(bot.CreateAndSendMessages))

// 启动消费者
RedisQueue.Start()
}

// 创建并发送消息
func CreateAndSendMessages() {
id := uuid.NewV4().String()
logMsg := queue.NewMessage(id, time.Now(), map[string]interface{}{"user_id": 123, "action": "login"})
queue.Publish(logMsg)
if err != nil {
fmt.Println("send ", err)
}
}

总结

本文介绍了一个基于 Go 语言和 Redis 实现的消息队列系统。该系统简单、高效,并且易于扩展。无论是在微服务架构中还是在需要解耦生产者和消费者的场景中,它都能提供强大的支持。请注意,这是一篇技术文章,您可以根据需要进行调整和补充。如果您有更多的细节或特定的要求,以便进一步用在生产环境中,请您完善更多的细节,比如:性能调优,特别是考虑到网络延迟和 Redis 实例的性能。

关注我获取更新


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!