RocketMQ 快速入门

本文最后更新于:2 个月前

RocketMQ 的基本概念

RocketMQ 由四部分组成:命名服务器(Name Server)、代理(Broker)、生产者(Producer)和消费者(Consumer),其中每一个都可以水平扩展而没有单点故障,如下图所示。

img

  • Producer : 消息的生产者
  • Consumer: 消息的消费者
  • Broker:主要用于 producer 和 consumer 接收和发送消息,一个 RabbitMQ 实例就是一个 Broker
  • NameServer: 管理 broker,服务发现者
  • Topic: 区分消息的种类; 一个发送者可以发送消息给一个或者多个 Topic; 一个消息的接收者可以订阅一个或者多个 topic 消息
  • Message: 消息载体。Message 发送或者消费的时候必须指定 Topic
  • Queue:1 个 Topic 会被分为 N 个 Queue,数量是可配置的。Message 本身其实是存储到 Queue 上的,消费者消费的也是 Queue 上的消息

大致流程

  1. Broker 都注册到 Nameserver 上
  2. Producer 发消息的时候会从 Nameserver 上获取发消息的 topic 信息
  3. Producer 向提供服务的所有 master 建立长连接,且定时向 master 发送心跳
  4. Consumer 通过 NameServer 集群获得 Topic 的路由信息
  5. Consumer 会与所有的 Master 和所有的 Slave 都建立连接进行监听新消息

RocketMQ Client Go API 的使用

RocketMQ Client Go 一个纯净的产品就绪的 RocketMQ 客户端,它几乎支持 Apache RocketMQ 的全部功能

使用 go get 安装 SDK:

1
go get -u github.com/apache/rocketmq-client-go/v2

发送普通消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"context"
"fmt"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.31.162:9876"}))
if err != nil {
panic("生成 producer 失败")
}

if err = p.Start(); err != nil {
panic("启动 producer 失败")
}

res, err := p.SendSync(context.Background(), primitive.NewMessage("RocketMQ", []byte("this is RocketMQ")))
if err != nil {
fmt.Printf("发送失败: %s\n", err)
} else {
fmt.Printf("发送成功: %s\n", res.String())
}

if err = p.Shutdown(); err != nil {
panic("关闭 producer 失败")
}

}

运行结果如下:

img

浏览器查询结果:

img

消费普通消息

新建 consumer/main.go 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"context"
"fmt"
"time"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
)

func main() {
c, _ := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"192.168.31.162:9876"}),
consumer.WithGroupName("Aliliin"), // 多个实例
)

err := c.Subscribe("Aliliin", consumer.MessageSelector{},
func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range msgs {
fmt.Printf("获取到的数据: %v \n", msgs[i])
}
return consumer.ConsumeSuccess, nil
})
if err != nil {
fmt.Println("读取消息失败")
}
_ = c.Start()
//不能让主 goroutine 退出,不然就马上结束了
time.Sleep(time.Hour)
_ = c.Shutdown()
}

发送延时消息

新建 delay/main.go 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"context"
"fmt"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

func main() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.31.162:9876"}))
if err != nil {
panic("生成 producer 失败")
}

if err = p.Start(); err != nil {
panic("启动 producer 失败")
}

msg := primitive.NewMessage("Aliliin", []byte("this is delay message"))
msg.WithDelayTimeLevel(3) // 延迟级别
res, err := p.SendSync(context.Background(), msg)
if err != nil {
fmt.Printf("发送失败: %s\n", err)
} else {
fmt.Printf("发送成功: %s\n", res.String())
}

if err = p.Shutdown(); err != nil {
panic("关闭 producer 失败")
}
}

继续执行消费 consumer/main.go 文件,等到消费时间一到,可以看到成功消费。

img

新建事务消息

新建 transaction/main.go 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"context"
"fmt"
"time"

"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)

type TransactionListener struct{}

func (o *TransactionListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {
fmt.Println("开始执行")
time.Sleep(time.Second * 3)
fmt.Println("执行成功")
//执行逻辑无缘无故失败 代码异常 宕机
return primitive.CommitMessageState // 执行成功的
// return primitive.UnknowState
}

func (o *TransactionListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {
fmt.Println("rocketmq 的消息回查")
time.Sleep(time.Second * 15)
return primitive.CommitMessageState
}

func main() {
p, err := rocketmq.NewTransactionProducer(
&TransactionListener{},
producer.WithNameServer([]string{"192.168.31.162:9876"}),
)
if err != nil {
panic("生成 producer 失败")
}

if err = p.Start(); err != nil {
panic("启动 producer 失败")
}

res, err := p.SendMessageInTransaction(context.Background(),
primitive.NewMessage("TransTopic", []byte("this is transaction message")))
if err != nil {
fmt.Printf("发送失败: %s\n", err)
} else {
fmt.Printf("发送成功: %s\n", res.String())
}

time.Sleep(time.Hour)
if err = p.Shutdown(); err != nil {
panic("关闭 producer 失败")
}
}

primitive.CommitMessageState 的执行逻辑

img

primitive.RollbackMessageState 的执行逻辑,不发送消息

img

primitive.UnknowState 未知错误,消息回查

img

关注我获取更新