kratos 框架商城微服务实战之用户服务API (五)

本文最后更新于:2 个月前

Go-kratos 框架商城微服务实战之用户服务 (五)

这篇主要给服务加入链路追踪,完善 consul,并测试 shop 的 http api 接口 文章写的不清晰的地方可通过 GitHub 源码进行查看, 也感谢您指出不足之处,欢迎大佬指教。

注:竖排 … 代码省略,为了保持文章的篇幅简洁,我会将一些不必要的代码使用竖排的 . 来代替,你在复制本文代码块的时候,切记不要将 . 也一同复制进去。

准备工作

安装 consul

1
2
3
# 这里使用的是 docker 工具进行创建的
docker run -d -p 8500:8500 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8600:8600/udp consul consul agent -dev -client=0.0.0.0

jaeger 的安装

1
2
3
# 这里使用的是 docker 工具进行创建的
docker run --rm --name jaeger -p14268:14268 -p16686:16686 jaegertracing/all-in-one
// 执行完毕之后,切记别退出服务
  • 浏览器访问 http://127.0.0.1:16686/ 验证是否安装成功

user 服务添加配置代码

consul 的配置前几篇文章都已经添加过了,这里就不重复添加了

  • user 项目中添加
1
2
3
4
5
6
7
 # user/configs/config.yaml 配置文件新增

...

trace:
endpoint: http://127.0.0.1:14268/api/traces

  • 修改 user 的配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
...

message Bootstrap {
Server server = 1;
Data data = 2;
Trace trace = 3; // 此处为新增的配置
}

...

message Trace {
string endpoint = 1;
}
  • 生成 user 的 conf 文件
1
2
user 根目录执行命令,生成新的配置文件
make config
  • 修改 grpc.go 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package server

import (
.
.
.
"github.com/go-kratos/kratos/v2/middleware/tracing" // 新增引入
)

// NewGRPCServer new a gRPC server.
func NewGRPCServer(c *conf.Server, u *service.UserService, logger log.Logger) *grpc.Server {
var opts = []grpc.ServerOption{
grpc.Middleware(
recovery.Recovery(),
tracing.Server(), // 新增 tracing
),
}
.
.
.
v1.RegisterUserServer(srv, u)
return srv
}
  • 修改 main.go 文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main

import (
"flag"
"github.com/go-kratos/kratos/v2/registry"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"os"

"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/config/file"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/transport/grpc"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"user/internal/conf"
)

// go build -ldflags "-X main.Version=x.y.z"
var (
// Name is the name of the compiled software.
Name = "shop.user.service"
// Version is the version of the compiled software.
Version "user.v1"
// flagconf is the config flag.
flagconf string

id, _ = os.Hostname()
)

func init() {
flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml")
}

func newApp(logger log.Logger, gs *grpc.Server, rr registry.Registrar) *kratos.App {
return kratos.New(
kratos.ID(id+"user service"),
kratos.Name(Name),
kratos.Version(Version),
kratos.Metadata(map[string]string{}),
kratos.Logger(logger),
kratos.Server(
gs,
),
kratos.Registrar(rr), // 服务注册与发现
)
}

// Set global trace provider 设置链路追逐的方法
func setTracerProvider(url string) error {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
return err
}
tp := tracesdk.NewTracerProvider(
// Set the sampling rate based on the parent span to 100%
tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))),
// Always be sure to batch in production.
tracesdk.WithBatcher(exp),
// Record information about this application in an Resource.
tracesdk.WithResource(resource.NewSchemaless(
semconv.ServiceNameKey.String(Name),
attribute.String("env", "dev"),
)),
)
otel.SetTracerProvider(tp)
return nil
}

func main() {
flag.Parse()
logger := log.With(log.NewStdLogger(os.Stdout),
"ts", log.DefaultTimestamp,
"caller", log.DefaultCaller,
"service.id", id,
"service.name", Name,
"service.version", Version,
"trace_id", tracing.TraceID(),
"span_id", tracing.SpanID(),
)
c := config.New(
config.WithSource(
file.NewSource(flagconf),
),
)
defer c.Close()

if err := c.Load(); err != nil {
panic(err)
}

var bc conf.Bootstrap
if err := c.Scan(&bc); err != nil {
panic(err)
}
// 加入链路追踪的配置
if err := setTracerProvider(bc.Trace.Endpoint); err != nil {
panic(err)
}

var rc conf.Registry
if err := c.Scan(&rc); err != nil {
panic(err)
}

app, cleanup, err := initApp(bc.Server, &rc, bc.Data, logger)
if err != nil {
panic(err)
}
defer cleanup()

// start and wait for stop signal
if err := app.Run(); err != nil {
panic(err)
}
}

修改 wire.go 文件

1
2
根目录执行命令,生成新的 wire_gen.go 文件
make wire

shop 项目中添加配置代码

前几篇已经把 consul service 的一些配置加入到了 config 文件中,这里就不重复添加了

  • 修改 config.yaml 文件

    考虑到这个配置文件的重要性,这里贴出来了全部的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 name: shop.api
server:
http:
addr: 0.0.0.0:8097
timeout: 1s
grpc:
addr: 0.0.0.0:9001
timeout: 1s
data:
database:
driver: mysql
source: root:root@tcp(127.0.0.1:3306)/test
redis:
addr: 127.0.0.1:6379
read_timeout: 0.2s
write_timeout: 0.2s
trace:
endpoint: http://127.0.0.1:14268/api/traces
auth:
jwt_key: hqFr%3ddt32DGlSTOI5cO6@TH#fFwYnP$S
service:
user:
endpoint: discovery:///shop.user.service
goods:
endpoint: discovery:///shop.goods.service

  • 修改 conf.proto 文件

    考虑到这个配置文件的重要性,这里贴出来了全部的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
syntax = "proto3";
package shop.api;

option go_package = "shop/internal/conf;conf";

import "google/protobuf/duration.proto";

message Bootstrap {
Server server = 1;
Data data = 2;
Trace trace = 3;
Auth auth = 4;
Service service = 5;
}

message Server {
message HTTP {
string network = 1;
string addr = 2;
google.protobuf.Duration timeout = 3;
}
message GRPC {
string network = 1;
string addr = 2;
google.protobuf.Duration timeout = 3;
}
HTTP http = 1;
GRPC grpc = 2;
}

message Data {
message Database {
string driver = 1;
string source = 2;
}
message Redis {
string network = 1;
string addr = 2;
google.protobuf.Duration read_timeout = 3;
google.protobuf.Duration write_timeout = 4;
}
Database database = 1;
Redis redis = 2;
}

message Service {
message User {
string endpoint = 1;
}
message Goods {
string endpoint = 1;
}
User user = 1;
Goods goods = 2;
}

message Trace {
string endpoint = 1;
}


message Registry {
message Consul {
string address = 1;
string scheme = 2;
}
Consul consul = 1;
}

message Auth {
string jwt_key = 1;
}

  • 生成新的配置
1
2
user 根目录执行命令,生成新的配置文件
make config
  • 修改链接用户服务的连接

    shop/internal/data/data.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package data

import (
"context"
consul "github.com/go-kratos/kratos/contrib/registry/consul/v2"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/google/wire"
consulAPI "github.com/hashicorp/consul/api"
grpcx "google.golang.org/grpc"
userV1 "shop/api/service/user/v1"
"shop/internal/conf"
"time"
)

// ProviderSet is data providers.
var ProviderSet = wire.NewSet(NewData, NewUserRepo, NewUserServiceClient, NewRegistrar, NewDiscovery)

// Data .
type Data struct {
log *log.Helper
uc userV1.UserClient
}

// NewData .
func NewData(c *conf.Data, uc userV1.UserClient, logger log.Logger) (*Data, error) {
l := log.NewHelper(log.With(logger, "module", "data"))
return &Data{log: l, uc: uc}, nil
}

// NewUserServiceClient 链接用户服务 grpc
func NewUserServiceClient(ac *conf.Auth, sr *conf.Service, rr registry.Discovery) userV1.UserClient {
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint(sr.User.Endpoint),
grpc.WithDiscovery(rr),
grpc.WithMiddleware(
tracing.Client(), // 链路追踪
recovery.Recovery(),
),
grpc.WithTimeout(2*time.Second),
grpc.WithOptions(grpcx.WithStatsHandler(&tracing.ClientHandler{})),
)
if err != nil {
panic(err)
}
c := userV1.NewUserClient(conn)
return c
}

// NewRegistrar add consul
func NewRegistrar(conf *conf.Registry) registry.Registrar {
c := consulAPI.DefaultConfig()
c.Address = conf.Consul.Address
c.Scheme = conf.Consul.Scheme
cli, err := consulAPI.NewClient(c)
if err != nil {
panic(err)
}
r := consul.New(cli, consul.WithHealthCheck(false))
return r
}

func NewDiscovery(conf *conf.Registry) registry.Discovery {
c := consulAPI.DefaultConfig()
c.Address = conf.Consul.Address
c.Scheme = conf.Consul.Scheme
cli, err := consulAPI.NewClient(c)
if err != nil {
panic(err)
}
r := consul.New(cli, consul.WithHealthCheck(false))
return r
}

  • 修改 server http 服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package server

import (
"context"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/auth/jwt"
"github.com/go-kratos/kratos/v2/middleware/logging"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/middleware/selector"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/middleware/validate"
"github.com/go-kratos/kratos/v2/transport/http"
jwt2 "github.com/golang-jwt/jwt/v4"
"github.com/gorilla/handlers"
v1 "shop/api/shop/v1"
"shop/internal/conf"
"shop/internal/service"
)

// NewHTTPServer new an HTTP server.
func NewHTTPServer(c *conf.Server, ac *conf.Auth, s *service.ShopService, logger log.Logger) *http.Server {
var opts = []http.ServerOption{
http.Middleware(
recovery.Recovery(),
validate.Validator(),
tracing.Server(), // 这里是本篇新增的
selector.Server(
jwt.Server(func(token *jwt2.Token) (interface{}, error) {
return []byte(ac.JwtKey), nil
}, jwt.WithSigningMethod(jwt2.SigningMethodHS256)),
).Match(NewWhiteListMatcher()).Build(),
logging.Server(logger),
),
http.Filter(handlers.CORS(
handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}),
handlers.AllowedMethods([]string{"GET", "POST", "PUT", "HEAD", "OPTIONS"}),
handlers.AllowedOrigins([]string{"*"}),
)),
}

...

return srv
}

...

  • 修改启动文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main

import (
"flag"
"os"

"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/config/file"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/tracing"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.7.0"

"shop/internal/conf"
)

// go build -ldflags "-X main.Version=x.y.z"
var (
// Name is the name of the compiled software.
Name = "shop.api"
// Version is the version of the compiled software.
Version = "shop.api.v1"
// flagconf is the config flag.
flagconf string

id, _ = os.Hostname()
)

func init() {
flag.StringVar(&flagconf, "conf", "../../configs", "config path, eg: -conf config.yaml")
}

func newApp(logger log.Logger, hs *http.Server, gs *grpc.Server, rr registry.Registrar) *kratos.App {
return kratos.New(
kratos.ID(id+"shop.api"),
kratos.Name(Name),
kratos.Version(Version),
kratos.Metadata(map[string]string{}),
kratos.Logger(logger),
kratos.Server(
hs,
//gs,
),
kratos.Registrar(rr),
)
}

func main() {
flag.Parse()
logger := log.With(log.NewStdLogger(os.Stdout),
"ts", log.DefaultTimestamp,
"caller", log.DefaultCaller,
"service.id", id,
"service.name", Name,
"service.version", Version,
"trace_id", tracing.TraceID(),
"span_id", tracing.SpanID(),
)
c := config.New(
config.WithSource(
file.NewSource(flagconf),
),
)
defer c.Close()

if err := c.Load(); err != nil {
panic(err)
}

var bc conf.Bootstrap
if err := c.Scan(&bc); err != nil {
panic(err)
}

var rc conf.Registry
if err := c.Scan(&rc); err != nil {
panic(err)
}

err := setTracerProvider(bc.Trace.Endpoint)
if err != nil {
panic(err)
}

app, cleanup, err := initApp(bc.Server, bc.Data, bc.Auth, bc.Service, &rc, logger)
if err != nil {
panic(err)
}
defer cleanup()

// start and wait for stop signal
if err := app.Run(); err != nil {
panic(err)
}
}

func setTracerProvider(url string) error {
// Create the Jaeger exporter
exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
if err != nil {
return err
}
tp := tracesdk.NewTracerProvider(
// Set the sampling rate based on the parent span to 100%
tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))),
// Always be sure to batch in production.
tracesdk.WithBatcher(exp),
// Record information about this application in an Resource.
tracesdk.WithResource(resource.NewSchemaless(
semconv.ServiceNameKey.String(Name),
attribute.String("env", "dev"),
)),
)
otel.SetTracerProvider(tp)
return nil
}

  • 修改 wire.go 文件
1
2
根目录执行命令,生成新的 wire_gen.go 文件
make wire

完整流程测试

  • 启动 user 服务
1
2
user 目录下执行命令
kratos run
  • 启动 shop 服务
1
2
shop 目录下执行命令
kratos run

访问用户创建接口

这里使用的是 apipost 接口测试工具, 具体操作看图示,注意传入的参数类型为 json

如图正确返回了,就证明接口访问成功,可以去数据库表中验证,是否是同样的数据插入

如图所示选择 shop.api 然后点击下方的 Find Traces

搜索之后点击进去,看到如图

这里需要注意,在咱们的代码中并未设置成统一的 SpanId,只有 TraceId 是一样的。

结束语

整个服务流程已经通了,这里需要注意的点是,访问用户登陆接口的时候,需要先获取一个验证码接口,然后拿到验证码给的 ID 和 code 进行登陆请求。code 是个 url 需要通过浏览器访问才能看到具体 code 是什么。还有查询用户详细信息的时候,需要携带注册或登陆给的 token。携带的方式是 bearer auth 的方式。

接下来会开始完善用户服务的其他信息,如:用户的收获地址之类的。感谢您的耐心阅读,动动手指点个赞吧。

参考

Go-kratos https://go-kratos.dev/docs/

关注我获取更新