微服务|RPC/gRPC

本文最后更新于:a year ago

本文回答了 RPC 是什么、为什么使用 RPC 等问题,并详细探讨了 protobuf 协议和 gRPC 框架。

1.RPC

1.1 RPC 是什么

  • RPC(Remote Procedure Call)远程过程调用。简单说就是,一个节点请求另一个节点提供的服务。
  • 函数调用是最常见的本地过程调用。
  • RPC 就是将本地过程调用,变成远程过程调用。

1.2 RPC 需要解决的问题

将本地函数放在服务器上运行,有三个主要问题需要解决。

(1)Call 的 ID 映射

  • 本地调用是通过函数指针来调用。
  • 在 RPC 中,所有函数必须有自己的 ID,且 ID 在所有进程中是唯一的。
  • 客户端与服务端分别维护一个函数与 Call ID 的映射表,二者的表不一定相同,但相同函数对应的 Call ID 必须相同。
  • 当客户端需要进行远程调用时,查表得到 Call ID,并传给服务端;服务端通过 Call ID 查表得到所调用的函数,并执行对应的函数代码。

(2)序列化和反序列化

  • 序列化:把对象转化为字节流,进行网络传输。
  • 反序列化:把网络中接收的字节流转化为对象。

(3)网络传输

  • 大部分 RPC 框架都基于 TCP/UDP 协议进行封装。
  • HTTP 1.x 一旦对方返回结果,就会断开连接,因此存在性能问题。
  • gRPC 是基于 HTTP 2.0 的,HTTP 2.0 支持长连接。

1.3 通过 HTTP 完成 Add 服务

通过 net/http 库,使用 get 请求完成远程的 add 服务。在这个过程中需要完成 RPC 的三件重要事情:

  • Call ID 的映射:使用特定的请求路径来标识,r.URL.path
  • 序列化和序列化:使用 json.Marshal 完成。
  • 网络传输:使用 http 发送 get 请求。

使用 http 的方式完成 RPC 的问题有两点,这是 RPC 框架需要解决的问题。

  • 写业务逻辑比较麻烦,每个函数都需要写一个请求。
  • 客户端和服务端需要清晰地明确参数是如何传递的。

(1)服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
http.HandleFunc("/add", func(writer http.ResponseWriter, request *http.Request) {
// 解析并获取参数
_ = request.ParseForm()
fmt.Println("path: ", request.URL.Path)
a, _ := strconv.Atoi(request.Form["a"][0])
b, _ := strconv.Atoi(request.Form["b"][0])

// 构造请求头
writer.Header().Set("Content-Type", "application/json")

// 序列化成 []byte 类型
bytes, _ := json.Marshal(map[string]int{
"data": a + b,
})

// 写入 respond
_, _ = writer.Write(bytes)
})

// 监听 8080 端口
_ = http.ListenAndServe(":8000", nil)
}

(2)客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type ResponseData struct {
Data int `json:"data"`
}

func Add(a, b int) int {

// 发送 get 请求
response, _ := http.Get(fmt.Sprintf("http://127.0.0.1:8000/%s?a=%d&b=%d", "add", a, b))
defer response.Body.Close() // 程序结束关闭 body

// 读取 body
body, _ := ioutil.ReadAll(response.Body)
responseData := ResponseData{} // 初始化结构体
json.Unmarshal(body, &responseData) // 反序列化 json,并将数据存入结构体

return responseData.Data
}
func main() {
fmt.Println(Add(1, 2))
}

1.4 RPC 开发的要素

(1)RPC 开发的四大要素

  • client:负责发起服务调用,传递参数。
  • clent stub:运行在客户端机器上,负责存储要调用的服务端地址等信息,还负责将客户端的请求打包成数据包,发送给服务端。
  • server:有客户端要调用的方法,负责执行调用的方法。
  • server stub:运行在服务端机器上,负责接受客户端的数据包,并调用在 server 上的方法,并将调用结果进行数据处理,打包返回给客户端。

(2)原理图

(3)过程

  • client:client 想要发起远程过程调用,通过调用 client stub 的方式,传递想要调用的方法及参数。
  • client stub:接收到 client 的调用请求,将 client 请求调用的方法名、参数等信息序列化,打包成数据包;并查找到远程服务器的 IP 地址及端口。
  • socket:,通过 socket 协议,将数据包发送给服务端。
  • server stub:接受客户端的数据包,并通过约定好的协议进行反序列化,得到请求的方法名和参数;并调用 server 对应的方法,并传入参数。
  • server:执行被调用的方法,处理业务;并将结果返回给 server stub。
  • server stub:将 server 返回的结果按照约定的协议,进行序列化,打包成数据包。
  • socket:通过 socket 协议,将数据包发送给服务端。
  • client stub:接收到返回的数据,按照约定进行反序列化,并将调用结果传给 client。
  • client:得到调用结果。

至此,整个 RPC 调用完成。

(4)动态代理技术

在 client stub 和 server stub 中,会使用 动态代理技术 自动生成一段代码,这样我们就可以更专注于业务的编码,不用对每个函数调用进行都进行封装一遍。

1.5 使用 go 内置的 RPC

下面简单介绍 go 内置的 RPC,主要用到 netnet/rpc 这两个包。

(1)服务端

  1. 实例化 server
  2. 将函数注册到 RPC 中
  3. 启动服务

RPC 主要解决了的问题:call id、序列化和反序列化(使用的 Gob 协议)。listen 和 accept 的都是 net 包完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// HelloService :想要将多个方法注册到 rpc 当中,可以使用 struct 来封装多个方法,然后注册 struct 即可。
type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error { // reply 传出参数
*reply = "hello " + request
return nil
}

func main() {
// 1.实例化 server
listener, _ := net.Listen("tcp", "localhost:8899")

// 2.将函数注册到 RPC 中
_ = rpc.RegisterName("CommonHelloService", &HelloService{})
/*
第一个参数:是服务的名字,随便起,与结构体 HelloService 命名无关,只要 client/server 在 Call 和 RegisterName 时保持一致就行。
第二个参数:为什么这里要取地址?因为结构体指针实现了 interface。具体原因,见下文。
*/

// 3.启动服务
conn, _ := listener.Accept() // conn 是 ServerCodec 类型,里面封装了读/写/关闭操作。

rpc.ServeConn(conn)
}

补充:为什么 RegisterName 的第二个参数要取地址?

这里接收 &HelloService{} 是一个 interface 变量,传给 interface 变量值还是指针,由 HelloService 在实现函数时,用的指针接受者还是值接受者决定。所以,

  • 如果定义成 func (s *HelloService) Hello(...),此处只能传递指针类型 &HelloService{}
  • 如果定义成 func (s HelloService) Hello(...),此处可以传递指针类型 &HelloService{} 或值类型 HelloService{}

如果不知道为什么,建议阅读 接口的值类型

(2)客户端

  1. 建立连接
  2. 远程调用函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {

// 1. 建立连接
client, err := rpc.Dial("tcp", "localhost:8899")
if err != nil {
panic("connect fail.")
}

// 2.远程调用函数
var reply string
err = client.Call("CommonHelloService.Hello", "xiaomi", &reply)
if err != nil {
panic("call fail.")
}
fmt.Println(reply)
}

1.6 序列化格式使用 json

RPC 使用的序列化协议是 Gob,而 json 是使用更加广泛的格式,为了满足跨语言的需求,一般会使用 json 格式来编解码数据。下面将其换成 json。

(1)服务端

只需要将 rpc.ServeConn 换成 rpc.ServeCodec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// HelloService :想要将多个方法注册到 rpc 当中,可以使用 struct 来封装多个方法,然后注册 struct 即可。
type HelloService struct {}

func (s *HelloService) Hello(request string, reply *string) error { // reply 传出参数
*reply = "hello " + request
return nil
}

func main() {
// 1.实例化 server
listener, _ := net.Listen("tcp", "localhost:8899")

// 2.将函数注册到 RPC 中
_ = rpc.RegisterName("HelloService", &HelloService{}) // 如上文所述原因,需传递指针变量。

// 3.启动服务
conn, _ := listener.Accept() // conn 是 ServerCodec 类型,里面封装了读/写/关闭操作。

rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) // ServeCodec: 使用指定的编解码器来解码请求和编码响应。这里传入使用 json 编解码的 ServerCodec。
}

(2)客户端

需要将 rpc.Dial 换成 net.Dial,否则使用 rpc.Dial 会使用 Gob 的协议进行编解码。并通过 conn 生成一个 ClientCodec。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {

// 1. 建立连接
conn, err := net.Dial("tcp", "localhost:8899")
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn)) // 使用json 编解码的 ClientCodec。
if err != nil {
panic("connect fail.")
}

// 2.远程调用函数
var reply string
err = client.Call("HelloService.Hello", "xiaomi", &reply)
if err != nil {
panic("call fail.")
}
fmt.Println(reply)
}

1.7 监听端口使用 http

内置的 RPC 监听的是 TCP 端口,跨语言完成调用时,客户端需要使用 socket 来传递 TCP 数据报。如果监听 http 请求,客户端只需发送 Get/Post 请求即可。

(1)服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
// 1.将函数注册到 RPC 中
_ = rpc.RegisterName("HelloService", &HelloService{}) // 如上文所述原因,需传递指针变量。

// 2.http 处理函数
http.HandleFunc("/jsonrpc", func(writer http.ResponseWriter, request *http.Request) {
var conn io.ReadWriteCloser = struct {
io.Writer
io.ReadCloser
}{
ReadCloser: request.Body,
Writer: writer,
}
rpc.ServeRequest(jsonrpc.NewServerCodec(conn)) // rpc.ServeRequest 只要拿到带有 writer/reader 的 conn 就可以。
})

// 3.监听 http 端口
http.ListenAndServe(":8899", nil)
}

(2)客户端

// 待补充

1.8 封装 Stub

需求:

  1. client 使用 client.Hello(request, &reply) 的格式优雅远程调用。
  2. client/server 使用同一个服务命令,手动写可能出现服务名称不一致/服务名称冲突的错误。
  3. server 可以专注业务逻辑。
  4. sever 无序关注服务名称、结构体类型。

解决:

  • 需求一,使用 clientStub 将 Dial 和 Call 过程进行封装。
  • 需求二,使用共同的 handler 文件,在 handler 中将服务名称定义成常量。
  • 需求三,将业务逻辑部分抽离出来,放到其他文件中(这里为了方便演示,放在了 handler 中,其实不能放在 handler 中,handler 是共有文件,应该新建一个文件。)。
  • 需求四,使用 serverStub 将 rpc.RegisterName 进行封装,并使用接口接收结构体指针,使 serverStub 与结构体类型解耦。

(1)handler

1
2
3
4
5
6
7
8
9
10
11
12
13
package handler

const HelloServiceName = "handler/CommonHelloService" // 这是服务的名字,随便起,与结构体 HelloService 命名无关,只要 client/server 在 Call 和 RegisterName 时保持一致就行。

// HelloService :想要将多个方法注册到 rpc 当中,可以使用 struct 来封装多个方法,然后注册 struct 即可。
type HelloService struct{}

// 实际的业务逻辑(真正被远程调用的函数)
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello " + request
return nil
}

(2)clientStub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package clientStub

import (
"learngo/RPCWithStub/handler"
"net/rpc"
)

type HelloServiceStub struct {
*rpc.Client // 最终用来 call 调用函数的是 *rpc.Client。这里封装成 HelloServiceStub,便于在 client 中,直接使用 HelloServiceStub.Hello 的形式调用。
}

// NewHelloServiceClient go 中没有类、对象,所以没有初始化方法。这里需要写一个 New 方法,获取到 *rpc.Client,实例化一个 HelloServiceStub。
func NewHelloServiceClient(protocol, address string) (HelloServiceStub, error) {
conn, err := rpc.Dial(protocol, address)
if err != nil {
panic(err)
}

return HelloServiceStub{conn}, nil
}

func (c HelloServiceStub) Hello(request string, reply *string) error { // 实现要调用的方法
err := c.Call(handler.HelloServiceName+".Hello", request, reply)
if err != nil {
panic(err)
}
return nil
}

(3)client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"

"learngo/RPCWithStub/clientStub"
)

func main() {

// 1.建立连接,获取到一个 *rpc.Client。
client, _ := clientStub.NewHelloServiceClient("tcp", "localhost:8899")

// 2.远程调用函数
var reply string
err := client.Hello("xiaomi", &reply) // 实现优雅调用。
if err != nil {
panic("call fail.")
}
fmt.Println(reply)
}

(4)serverStub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package serverStub

import (
"learngo/RPCWithStub/handler"
"net/rpc"
)

// 使用接口,将 RegisterHelloService 形参类型 与 handler 中的结构体类型解耦。
type HelloServer interface {
Hello(request string, reply *string) error
}

func RegisterHelloService(srv HelloServer) error { // 只要实现了 Hello 方法,都可以将结构体传给 HelloServer 类型的接口变量。
return rpc.RegisterName(handler.HelloServiceName, srv)
}

(5)server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"learngo/RPCWithStub/handler"
"learngo/RPCWithStub/serverStub"
"net"
"net/rpc"
)

func main() {
// 1.实例化 server
listener, _ := net.Listen("tcp", "localhost:8899")

// 2.将函数注册到 RPC 中
// 此处封装了注册过程中结构体的类型,结构体的类型不重要,重点是 Hello 这个函数,只要带有 Hello 方法的结构体即可。
_ = serverStub.RegisterHelloService(&handler.HelloService{})

// 3.启动服务
conn, _ := listener.Accept()

rpc.ServeConn(conn)
}

1.9 总结

为什么要花篇幅来分析内置 RPC 的使用?因为以上所解决的问题,就是 gRPC 或其他 RPC 框架所解决的问题。

更重要的是 serverStub/clientStub 可以通过 protobuf 自动生成,并且可以生成多语言版本。

2.protobuf

Reference Documents:https://developers.google.com/protocol-buffers/docs/proto3

2.1 是什么

Protocol Buffer,是 google 退出的一种轻量高效的结构化数据存储格式。和 json、xml 作用一样,但 protobuf 的性能远超 json、xml,数据压缩比比较高。

protobuf 经历了 protobuf2 和 protobuf3,目前主流版本是 protobuf3。

同类的格式还有:java 中的 dubbo/rmi/hessian,python 中的 messagepack,go 中的 gob。

2.2 protobuf 优缺点

优点:

  • 性能:压缩性好(压缩得越小,传输越快),序列化/反序列化快(比 json、xml 快 2-100 倍)。
  • 便捷性:使用简单(自动生成序列化/反序列化代码),维护成本低(只需要维护 proto 文件),向后兼容(增加内容,可以不破坏旧格式),加密性好(二进制流)。
  • 跨语言:跨平台,支持各种主流语言。

缺点:

  • 通用型差:任何语言都支持 json,但是 protobuf 需要专门的解析库。
  • 自解释性差:加密成了二进制流,只要通过 proto 文件才能了解数据结构。

2.3 环境配置

需要安装 protoc,下载 protoc-gen-go 依赖包。

1
2
3
brew install protobuf  #  这是 macOS 的安装命令,其他系统在上面的链接中自行查找。
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest # 生成 go 源代码用的。
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest #支持生成 grpc 部分代码

2.4 最小实践

protobuf 最原始的功能就是数据编码,使用主要分三步;

  1. 使用 proto 文件约束数据类型。(也就是结构体。)
  2. 自动生成源代码。
  3. 使用 github.com/golang/protobuf/proto 对数据进行序列化/反序列化。

注意:使用 proto 只能进行数据编码,加上 grpc 插件后,可以定义一些服务。

(1)使用 proto 文件约束数据类型

注意:1 是编号不是值,编码时只认编号不认变量名。(**坑:**联调时两个人编号不一致。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
syntax = "proto3";  // 使用 proto3 的语法
option go_package = ".;proto"; // 生成源码文件的包名,只影响 go 语言。具体使用见 2.6 go_package 的作用

service SayHello { // 定义服务(也就是方法接口) // 使用 grpc 插件才可以使用。
rpc Hello(HelloRequest) returns (HelloResponse);
}

message HelloRequest { // 定义消息类型(也就是结构体类型)
string name = 1; // 1 是编号不是值,编码时只认编号不认变量名。坑:联调时两个人编号不一致。
}

message HelloResponse {
string reply = 1;
}

(2)自动生成 go 源码

已经废弃的写法:

1
protoc -I . helloworld.proto --go_out=plugins=grpc:.
  • -I .:表示 include 当前目录。在当前目录寻找 helloworld.proto。
  • --go_out:表示生成 go 的源码,--java_out 即可生成 java 的源码。等号后面可以加选项,选项使用键值对表示。
  • plugins=grpc:一个键值对,表示使用 grpc 插件。
  • :.:使用 : 将前面的键值对和路径隔开。. 表示在当前目录下生成 bp.go 文件

推荐的写法:

1
protoc -I . helloworld.proto --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative
  • -I .:表示 include 当前目录。在当前目录寻找 helloworld.proto。
  • --go_out=.:表示生成用于 protobuf 编码的 go 源码,--java_out 即可生成 java 的源码。等号后面是生成 bp.go 的路径。
  • --go_opt=:等号后面可以加选项,选项使用键值对表示。
  • --go-grpc_out=.:表示生成 grpc 的源码。等号后面是生成 grpc 的 bp.go 的路径。
  • --go-grpc_opt=:等号后面可以加选项,选项使用键值对表示。只对 grpc 部分代码有效。

主要有两个变化:

  • 将键值对选项和生成路径分开: --go_out 只表示生成 bp.go 的路径,另用 --go_opt 表示键值对选项。
  • 将用于 protobuf 编码的 go 源码和 grpc 的源码分开生成:--go_out 表示前者,go-grpc_out 表示后者。

(4)使用 protobuf 编解码

使用 proto 序列化/反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
helloworld "learngo/grpc/proto"

"github.com/golang/protobuf/proto"
)

func main() {

// 实例化一个 struct
req := helloworld.HelloRequest{
Name: "Tom",
}

// 编码
bytes, _ := proto.Marshal(&req)
fmt.Println(bytes) // [10 3 84 111 109] // 编码后自解释性差

// 解码
newBytes := helloworld.HelloRequest{}
_ = proto.Unmarshal(bytes, &newBytes) // 传入一个空 struct 作为传出参数
fmt.Println(newBytes.Name) // Tom // 成功复原
}

2.5 基本类型和默认值

(1)基本类型

https://developers.google.com/protocol-buffers/docs/proto3#scalar

proto TypeNotesGo Type
doublefloat64
floatfloat32
int32Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead.使用可变长度编码。编码负数效率低下——如果您的字段可能具有负值,请使用 sint32。int32
int64Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead.使用可变长度编码。编码负数效率低下——如果您的字段可能具有负值,请使用 sint64。int64
uint32Uses variable-length encoding.使用可变长度编码。uint32
uint64Uses variable-length encoding.uint64
sint32Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s.使用可变长度编码。有符号的 int 值。这些比常规 int32 更有效地编码负数。int32
sint64Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s.使用可变长度编码。有符号的 int 值。这些比常规 int64 更有效地编码负数。int64
fixed32Always four bytes. More efficient than uint32 if values are often greater than 228.总是四个字节。如果值通常大于228,则比uint32更有效率。uint32
fixed64Always eight bytes. More efficient than uint64 if values are often greater than 256.总是八字节。如果值通常大于256,则比uint64更有效率。uint64
sfixed32Always four bytes.int32
sfixed64Always eight bytes.int64
boolbool
stringA string must always contain UTF-8 encoded or 7-bit ASCII text, and cannot be longer than 232.字符串必须始终包含UTF-8编码或7位ASCII文本,并且不能超过232。string
bytesMay contain any arbitrary sequence of bytes no longer than 232.可以包含任何不超过232的任意字节序列。[]byte

(2)默认值

https://developers.google.com/protocol-buffers/docs/proto3#default

当解析消息时,如果编码的消息不包含特定的 singular 元素,则解析对象中的相应字段将设置为该字段的默认值。

.proto Typedefault
string“”
byte‘’
boolfalse
数值类型0
enum默认时第一个定义的枚举值,必须为 0
message默认值根据使用的语言确定的

2.6 指定字段规则

(1)option

1
option go_package = "../common/stream/proto/v1;helloworld";
  • ../common/stream/proto/v1:指定生成 pb.go 文件所在的路径。
  • helloworld:生成 pb.go 文件的包名。
  • 注:go_package 只会对 go 进行约束,不会影响其他语言。

在开发过程中,会将 proto 文件和 pb.go 文件分开存放。文件结构如下,其中生成的 bp.go 文件可能是公共的,并且分目录和版本。

1
2
3
4
5
6
7
8
9
10
11
12
.
├── common
│   └── stream
│   └── proto
│   └── v1
│   └── stream.pb.go
├── proto
│   └── stream.proto
├── client
│   └── client.go
└── server
└── server.go

(2)singular

单数,表示字段出现 0 次或 1 次。proto3 每个字段的修饰符默认是 singular。

(3)repeated

复数,表示该字段可重复,对应的 go 中的切片。

1
2
3
message hello {
repeated int32 num = 1; // 在 go 中为 []int
}

2.7 嵌套 proto 文件

1)为什么需要嵌套 proto 文件?

(2)实际的例子:empty message 的应用

**需求:**Ping 是一个测试是否连通的服务,发送请求时无需传入参数。但 service 定义时必须传入 message,此时就可以使用 Empty message。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// hello.proto

syntax = "proto3";
option go_package=".;proto";

service Greeter {
// Ping 是一个用于测试是否连通的服务,发送请求时无需传入参数。
rpc Ping(Empty) returns (Pong); // rpc 服务必须传入一个 message,但函数本意并无需传入参数,所以只能写一个 Empty message。
}

message Pong {
int64 id = 1;
}

// Empty 是一个空的 message,很多 proto 文件中都会需要。
message Empty{

}

这是一个很常见的需求,无需每个 proto 文件中都定义一个 Empty message,会冲突。

**解决:**定义一个公共的 proto 文件,base.proto。在 hello.proto 中 improt base.proto。

1
2
3
4
5
6
7
8
9
10
11
12
13
// base.proto

syntax = "proto3";
option go_package=".;proto";

message Pong {
int64 id = 1;
}

// Empty 是一个空的 message,很多 proto 文件中都会需要。
message Empty{

}
1
2
3
4
5
6
7
8
9
10
// hello.proto

syntax = "proto3";
import "base.proto"; // 【import 同目录下的 proto 文件】
option go_package=".;proto";

service Greeter {
// Ping 是一个用于测试是否连通的服务,发送请求时无需传入参数。
rpc Ping(Empty) returns (Pong); // rpc 服务必须传入一个 message,但函数本意并无需传入参数,所以只能写一个 Empty message。
}

(3)使用 protobuf 内置的 message

注意 proto 文件中的导入方式。improt 时用路径表示,使用时用点表示

1
2
3
4
5
6
7
8
9
10
11
12
13
// hello.proto

syntax = "proto3";
import "google/protobuf/empty.proto"; // 【improt 内置的 message】
option go_package=".;proto";

service Greeter {
rpc Ping(google.protobuf.Empty) returns (Pong);
}

message Pong {
int64 id = 1;
}

go 文件使用 Empty message 类型时,需导入 empty.proto 文件中 go_package 路径。

1
2
3
4
5
6
7
8
9
// client.go

package main

import "github.com/golang/protobuf/ptypes/empty" // empty.proto 文件中 go_package 路径。

func main() {
empty.Empty{}
}

(4)improt 路径问题

improt 同目录下的 proto 文件可直接导入。但是导入不同目录下的 proto 文件,需要注意 import 不能使用相对路径。

解决:可以写 proto 文件名,并在 protoc 时使用 -I 参数在指定目录下寻找该 proto 文件。

比如在 3.5 验证器 小节,hello.proto 文件中需要 import validate.proto 文件。

1
protoc -I . -I $GOPATH/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/  hello.proto --go_out=. 

-I . -I $GOPATH/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/:表示在当前目录和 $GOPATH/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/ 这两个目录下,寻找 hello.proto 和 import 的文件。

2.8 嵌套 message 对象

(1)第一种方式

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = ".;proto";

message Result {
string name = 1;
string url = 2;
}

message Reply {
string message = 1;
repeated Result data = 2; // Result 数组
}

(2)第二种方式

内部嵌套,嵌套的 message 只用一次,防止 proto 文件中定义过多 message。

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = ".;proto";

message Reply {
string message = 1;
repeated Result data = 2;

message Result {
string name = 1; // Result 的序号和 Reply 的序号有很好的隔离性,互不干扰。
string url = 2;
}
}

使用上有些特殊,pb.go 文件里会定义为 Reply_Result,所以 go 中实例化时需要使用 Reply_Result。

1
2
3
4
5
6
7
8
type Reply_Result struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"`
}

2.9 enum 类型

(1)proto 文件定义

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = ".;enum";

message Reply {
string message = 1;
Gender gender = 2;
}

enum Gender { // 定义枚举类型
MALE = 0;
FEMALE = 1;
}

(2)go 文件中使用枚举类型

1
2
3
4
5
6
7
8
9
10
11
package main

import "learngo/repeated/enum" // 导入 bp.go 所在目录

func main() {
r := enum.Reply{
Message: "xxx",
Gender: enum.Gender_MALE, // 使用 枚举类型 赋值
}
fmt.Println(r)
}

2.10 map 类型

map 虽然方便,但是不要大量写 map,在传入参数和接收参数时,不知道具体字段是什么。

(1)proto 文件定义

1
2
3
4
5
6
syntax = "proto3";
option go_package = ".;enum";

message HelloReply {
map<string, string> m = 1; // 定义类型,传错类型会报错。
}

(2)go 文件中使用 map 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
mapproto "learngo/repeated/map_proto"
)

func main() {
reply := mapproto.HelloReply{
M: map[string]string{
"name": "Tom",
"company": "xiaomi",
},
}
fmt.Println(reply.M)
}

2.11 timestamp 类型

时间戳类型,protobuf 扩展的类型。

(1)proto 文件定义

timestamp 定义在 google/protobuf/timestamp.proto 文件中。

1
2
3
4
5
6
7
8
9
// hello.proto
syntax = "proto3";
option go_package = ".;hello";

import "google/protobuf/timestamp.proto";

message Reply {
google.protobuf.Timestamp requestTime= 1;
}

(2)go 文件中使用 timestamp 类型

自动生成的 bp.go 中是这样定义的,RequestTime 是 timestamppb.Timestamp 指针类型。

timestamppb.Timestamp 导入的是 timestamppb "google.golang.org/protobuf/types/known/timestamppb"

1
2
3
4
5
6
7
8
// hello.pb.go
type Reply struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

RequestTime *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=requestTime,proto3" json:"requestTime,omitempty"`
}

所以在 go 中,需要导入 timestamppb "google.golang.org/protobuf/types/known/timestamppb"

具体使用如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// client.go
package main

import (
"fmt"
"time"

hello "learngo/timestamp/proto"

timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
reply := hello.Reply{
RequestTime: timestamppb.New(time.Now()), // 【获取当前的时间戳】
}
fmt.Println(reply.RequestTime)
}

3.gRPC

3.1 最小实践

目的:server 实现一个 SayHello 的函数,client 进行远程调用。

实现:

  1. 使用 proto 文件,约定传输的消息类型、函数接口。
  2. 使用 protoc 生成 stub 文件 helloworld.pb.go 和 helloworld_gpc.pb.go,生成的主要内容是:
    • 消息类型的编解码;
    • client stub:实例化 client 的函数、client interface(用于封装远程调用函数);
    • server stub:server interface(用于封装远程调用函数)、注册 server 的函数。
  3. server 端:完成具体的业务逻辑来实现接口 + 实例化 grpc server、注册、监听。
  4. client 端:拨号、实例化 client、远程调用。

(1)helloworld.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
syntax = "proto3";
option go_package = ".;helloworld";

service Greeter { // 定义服务(也就是 interface)
rpc SayHello(HelloRequest) returns (HelloResponse);
}

message HelloRequest { // 定义消息类型(也就是结构体类型)
string name = 1; // 1 是编号不是值
}

message HelloResponse {
string reply = 1;
}

(2)server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"context" // 自带的
"net"

"google.golang.org/grpc" // 三方的

helloworld "learngo/grpc/proto" // 自己的
)

type Server struct{}

// SayHello 是 server 的接口,虽然在 proto 文件中没有定义 Context、error,但需要强制加上。
func (s *Server) SayHello(ctx context.Context, request *helloworld.HelloRequest) (*helloworld.HelloResponse, error) { // 具体业务逻辑
return &helloworld.HelloResponse{
Reply: "hello " + request.Name,
}, nil
}

func main() {

// 1.实例化一个 grpc 的 server
g := grpc.NewServer()

// 2.注册
// RegisterGreeterServer 是自动生成的函数。利用鸭子类型类传递实例化的结构体。
helloworld.RegisterGreeterServer(g, &Server{})

//3.启动监听
listener, err := net.Listen("tcp", "0.0.0.0:8080")
if err != nil {
panic("failed to listen:" + err.Error())
}
err = g.Serve(listener)
if err != nil {
panic("failed to start: " + err.Error())
}
}

(3)client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"context"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

helloworld "learngo/grpc/proto"
)

func main() {
// 1.拨号
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic("failed to dial: " + err.Error())
}
defer func(conn *grpc.ClientConn) { // 连接用完需关闭,使用闭包的方式。
err := conn.Close()
if err != nil {
panic("failed to close: " + err.Error())
}
}(conn)

// 2.实例化 client
// NewGreeterClient 是自动生成的代码。
client := helloworld.NewGreeterClient(conn)

// 3.远程调用
response, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "Tom"})
if err != nil {
panic("failed to call: " + err.Error())
}
fmt.Println(response.Reply)
}

3.2 流模式

3.2.1 四种模式

(1)简单模式(Simple RPC)

客户端发起一次请求,服务端响应一次数据。

(2)服务端数据流模式(Server-side streaming RPC)

客户端发起一次请求,服务端返回一段连续的数据流。例如:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据持续地返回给客户端。

(3)客户端数据流模式(Client-side streaming RPC)

客户端持续向服务端发送数据流,发送结束后,服务端返回一个响应。例如:传感器向服务端上报数据。

(4)双向数据流模式(Bidirectional streaming RPC)

客户端与服务端都可以向对方发送数据流,可以同时互相发送,实现实时交互。例如聊天机器人。

3.2.2 流模式最小实践

(1)stream.proto

在 service 中,以流模式发送的数据使用 stream 关键字定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
syntax = "proto3";

option go_package=".;proto";

service Greeter {
rpc GetStream(StreamRequestData) returns (stream StreamResponseData); // 服务端流模式,返回 response 是 stream
rpc PostStream(stream StreamRequestData) returns (StreamResponseData); // 客户端流模式
rpc AllStream(stream StreamRequestData) returns ( stream StreamResponseData); // 双向流模式
}

message StreamRequestData {
string data = 1;
}

message StreamResponseData {
string data = 1;
}

(2)server.go

流模式和普通 PRC 相比,server 有三点差异:

  • 参数:使用 stream 标记的参数,在函数实现时,需使用 protoc 自动生成的 streamServer 代替。(不
  • 发送数据流:使用 streamServer 类型的参数,调用 send 函数。
  • 接收数据流:使用 streamServer 类型的参数,调用 send 函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main

import (
"fmt"
"net"
"sync"
"time"

"google.golang.org/grpc"

"learngo/streamingGrpc/proto"
)

const PORT = ":8088"

type server struct{}

// GetStream 实现服务端流模式
func (s *server) GetStream(req *proto.StreamRequestData, res proto.Greeter_GetStreamServer) error { // 使用 流模式 传输的数据,使用 proto 中自动生成的 streamServer 代替函数中的参数。
fmt.Println("\nServer-side streaming RPC")
fmt.Println(req.Data)
i := 0
for {
i++
_ = res.Send(&proto.StreamResponseData{ // Send 函数发送数据
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
time.Sleep(time.Second)
if i > 3 {
break
}
}
return nil
}

// PostStream 实现客户端流模式
func (s *server) PostStream(clientStream proto.Greeter_PostStreamServer) error {
fmt.Println("\nClient-side streaming RPC")
for {
if recv, err := clientStream.Recv(); err != nil { // Recv 函数接收数据
fmt.Println(err)
break
} else {
fmt.Println(recv.Data)
}
}
return nil
}

// AllStream 实现双向流模式
func (s *server) AllStream(allStream proto.Greeter_AllStreamServer) error {
fmt.Println("\nBidirectional streaming RPC")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
msg, err := allStream.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println("msg from client: " + msg.Data)
}
}()
go func() {
defer wg.Done()
i := 0
for {
i++
_ = allStream.Send(&proto.StreamResponseData{
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
if i > 3 {
break
}
time.Sleep(time.Second)
}
}()

wg.Wait()
return nil
}

func main() {

// 1.实例化 grpc server
s := grpc.NewServer()

// 2.注册 server
proto.RegisterGreeterServer(s, &server{})

// 3.监听端口,启动 server
listen, err := net.Listen("tcp", PORT)
if err != nil {
panic(err)
}
_ = s.Serve(listen)

}

(3)client.go

流模式和普通 PRC 相比,client 有三点差异:

  • 参数:使用 stream 标记的参数,函数调用时,无需提供。调用函数后获得返回值为对应的 streamClient。
  • 发送数据流:使用 streamClient 类型的函数返回值,调用 send 函数。
  • 接收数据流:使用 streamClient 类型的函数返回值,调用 send 函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
"context"
"fmt"
"sync"
"time"

"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc"

"learngo/streamingGrpc/proto"
)

func main() {

// 1.拨号连接
conn, err := grpc.Dial(":8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
panic(err)
}
}(conn)

// 2.实例化一个 client。(client 结构体中变量为 conn,使用 grpc.ClientConnInterface 接收参数 conn)
client := proto.NewGreeterClient(conn)

// 3.通过 client 来分别调用不同的流模式 RPC。
// 服务端流模式
fmt.Println("\nServer-side streaming RPC")
// 返回值为 Greeter_GetStreamClient,可以调用 Recv 函数。
res, _ := client.GetStream(context.Background(), &proto.StreamRequestData{Data: "tell me time"})
for {
data, err := res.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(data)
}

// 客户端流模式
fmt.Println("\nClient-side streaming RPC")
postStream, err := client.PostStream(context.Background())
i := 0
for {
i++
_ = postStream.Send(&proto.StreamRequestData{
Data: fmt.Sprintf("client send count: %d", i),
})
time.Sleep(time.Second)
if i > 3 {
break
}
}

// 双向流模式
// 为了保证双向流模式,需使用2个协程分别发送和接收。
fmt.Println("\nBidirectional streaming RPC")
allStream, err := client.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
msg, _ := allStream.Recv()
fmt.Println("msg form server: " + msg.Data)
}
}()

go func() {
defer wg.Done()
i := 0
for {
i++
_ = allStream.Send(&proto.StreamRequestData{
Data: fmt.Sprintf("%d", i),
})
if i > 3 {
break
}
time.Sleep(time.Second)
}
}()
wg.Wait()
}

3.3 metadata 机制

(1)metadata 是什么?

在 HTTP 数据传输中,除了 get/post 的参数,还有些参数会通过 metadata 来传输。

(2)新建 metadata

注意:虽然写的的是 map[string]string,但实际上是 map[string][]string,每一个 key 对应的 value 都是一个 map[string]string

1
2
3
4
5
6
7
8
9
10
11
12
// 创建 metadata 数据的第一种方式
md := metadata.New(map[string]string{
"username": "admin",
"Username": "admin2",
"password": "Qwer123",
})
// 创建 metadata 数据的第二种方式
md = metadata.Pairs(
"Username", "admin",
"Username", "admin2", // "username" will have map value []string{"admin", "admin2"}
"Password", "Qwer123",
)

(3)发送 metadata

1
ctx := metadata.NewOutgoingContext(context.Background(), md) // 创建一个传出 metadata 数据的 context。

(4)接收 metadata

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 从 context 中获取 metadata 数据
md, ok := metadata.FromIncomingContext(ctx)
if ok {
fmt.Println("get metadata success.")
for key, val := range md {
fmt.Println(key, val)
}
} else {
fmt.Println("get metadata error.")
}

/* 打印 metadata 的内容:
user-agent [grpc-go/1.47.0]
username [admin admin2] // 注意:每一个 key 对应的 value 都是一个 map[string]string。
password [Qwer123]
:authority [127.0.0.1:8080]
content-type [application/grpc]
*/

(5)完整例子

metadata.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
syntax = "proto3";

option go_package = ".;proto";

service Greeter {
rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
string name = 1;
}

message HelloResponse {
string message = 1;
}

server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"context"
"fmt"
"net"

"google.golang.org/grpc/metadata"
"google.golang.org/grpc"

"learngo/metadata/proto"
)

type server struct{}

func (s server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error) {

// 从 context 中获取 metadata 数据
md, ok := metadata.FromIncomingContext(ctx)
if ok {
fmt.Println("get metadata success.")
for key, val := range md {
fmt.Println(key, val)
}
} else {
fmt.Println("get metadata error.")
}

response := &proto.HelloResponse{
Message: "hello, " + request.Name,
}
return response, nil
}

func main() {
// 1.实例化 grpc server
s := grpc.NewServer()

// 2.注册服务
proto.RegisterGreeterServer(s, server{})

// 3.监听并启动 grpc server
listener, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
err = s.Serve(listener)
if err != nil {
panic(err)
}
}

client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"context"
"fmt"

"google.golang.org/grpc/metadata"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"learngo/metadata/proto"
)

func main() {

// 创建 metadata 数据的第一种方式
md := metadata.New(map[string]string{
"username": "admin",
"Username": "admin2",
"password": "Qwer123",
})
// 创建 metadata 数据的第二种方式
md = metadata.Pairs(
"Username", "admin",
"Username", "admin2", // "username" will have map value []string{"admin", "admin2"}
"Password", "Qwer123",
)
ctx := metadata.NewOutgoingContext(context.Background(), md) // 创建一个传出 metadata 数据的 context。

// 1.拨号
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}

defer func(conn *grpc.ClientConn) {
err = conn.Close()
if err != nil {
panic(err)
}
}(conn)

// 2.实例化 client
client := proto.NewGreeterClient(conn)

// 3.远程调用
request := proto.HelloRequest{
Name: "Golang",
}
response, err := client.SayHello(ctx, &request)
if err != nil {
panic(err)
}
fmt.Println(response.Message)
}

3.4 Interceptor 拦截器

(1)拦截器分类

grpc 的拦截器可以实现在 server 端,也可以实现在 client 端。

  • server 端拦截器:在接收到请求后,在真正处理请求的前后,在拦截器中进行一些处理。

  • client 端拦截器:在发送请求的前后,在拦截器中进行一些处理。

第三方库 go-grpc-middleware 实现了很多拦截器的功能:认证(auth)、 日志( logging)、监控(monitoring)等,可以直接使用,值得借鉴学习。

由于 unary 方法和 stream 方法参数不同,拦截器又分为一元拦截器(unary interceptor)和流拦截器(stream interceptor)。所以共有 4 种类型变量,每个变量都是一个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
定义在 google.golang.org/grpc@v1.51.0/interceptor.go
*/

// grpc.UnaryServerInterceptor
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

// grpc.StreamServerInterceptor
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

// grpc.UnaryClientInterceptor
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

// grpc.StreamClientInterceptor
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

(2)拦截器流程

主要流程是:

  1. 定义拦截器
    • 预处理 pre-processing
    • 调用RPC方法 invoking RPC method
    • 后处理 post-processing
  2. 合适的时机指定拦截器
    • server:实例化 grpc server 时,作为 ServerOption 传入。
    • client:拨号时,作为 DialOption 传入。

(3) server 端的 unary 拦截器

如何找到该实现的函数:

  1. grpc.UnaryInterceptor 定义源码,需传入参数 i UnaryServerInterceptor
  2. 查看参数 UnaryServerInterceptor 定义源码,复制具体的函数,进行实现。

需求:在处理请求前后,打印两句 log。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package main

import (
"context"
"fmt"
"net"

"google.golang.org/grpc"

"learngo/interceptor/proto"
)

type server struct{}

func (s *server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error) {
return &proto.HelloResponse{
Message: "hello," + request.Name,
}, nil
}

func main() {
interceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // 定义 UnaryServerInterceptor 类型拦截器
fmt.Println("Request is coming.") // 在处理请求前进行处理,(预处理 pre-processing)
resp, err = handler(ctx, req) // 处理真正的请求,(调用RPC方法 invoking RPC method)
fmt.Println("The request has been processed.") // 处理请求后进行处理,(后处理 post-processing)
return resp, err // 返回请求响应结果
}
opt := grpc.UnaryInterceptor(interceptor) // 将 UnaryServerInterceptor 作为参数,new 一个 ServerOption 变量。
s := grpc.NewServer(opt) // 实例化 grpc server 时,可以传入多个 ServerOption。(可变参数列表),这个指定拦截器。
proto.RegisterGreeterServer(s, &server{})
listener, err := net.Listen("tcp", "0.0.0.0:8899")
if err != nil {
panic(err)
}
err = s.Serve(listener)
if err != nil {
panic(err)
}
}

除了上述写法,也可以将 UnaryServerInterceptor 拦截器方法单独拿出来,如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
 func main() {
opt := grpc.UnaryInterceptor(interceptor) // 将 UnaryServerInterceptor 作为参数,new 一个 ServerOption 变量。
s := grpc.NewServer(opt) // 实例化 grpc server 时,可以传入多个 ServerOption。(可变参数列表),这个指定拦截器。
proto.RegisterGreeterServer(s, &server{})
listener, err := net.Listen("tcp", "0.0.0.0:8899")
if err != nil {
panic(err)
}
err = s.Serve(listener)
if err != nil {
panic(err)
}
}

func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // 定义 UnaryServerInterceptor 类型拦截器
fmt.Println("Request is coming.") // 在处理请求前进行处理,(预处理 pre-processing)
resp, err = handler(ctx, req) // 处理真正的请求,(调用RPC方法 invoking RPC method)
fmt.Println("The request has been processed.") // 处理请求后进行处理,(后处理 post-processing)
return resp, err // 返回请求响应结果
}

(4)client 端的 unary 拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"context"
"fmt"
"time"

"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc"

"learngo/interceptor/proto"
)

func main() {
interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // 定义 UnaryClientInterceptor 类型变量
start := time.Now() // 发生请求之前记录开始时间,(预处理 pre-processing)
err := invoker(ctx, method, req, reply, cc, opts...) // 发生请求,(调用RPC方法 invoking RPC method)
duration := time.Since(start) // 记录请求使用时间,(后处理 post-processing)
fmt.Println("duration: ", duration)
return err // 返回 server 响应的数据
}

opts := []grpc.DialOption{ // 建立一个 DialOption 列表,拨号时传入
grpc.WithTransportCredentials(insecure.NewCredentials()), // 使用不安全的凭证
grpc.WithUnaryInterceptor(interceptor), // 指定 UnaryClientInterceptor 拦截器
}

//1.拨号
conn, err := grpc.Dial("127.0.0.1:8899", opts...) // 可变参数列表传入多个 DialOption
if err != nil {
panic(err)
}
defer func(conn *grpc.ClientConn) {
err = conn.Close()
if err != nil {
panic(err)
}
}(conn)

//2.实例化client
client := proto.NewGreeterClient(conn)

//3.远程调用
request := proto.HelloRequest{
Name: "Golang",
}
response, err := client.SayHello(context.Background(), &request)
if err != nil {
panic(err)
}
fmt.Println(response.Message)
}

(5)使用 metadata 和 interceptor 实现 auth 认证

  • 在 client interceptor 中,使用 metadata 传递 appid/appkey;
  • 在 server interceptor 中验证 appid/appkey 是否存在/合法;
  • 如果不存在/不合法,使用 grpc 自带的 status.Error 返回状态码和错误信息。

server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // 定义 UnaryServerInterceptor 类型拦截器
fmt.Println("Request is coming.") // 在处理请求前进行处理
md, ok := metadata.FromIncomingContext(ctx) // 获取 metadata 数据,并验证
if ok {
appid := md["appid"][0]
appkey := md["appkey"][0]
if appid != "123" || appkey != "456" {
err = status.Error(codes.InvalidArgument, "invalid token.")
return resp, err
}
} else {
status.Error(codes.Unauthenticated, "Token no exist.")
return resp, err
}
resp, err = handler(ctx, req) // 处理真正的请求
fmt.Println("The request has been processed.") // 处理请求后进行处理
return resp, err // 返回请求响应结果
}

client.go

1
2
3
4
5
6
7
8
9
10
11
interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // 定义 UnaryClientInterceptor 类型变量
md := metadata.New(map[string]string{ // 创建 metadata 数据
"appid": "123",
"appkey": "456",
})
ctx = metadata.NewOutgoingContext(ctx, md) // 传输 metadata 数据
start := time.Now() // 发生请求之前记录开始时间
err := invoker(ctx, method, req, reply, cc, opts...) // 发生请求
duration := time.Since(start) // 记录请求使用时间
fmt.Println("duration: ", duration)
}

(6)grpc 内置的自定义认证接口

WithPerRPCCredentials

1
2
3
4
5
6
7
// WithPerRPCCredentials returns a DialOption which sets credentials and places
// auth state on each outbound RPC.
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
})
}

client 无需在拦截器中设置 appid/appkey, WithPerRPCCredentials 可以将传入的 credentials.PerRPCCredentials 转化为 DialOption。查看 credentials.PerRPCCredentials 源码可以看到,PerRPCCredentials 是一个接口,实现其两个方法,就可以实现该接口。

  • GetRequestMetadata:获取当前请求认证所需的元数据(metadata)
  • RequireTransportSecurity:是否需要基于 TLS 认证进行安全传输

client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"context"
"fmt"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"learngo/auth/proto"
)

type Credential struct{}

func (c Credential) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { // 获取当前请求认证所需的 metadata
return map[string]string{
"appid": "123",
"appkey": "456",
}, nil
}

func (c Credential) RequireTransportSecurity() bool { // 是否需要基于 TLS 认证进行安全传输
return false
}

func main() {
interceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { // 定义 UnaryClientInterceptor 类型变量
start := time.Now() // 发生请求之前记录开始时间
err := invoker(ctx, method, req, reply, cc, opts...) // 发生请求
duration := time.Since(start) // 记录请求使用时间
fmt.Println("duration: ", duration)
return err // 返回 server 响应的数据
}

opts := []grpc.DialOption{ // 建立一个 DialOption 列表,拨号时传入
grpc.WithTransportCredentials(insecure.NewCredentials()), // 使用不安全的凭证
grpc.WithUnaryInterceptor(interceptor), // 指定 UnaryClientInterceptor 拦截器
grpc.WithPerRPCCredentials(Credential{}), // 传入凭证
}

//1.拨号
conn, err := grpc.Dial("127.0.0.1:8899", opts...) // 可变参数列表传入多个 DialOption
if err != nil {
panic(err)
}
defer func(conn *grpc.ClientConn) {
err = conn.Close()
if err != nil {
panic(err)
}
}(conn)

//2.实例化client
client := proto.NewGreeterClient(conn)

//3.远程调用
request := proto.HelloRequest{
Name: "Golang",
}
response, err := client.SayHello(context.Background(), &request)
if err != nil {
panic(err)
}
fmt.Println(response.Message)
}

3.5 验证器

(1)一个开源项目

protoc-gen-validate,该项目目前处于 alpha 阶段,功能是对于 protobuf 中的 message,进行数据验证。

下面简单跑通一下。

(2)原理:

  1. 定义规则:proto 中定义了 HelloRequest 这种 message 类型,并在其中标记数据规则。
  2. 生成验证规则的代码:protoc-gen-validate 会为 HelloRequest 类型,生成多个结构体方法,其中有一个 Validate 方法。在 Validata 方法,根据设置的数据规则,验证数据。
  3. 使用:在实际使用中,HelloRequest.Validate 就可以验证数据。

(3)安装 protoc-gen-validate 二进制文件

注意看 README 文件,可能有更新。

1
2
3
4
5
6
7
8
# fetches this repo into $GOPATH
go get -d github.com/envoyproxy/protoc-gen-validate # -d 表示只下载源码不安装
git clone https://github.com/bufbuild/protoc-gen-validate.git
# installs PGV into $GOPATH/bin
cd protoc-gen-validate && make build
# build bin 文件后,删除 protoc-gen-validate 仓库就可以。

# go get 下载的 protoc-gen-validate 源码在 $GOPATH/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/

在 README 文件里有这样一段话,用以说明为什么使用上面的方式下载源码和构建二进制文件。

Yes, our go module path is github.com/envoyproxy/protoc-gen-validate not bufbuild this is intentional.

Changing the module path is effectively creating a new, independent module. We would prefer not to break our users. The Go team are working on better cmd/go support for modules that change paths, but progress is slow. Until then, we will continue to use the bufbuild module path.

补充说明:实际使用过程中,需要部分源码,也需要二进制文件 protoc-gen-validate。源码使用 go get -d 下载在 $GOPATH/pkg/mod/github.com/envoyproxy/ 目录下,二进制文件在新的文件夹里编译后放在 $GOPATH/bin 目录下。

(4)导入 proto 文件并生成 bp.go 文件

hello.proto

在 message 中设置验证条件。Constraint Rules

import "validate.proto"; 可能会显示找不到文件。没关系,后面使用 protoc 生成代码时,指定该文件的目录。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
syntax = "proto3";
option go_package = ".;proto";
import "validate.proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
uint64 id = 1 [(validate.rules).uint64.gt = 999]; // id > 999
string email = 2 [(validate.rules).string.email = true]; // email 符合邮件规则
}

message HelloResponse {
string message = 1;
}

生成代码命令

注意使用 -I 指定在哪个目录中找 validate.proto 。使用 go get -d 下载的代码,会在 $GOPATH/pkg/mod/ 目录下。

1
2
3
4
5
6
7
protoc -I . \   # 当前目录中寻找 hello.proto
-I $GOPATH/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/ \ # 这个目录中寻找 import 的 proto 文件
--go_out=. \ # pb.go 的生成目录
--go-grpc_out=. \ # grpc.pb.go 的生成目录
--validate_out="lang=go:." \ # pb.validate.go 的生成目录
--go-grpc_opt=require_unimplemented_servers=false \ # 该方法是为了向前兼容,这里关闭该方法的生成
hello.proto

server.go

在拦截器中验证数据,注意这里的类型转化。

  • 传入的时 interface{} 类型,不能直接调用 Validate 函数,需要转化为具体的类型(这个类型实现了 Validate 方法)。
  • 为了扩展更多的类型,拦截器中又不能转换为 HelloRequest(虽然也能跑,但是这个拦截器就只能验证 HelloRequest 这一种数据了),而应该转换为一个接口类型。
  • 只要这个接口内包含 Validate 方法就可以,所以我们声明一个 Validator 类型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package main

import (
"context"
"net"
"strconv"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"learngo/validate/proto"
)

type server struct{}

func (s *server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error) {
return &proto.HelloResponse{
Message: "NO." + strconv.FormatUint(request.Id, 10) + " email is " + request.Email,
}, nil
}

func main() {
// 1.实例化带有 interceptor 的 grpc server
option := grpc.UnaryInterceptor(interceptor)
s := grpc.NewServer(option)

// 2.注册
proto.RegisterGreeterServer(s, &server{})

// 3.启动并监听
listener, err := net.Listen("tcp", "0.0.0.0:8899")
if err != nil {
panic(err)
}
err = s.Serve(listener)
if err != nil {
panic(err)
}
}

type Validator interface { // 写一个接口,任何实现 Validate 方法的变量,都可以转成 Validator 类型,再调用 Validate 方法。
Validate() error
}

// 在拦截器中,验证传入的数据是否合法
func interceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if validator, ok := req.(Validator); ok { // interface{} 类型转换为 HelloRequest 类型。但是这里可能需要判断多种数据类型,不能直接写死成 req.(HelloRequest),需要使用转成 Validator 接口类型。
if err := validator.Validate(); err != nil { // 因为在 hello.pb.validate.go 中 HelloRequest 实现了 Validate 方法。也就直接实现了 Validator 接口,可以通过 Validator 接口调用 Validate 方法。
return nil, status.Error(codes.InvalidArgument, err.Error()) // 返回的 error 为 统一的错误码 + 具体的错误信息。
}
}
if err != nil {
return nil, err
}
return handler(ctx, req)
}

client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"context"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"learngo/validate/proto"
)

func main() {
// 1.拨号
conn, err := grpc.Dial("127.0.0.1:8899", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer func(clientConn *grpc.ClientConn) {
err := clientConn.Close()
if err != nil {
panic(err)
}
}(conn)

// 2.实例化 client
client := proto.NewGreeterClient(conn)

// 3.远程调用
response, err := client.SayHello(context.Background(), &proto.HelloRequest{
Id: 1000,
Email: "1234@gmail.com",
Mobile: "130000000000",
})
if err != nil {
panic(err)
}

fmt.Println(response.Message)
}

3.6 状态码

grpc 官方定义了状态码:https://github.com/grpc/grpc/blob/master/doc/statuscodes.md

3.7 错误处理

go 中的错误处理主要是 status 包内。Status 类型中封装了 Error 类型。status.Error() 内部就是 new 一个 status 类型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func main() {

// 生成 error
err := status.Error(codes.NotFound, "未找到记录")
panic(err) // panic: rpc error: code = NotFound desc = 未找到记录

err = status.New(codes.NotFound, "未找到记录").Err()
panic(err) // panic: rpc error: code = NotFound desc = 未找到记录

err = status.Errorf(codes.NotFound, "未找到%s记录", "Tom")
panic(err) // panic: rpc error: code = NotFound desc = 未找到Tom记录

err = status.Newf(codes.NotFound, "未找到%s记录", "Tom").Err()
panic(err) // panic: rpc error: code = NotFound desc = 未找到Tom记录

// 解析 error
if fromError, ok := status.FromError(err); ok {
fmt.Println(fromError.Code()) // 打印错误码 // NotFound
fmt.Println(fromError.Message()) // 打印错误信息 //未找到Tom记录
}
}

3.8 超时机制

使用 context.WithTimeout(context.Background(), time.Second*3) 生成带有 deadline 功能的 contex,在远程调用时使用。

server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"context"
"net"
"time"

"google.golang.org/grpc"

"learngo/deadline/proto"
)

type server struct{}

func (*server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error) {
return &proto.HelloResponse{
Reply: "hello, " + request.Name,
}, nil
}

func main() {
// 1. 实例化 grpc server
opt := grpc.UnaryInterceptor(func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
time.Sleep(time.Second * 5) // 等待 5s,模拟 server 超时
return handler(ctx, req)
})
s := grpc.NewServer(opt)

// 2. 注册
proto.RegisterGreeterServer(s, &server{})

// 3. 监听启动
listen, err := net.Listen("tcp", ":8899")
if err != nil {
panic(err)
}
err = s.Serve(listen)
if err != nil {
panic(err)
}
}

client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"context"
"fmt"
"time"

"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc"

"learngo/deadline/proto"
)

func main() {
// 1.拨号
conn, err := grpc.Dial("127.0.0.1:8899", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer func(clientConn *grpc.ClientConn) {
err := clientConn.Close()
if err != nil {
panic(err)
}
}(conn)

// 2.实例化客户端
client := proto.NewGreeterClient(conn)

// 3.远程调用
// 超时机制
//ctx, cancelFunc := context.WithTimeout(context.Background(), time.Second*3)
// 直接调用 cancelFunc 可以主动取消 goroutine。下面的代码用不到,使用 _ 代替。
ctx, _ := context.WithTimeout(context.Background(), time.Second*3)
response, err := client.SayHello(ctx, &proto.HelloRequest{Name: "Golang"})

if err != nil {
panic(err)
}
fmt.Println(response.Reply)
}

3.8 server/client stub 解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
syntax = "proto3";
option go_package=".;proto";

service Greeter {
rpc SayHello(HelloRequest) returns (HelloResponse);
}

message HelloRequest {
string name = 1;
}

message HelloResponse {
string reply = 1;
}

以上面的 hello.proto 为例,protoc 生成的 go 源码是 hello_grpc.pb.go 文件。

(1)server stub

1
2
3
type GreeterServer interface {
SayHello(context.Context, *HelloRequest) (*HelloResponse, error)
}
  • 声明 GreeterServer 接口,里面封装了 SayHello 函数。
  • 在服务端中,我们会声明一个结构体 server,让 server 实现 GreeterServer 接口,也就是为 server 定义结构体方法 SayHello 函数。
1
2
3
func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) {
s.RegisterService(&Greeter_ServiceDesc, srv)
}
  • 将 GreeterServer 注册到 grpc server 中。
  • 在服务端中,因为 server 实现了 GreeterServer 接口,也就可以将 server 注册到 grpc server 中。

(2)client stub

1
2
3
type GreeterClient interface {
SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
}
  • 声明 GreeterClient 接口,里面封装了 SayHello 函数。
1
2
3
4
5
6
7
8
9
10
11
12
type greeterClient struct {
cc grpc.ClientConnInterface
}

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error) {
out := new(HelloResponse)
err := c.cc.Invoke(ctx, "/Greeter/SayHello", in, out, opts...) // 实际上是利用 grpc.ClientConnInterface 的Invoke 方法进行远程调用。
if err != nil {
return nil, err
}
return out, nil
}
  • 声明 greeterClient 结构体,里面包含着 grpc.ClientConnInterface 变量,也就是可以使用 grpc.ClientConnInterface 的方法。
  • 并为 greeterClient 结构体实现了 SayHello 方法,所以 greeterClient 结构体实现了 GreeterClient 接口。
  • 为什么 greeterClient 结构体里面要包含一个 grpc.ClientConnInterface 变量?
    • 远程调用 server 端 SayHello 时,实际上调用的是 grpc.ClientConnInterface 的 Invoke 方法。
    • 在 Invoke 方法的参数中,有 context、路径(call ID)、传入参数、穿出参数、远程调用的选项。
1
2
3
func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient {
return &greeterClient{cc}
}
  • 这里是实例化 greeterClient 结构体的方法,提供给客户端使用。
  • 注意:这里返回的是 GreeterClient 接口变量,实际上返回的是 greeterClient 结构体变量,因为在上面 greeterClient 结构体实现了 GreeterClient 接口。

Reference

grpc API:https://pkg.go.dev/google.golang.org/grpc


微服务|RPC/gRPC
https://www.aimtao.net/rpc/
Posted on
2022-05-27
Licensed under