RocketMQ 的基本概念
RocketMQ 由四部分组成:命名服务器(Name Server)、代理(Broker)、生产者(Producer)和消费者(Consumer),其中每一个都可以水平扩展而没有单点故障,如下图所示。
- Producer : 消息的生产者
- Consumer: 消息的消费者
- Broker:主要用于 producer 和 consumer 接收和发送消息,一个 RabbitMQ 实例就是一个 Broker
- NameServer: 管理 broker,服务发现者
- Topic: 区分消息的种类; 一个发送者可以发送消息给一个或者多个 Topic; 一个消息的接收者可以订阅一个或者多个 topic 消息
- Message: 消息载体。Message 发送或者消费的时候必须指定 Topic
- Queue:1 个 Topic 会被分为 N 个 Queue,数量是可配置的。Message 本身其实是存储到 Queue 上的,消费者消费的也是 Queue 上的消息
大致流程
- Broker 都注册到 Nameserver 上
- Producer 发消息的时候会从 Nameserver 上获取发消息的 topic 信息
- Producer 向提供服务的所有 master 建立长连接,且定时向 master 发送心跳
- Consumer 通过 NameServer 集群获得 Topic 的路由信息
- Consumer 会与所有的 Master 和所有的 Slave 都建立连接进行监听新消息
RocketMQ Client Go API 的使用
RocketMQ Client Go 一个纯净的产品就绪的 RocketMQ 客户端,它几乎支持 Apache RocketMQ 的全部功能
使用 go get 安装 SDK:
1
| go get -u github.com/apache/rocketmq-client-go/v2
|
发送普通消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package main
import ( "context" "fmt"
"github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" )
func main() { p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.31.162:9876"})) if err != nil { panic("生成 producer 失败") }
if err = p.Start(); err != nil { panic("启动 producer 失败") }
res, err := p.SendSync(context.Background(), primitive.NewMessage("RocketMQ", []byte("this is RocketMQ"))) if err != nil { fmt.Printf("发送失败: %s\n", err) } else { fmt.Printf("发送成功: %s\n", res.String()) }
if err = p.Shutdown(); err != nil { panic("关闭 producer 失败") }
}
|
运行结果如下:
浏览器查询结果:
消费普通消息
新建 consumer/main.go
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package main
import ( "context" "fmt" "time"
"github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/consumer" "github.com/apache/rocketmq-client-go/v2/primitive" )
func main() { c, _ := rocketmq.NewPushConsumer( consumer.WithNameServer([]string{"192.168.31.162:9876"}), consumer.WithGroupName("Aliliin"), )
err := c.Subscribe("Aliliin", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { for i := range msgs { fmt.Printf("获取到的数据: %v \n", msgs[i]) } return consumer.ConsumeSuccess, nil }) if err != nil { fmt.Println("读取消息失败") } _ = c.Start() time.Sleep(time.Hour) _ = c.Shutdown() }
|
发送延时消息
新建 delay/main.go
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package main
import ( "context" "fmt"
"github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" )
func main() { p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.31.162:9876"})) if err != nil { panic("生成 producer 失败") }
if err = p.Start(); err != nil { panic("启动 producer 失败") }
msg := primitive.NewMessage("Aliliin", []byte("this is delay message")) msg.WithDelayTimeLevel(3) res, err := p.SendSync(context.Background(), msg) if err != nil { fmt.Printf("发送失败: %s\n", err) } else { fmt.Printf("发送成功: %s\n", res.String()) }
if err = p.Shutdown(); err != nil { panic("关闭 producer 失败") } }
|
继续执行消费 consumer/main.go
文件,等到消费时间一到,可以看到成功消费。
新建事务消息
新建 transaction/main.go
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| package main
import ( "context" "fmt" "time"
"github.com/apache/rocketmq-client-go/v2" "github.com/apache/rocketmq-client-go/v2/primitive" "github.com/apache/rocketmq-client-go/v2/producer" )
type TransactionListener struct{}
func (o *TransactionListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState { fmt.Println("开始执行") time.Sleep(time.Second * 3) fmt.Println("执行成功") return primitive.CommitMessageState }
func (o *TransactionListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState { fmt.Println("rocketmq 的消息回查") time.Sleep(time.Second * 15) return primitive.CommitMessageState }
func main() { p, err := rocketmq.NewTransactionProducer( &TransactionListener{}, producer.WithNameServer([]string{"192.168.31.162:9876"}), ) if err != nil { panic("生成 producer 失败") }
if err = p.Start(); err != nil { panic("启动 producer 失败") }
res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("TransTopic", []byte("this is transaction message"))) if err != nil { fmt.Printf("发送失败: %s\n", err) } else { fmt.Printf("发送成功: %s\n", res.String()) }
time.Sleep(time.Hour) if err = p.Shutdown(); err != nil { panic("关闭 producer 失败") } }
|
primitive.CommitMessageState
的执行逻辑
primitive.RollbackMessageState
的执行逻辑,不发送消息
primitive.UnknowState
未知错误,消息回查
关注我获取更新