基于 Docker 的 pubsub 包实现的 gRPC 的发布与订阅
1. 创建对应的文件和目录
1 2 3 4 5 6 7
| mkdir pubsub cd pubsub go mod init pubsub
mkdir proto publisher subscribe touch server.go
|
2. 创建 proto 文件
1 2
| cd proto touch pubsub.proto
|
pubsub.proto 内容如下
1 2 3 4 5 6 7 8 9 10 11 12
| syntax = "proto3";
option go_package = "./;pb";
message Result { string value = 1; }
service PubSubService { rpc Publish (Result) returns (Result); rpc Subscribe (Result) returns (stream Result); }
|
如未安装 proto 扩展请先安装
执行 proto 命令
1
| protoc --go_out=plugins=grpc:. pubsub.proto
|
3. 进入 pubsub 编辑 server.go 文件;内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package main
import ( "context" "fmt" "github.com/docker/docker/pkg/pubsub" grpc "google.golang.org/grpc" "log" "net" "strings"
pb "pubsub/proto" "time" )
type PubSubService struct { pub *pubsub.Publisher }
func main() { grpcServer := grpc.NewServer() pb.RegisterPubSubServiceServer(grpcServer, NewPubSubService())
listener, err := net.Listen("tcp", ":1234") if err != nil { log.Fatal("listen err: ", err) } else { fmt.Println("Service Start.") } grpcServer.Serve(listener) }
func NewPubSubService() *PubSubService { return &PubSubService{pub: pubsub.NewPublisher(100*time.Millisecond, 10)} }
func (pub *PubSubService) Publish(ctx context.Context, arg *pb.Result) (*pb.Result, error) { value := arg.GetValue() pub.pub.Publish(value) return &pb.Result{Value: value}, nil }
func (pub *PubSubService) Subscribe(arg *pb.Result, stream pb.PubSubService_SubscribeServer) error { ch := pub.pub.SubscribeTopic(func(v interface{}) bool { if key, ok := v.(string); ok { if strings.HasPrefix(key, arg.GetValue()) { return true } } return false })
for v := range ch { if err := stream.Send(&pb.Result{Value: v.(string)}); err != nil { return err } } return nil }
|
4. 进入 subscribe 目录,编写 subscribe.go 内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| package main
import ( "context" "fmt" "google.golang.org/grpc" "io" "log" pb "pubsub/proto" )
func main() { conn, err := grpc.Dial("127.0.0.1:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close()
client := pb.NewPubSubServiceClient(conn)
stream, err := client.Subscribe( context.Background(), &pb.Result{Value: "docker:"}, ) if err != nil { log.Fatal(err) }
for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println(reply.GetValue()) }
stream, err = client.Subscribe( context.Background(), &pb.Result{Value: "docker:"}, ) if err != nil { log.Fatal(err) }
for { reply, err := stream.Recv() if err != nil { if err == io.EOF { break } log.Fatal(err) } fmt.Println(reply.GetValue()) } }
|
5. 进入 publisher 目录,编写 publisher.go 内容如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package main
import ( "context" "fmt" "log" pb "pubsub/proto"
grpc "google.golang.org/grpc" )
func main() { conn, err := grpc.Dial("127.0.0.1:1234", grpc.WithInsecure()) if err != nil { log.Fatal(err) } defer conn.Close()
client := pb.NewPubSubServiceClient(conn) str, err := client.Publish( context.Background(), &pb.Result{Value: "docker: hello Docker"}, ) if err != nil { log.Fatal(err) } else { fmt.Println("Publish: ", str.GetValue()) } }
|
6. 启动服务命令
1 2 3 4 5 6 7 8 9
| 进入根目录执行 go run server.go // 启动服务 进入 subscribe 目录执行 go run subscribe.go // 订阅服务 进入 publisher 目录执行 go run publisher.go // 发布
执行 发布之后,就能通过订阅窗口 有内容输出 docker: hello Docker
|