微服务基础|RPC/gRPC

1.RPC

1.1 RPC 是什么

  • RPC(Remote Procedure Call)远程过程调用。简单说就是,一个节点请求另一个节点提供的服务。
  • 函数调用是最常见的本地过程调用。
  • RPC 就是将本地过程调用,变成远程过程调用。

1.2 RPC 需要解决的问题

将本地函数放在服务器上运行,有三个主要问题需要解决。

(1)Call 的 ID 映射

  • 本地调用是通过函数指针来调用。
  • 在 RPC 中,所有函数必须有自己的 ID,且 ID 在所有进程中是唯一的。
  • 客户端与服务端分别维护一个函数与 Call ID 的映射表,二者的表不一定相同,但相同函数对应的 Call ID 必须相同。
  • 当客户端需要进行远程调用时,查表得到 Call ID,并传给服务端;服务端通过 Call ID 查表得到所调用的函数,并执行对应的函数代码。

(2)序列化和反序列化

  • 序列化:把对象转化为字节流,进行网络传输。
  • 反序列化:把网络中接收的字节流转化为对象。

(3)网络传输

  • 大部分 RPC 框架都基于 TCP/UDP 协议进行封装。
  • HTTP 1.x 一旦对方返回结果,就会断开连接,因此存在性能问题。
  • gRPC 是基于 HTTP 2.0 的,HTTP 2.0 支持长连接。

1.3 通过 HTTP 完成 Add 服务

通过 net/http 库,使用 get 请求完成远程的 add 服务。在这个过程中需要完成 RPC 的三件重要事情:

  • Call ID 的映射:使用特定的请求路径来标识,r.URL.path
  • 序列化和序列化:使用 json.Marshal 完成。
  • 网络传输:使用 http 发送 get 请求。

使用 http 的方式完成 RPC 的问题有两点,这是 RPC 框架需要解决的问题。

  • 写业务逻辑比较麻烦,每个函数都需要写一个请求。
  • 客户端和服务端需要清晰地明确参数是如何传递的。

(1)服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func main() {
http.HandleFunc("/add", func(writer http.ResponseWriter, request *http.Request) {
// 解析并获取参数
_ = request.ParseForm()
fmt.Println("path: ", request.URL.Path)
a, _ := strconv.Atoi(request.Form["a"][0])
b, _ := strconv.Atoi(request.Form["b"][0])

// 构造请求头
writer.Header().Set("Content-Type", "application/json")

// 序列化成 []byte 类型
bytes, _ := json.Marshal(map[string]int{
"data": a + b,
})

// 写入 respond
_, _ = writer.Write(bytes)
})

// 监听 8080 端口
_ = http.ListenAndServe(":8000", nil)
}

(2)客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type ResponseData struct {
Data int `json:"data"`
}

func Add(a, b int) int {

// 发送 get 请求
response, _ := http.Get(fmt.Sprintf("http://127.0.0.1:8000/%s?a=%d&b=%d", "add", a, b))
defer response.Body.Close() // 程序结束关闭 body

// 读取 body
body, _ := ioutil.ReadAll(response.Body)
responseData := ResponseData{} // 初始化结构体
json.Unmarshal(body, &responseData) // 反序列化 json,并将数据存入结构体

return responseData.Data
}
func main() {
fmt.Println(Add(1, 2))
}

1.4 RPC 开发的要素

(1)RPC 开发的四大要素

  • client:负责发起服务调用,传递参数。
  • clent stub:运行在客户端机器上,负责存储要调用的服务端地址等信息,还负责将客户端的请求打包成数据包,发送给服务端。
  • server:有客户端要调用的方法,负责执行调用的方法。
  • server stub:运行在服务端机器上,负责接受客户端的数据包,并调用在 server 上的方法,并将调用结果进行数据处理,打包返回给客户端。

(2)原理图

(3)过程

  • client:client 想要发起远程过程调用,通过调用 client stub 的方式,传递想要调用的方法及参数。
  • client stub:接收到 client 的调用请求,将 client 请求调用的方法名、参数等信息序列化,打包成数据包;并查找到远程服务器的 IP 地址及端口。
  • socket:,通过 socket 协议,将数据包发送给服务端。
  • server stub:接受客户端的数据包,并通过约定好的协议进行反序列化,得到请求的方法名和参数;并调用 server 对应的方法,并传入参数。
  • server:执行被调用的方法,处理业务;并将结果返回给 server stub。
  • server stub:将 server 返回的结果按照约定的协议,进行序列化,打包成数据包。
  • socket:通过 socket 协议,将数据包发送给服务端。
  • client stub:接收到返回的数据,按照约定进行反序列化,并将调用结果传给 client。
  • client:得到调用结果。

至此,整个 RPC 调用完成。

(4)动态代理技术

在 client stub 和 server stub 中,会使用 动态代理技术 自动生成一段代码,这样我们就可以更专注于业务的编码,不用对每个函数调用进行都进行封装一遍。

1.5 使用 go 内置的 RPC

下面简单介绍 go 内置的 RPC,主要用到 netnet/rpc 这两个包。

(1)服务端

  1. 实例化 server
  2. 将函数注册到 RPC 中
  3. 启动服务

RPC 主要解决了的问题:call id、序列化和反序列化(使用的 Gob 协议)。listen 和 accept 的都是 net 包完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// HelloService :想要将多个方法注册到 rpc 当中,可以使用 struct 来封装多个方法,然后注册 struct 即可。
type HelloService struct {
}

func (s *HelloService) Hello(request string, reply *string) error { // reply 传出参数
*reply = "hello " + request
return nil
}

func main() {
// 1.实例化 server
listener, _ := net.Listen("tcp", "localhost:8899")

// 2.将函数注册到 RPC 中
_ = rpc.RegisterName("CommonHelloService", &HelloService{})
/*
第一个参数:是服务的名字,随便起,与结构体 HelloService 命名无关,只要 client/server 在 Call 和 RegisterName 时保持一致就行。
第二个参数:为什么这里要取地址?因为结构体指针实现了 interface。具体原因,见下文。
*/

// 3.启动服务
conn, _ := listener.Accept() // conn 是 ServerCodec 类型,里面封装了读/写/关闭操作。

rpc.ServeConn(conn)
}

补充:为什么 RegisterName 的第二个参数要取地址?

这里接收 &HelloService{} 是一个 interface 变量,传给 interface 变量值还是指针,由 HelloService 在实现函数时,用的指针接受者还是值接受者决定。所以,

  • 如果定义成 func (s *HelloService) Hello(...),此处只能传递指针类型 &HelloService{}
  • 如果定义成 func (s HelloService) Hello(...),此处可以传递指针类型 &HelloService{} 或值类型 HelloService{}

如果不知道为什么,建议阅读 接口的值类型

(2)客户端

  1. 建立连接
  2. 远程调用函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func main() {

// 1. 建立连接
client, err := rpc.Dial("tcp", "localhost:8899")
if err != nil {
panic("connect fail.")
}

// 2.远程调用函数
var reply string
err = client.Call("CommonHelloService.Hello", "xiaomi", &reply)
if err != nil {
panic("call fail.")
}
fmt.Println(reply)
}

1.6 序列化格式使用 json

RPC 使用的序列化协议是 Gob,而 json 是使用更加广泛的格式,为了满足跨语言的需求,一般会使用 json 格式来编解码数据。下面将其换成 json。

(1)服务端

只需要将 rpc.ServeConn 换成 rpc.ServeCodec

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// HelloService :想要将多个方法注册到 rpc 当中,可以使用 struct 来封装多个方法,然后注册 struct 即可。
type HelloService struct {}

func (s *HelloService) Hello(request string, reply *string) error { // reply 传出参数
*reply = "hello " + request
return nil
}

func main() {
// 1.实例化 server
listener, _ := net.Listen("tcp", "localhost:8899")

// 2.将函数注册到 RPC 中
_ = rpc.RegisterName("HelloService", &HelloService{}) // 如上文所述原因,需传递指针变量。

// 3.启动服务
conn, _ := listener.Accept() // conn 是 ServerCodec 类型,里面封装了读/写/关闭操作。

rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) // ServeCodec: 使用指定的编解码器来解码请求和编码响应。这里传入使用 json 编解码的 ServerCodec。
}

(2)客户端

需要将 rpc.Dial 换成 net.Dial,否则使用 rpc.Dial 会使用 Gob 的协议进行编解码。并通过 conn 生成一个 ClientCodec。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func main() {

// 1. 建立连接
conn, err := net.Dial("tcp", "localhost:8899")
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn)) // 使用json 编解码的 ClientCodec。
if err != nil {
panic("connect fail.")
}

// 2.远程调用函数
var reply string
err = client.Call("HelloService.Hello", "xiaomi", &reply)
if err != nil {
panic("call fail.")
}
fmt.Println(reply)
}

1.7 监听端口使用 http

内置的 RPC 监听的是 TCP 端口,跨语言完成调用时,客户端需要使用 socket 来传递 TCP 数据报。如果监听 http 请求,客户端只需发送 Get/Post 请求即可。

(1)服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func main() {
// 1.将函数注册到 RPC 中
_ = rpc.RegisterName("HelloService", &HelloService{}) // 如上文所述原因,需传递指针变量。

// 2.http 处理函数
http.HandleFunc("/jsonrpc", func(writer http.ResponseWriter, request *http.Request) {
var conn io.ReadWriteCloser = struct {
io.Writer
io.ReadCloser
}{
ReadCloser: request.Body,
Writer: writer,
}
rpc.ServeRequest(jsonrpc.NewServerCodec(conn)) // rpc.ServeRequest 只要拿到带有 writer/reader 的 conn 就可以。
})

// 3.监听 http 端口
http.ListenAndServe(":8899", nil)
}

(2)客户端

// 待补充

1.8 封装 Stub

需求:

  1. client 使用 client.Hello(request, &reply) 的格式优雅远程调用。
  2. client/server 使用同一个服务命令,手动写可能出现服务名称不一致/服务名称冲突的错误。
  3. server 可以专注业务逻辑。
  4. sever 无序关注服务名称、结构体类型。

解决:

  • 需求一,使用 clientStub 将 Dial 和 Call 过程进行封装。
  • 需求二,使用共同的 handler 文件,在 handler 中将服务名称定义成常量。
  • 需求三,将业务逻辑部分抽离出来,放到其他文件中(这里为了方便演示,放在了 handler 中,其实不能放在 handler 中,handler 是共有文件,应该新建一个文件。)。
  • 需求四,使用 serverStub 将 rpc.RegisterName 进行封装,并使用接口接收结构体指针,使 serverStub 与结构体类型解耦。

(1)handler

1
2
3
4
5
6
7
8
9
10
11
12
13
package handler

const HelloServiceName = "handler/CommonHelloService" // 这是服务的名字,随便起,与结构体 HelloService 命名无关,只要 client/server 在 Call 和 RegisterName 时保持一致就行。

// HelloService :想要将多个方法注册到 rpc 当中,可以使用 struct 来封装多个方法,然后注册 struct 即可。
type HelloService struct{}

// 实际的业务逻辑(真正被远程调用的函数)
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello " + request
return nil
}

(2)clientStub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package clientStub

import (
"learngo/RPCWithStub/handler"
"net/rpc"
)

type HelloServiceStub struct {
*rpc.Client // 最终用来 call 调用函数的是 *rpc.Client。这里封装成 HelloServiceStub,便于在 client 中,直接使用 HelloServiceStub.Hello 的形式调用。
}

// NewHelloServiceClient go 中没有类、对象,所以没有初始化方法。这里需要写一个 New 方法,获取到 *rpc.Client,实例化一个 HelloServiceStub。
func NewHelloServiceClient(protocol, address string) (HelloServiceStub, error) {
conn, err := rpc.Dial(protocol, address)
if err != nil {
panic(err)
}

return HelloServiceStub{conn}, nil
}

func (c HelloServiceStub) Hello(request string, reply *string) error { // 实现要调用的方法
err := c.Call(handler.HelloServiceName+".Hello", request, reply)
if err != nil {
panic(err)
}
return nil
}

(3)client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package main

import (
"fmt"

"learngo/RPCWithStub/clientStub"
)

func main() {

// 1.建立连接,获取到一个 *rpc.Client。
client, _ := clientStub.NewHelloServiceClient("tcp", "localhost:8899")

// 2.远程调用函数
var reply string
err := client.Hello("xiaomi", &reply) // 实现优雅调用。
if err != nil {
panic("call fail.")
}
fmt.Println(reply)
}

(4)serverStub

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package serverStub

import (
"learngo/RPCWithStub/handler"
"net/rpc"
)

// 使用接口,将 RegisterHelloService 形参类型 与 handler 中的结构体类型解耦。
type HelloServer interface {
Hello(request string, reply *string) error
}

func RegisterHelloService(srv HelloServer) error { // 只要实现了 Hello 方法,都可以将结构体传给 HelloServer 类型的接口变量。
return rpc.RegisterName(handler.HelloServiceName, srv)
}

(5)server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
"learngo/RPCWithStub/handler"
"learngo/RPCWithStub/serverStub"
"net"
"net/rpc"
)

func main() {
// 1.实例化 server
listener, _ := net.Listen("tcp", "localhost:8899")

// 2.将函数注册到 RPC 中
// 此处封装了注册过程中结构体的类型,结构体的类型不重要,重点是 Hello 这个函数,只要带有 Hello 方法的结构体即可。
_ = serverStub.RegisterHelloService(&handler.HelloService{})

// 3.启动服务
conn, _ := listener.Accept()

rpc.ServeConn(conn)
}

1.9 总结

为什么要花篇幅来分析内置 RPC 的使用?因为以上所解决的问题,就是 gRPC 或其他 RPC 框架所解决的问题。

更重要的是 serverStub/clientStub 可以通过 protobuf 自动生成,并且可以生成多语言版本。

2.protobuf

Reference Documents:https://developers.google.com/protocol-buffers/docs/proto3

2.1 是什么

Protocol Buffer,是 google 退出的一种轻量高效的结构化数据存储格式。和 json、xml 作用一样,但 protobuf 的性能远超 json、xml,数据压缩比比较高。

protobuf 经历了 protobuf2 和 protobuf3,目前主流版本是 protobuf3。

同类的格式还有:java 中的 dubbo/rmi/hessian,python 中的 messagepack,go 中的 gob。

2.2 protobuf 优缺点

优点:

  • 性能:压缩性好(压缩得越小,传输越快),序列化/反序列化快(比 json、xml 快 2-100 倍)。
  • 便捷性:使用简单(自动生成序列化/反序列化代码),维护成本低(只需要维护 proto 文件),向后兼容(增加内容,可以不破坏旧格式),加密性好(二进制流)。
  • 跨语言:跨平台,支持各种主流语言。

缺点:

  • 通用型差:任何语言都支持 json,但是 protobuf 需要专门的解析库。
  • 自解释性差:加密成了二进制流,只要通过 proto 文件才能了解数据结构。

2.3 环境配置

需要安装 protoc,下载 protoc-gen-go 依赖包。

1
2
3
brew install protobuf  #  这是 macOS 的安装命令,其他系统在上面的链接中自行查找。
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest # 生成 go 源代码用的。
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@vlatest #支持生成 grpc 部分代码

2.4 最小实践

protobuf 最原始的功能就是数据编码,使用主要分三步;

  1. 使用 proto 文件约束数据类型。(也就是结构体。)
  2. 自动生成源代码。
  3. 使用 github.com/golang/protobuf/proto 对数据进行序列化/反序列化。

注意:使用 proto 只能进行数据编码,加上 grpc 插件后,可以定义一些服务。

(1)使用 proto 文件约束数据类型

注意:1 是编号不是值,编码时只认编号不认变量名。(坑:联调时两个人编号不一致。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
syntax = "proto3";  // 使用 proto3 的语法
option go_package = ".;proto"; // 生成源码文件的包名,只影响 go 语言。具体使用见 2.6 go_package 的作用

service SayHello { // 定义服务(也就是方法接口) // 使用 grpc 插件才可以使用。
rpc Hello(HelloRequest) returns (HelloResponse);
}

message HelloRequest { // 定义消息类型(也就是结构体类型)
string name = 1; // 1 是编号不是值,编码时只认编号不认变量名。坑:联调时两个人编号不一致。
}

message HelloResponse {
string reply = 1;
}

(2)自动生成 go 源码

1
protoc -I . helloworld.proto --go_out=plugins=grpc:.
  • -I .:表示 include 当前目录。helloworld.proto 在当前目录下。
  • --go_out:表示生成 go 的源码,--java_out 即可生成 java 的源码。等号后面可以加选项,选项使用键值对表示。
  • plugins=grpc:一个键值对,表示使用 grpc 插件。
  • :.:表示在当前目录下生成 bp.go 文件。

补充:生成 go 源码的工具是

(4)使用 protobuf 编解码

使用 proto 序列化/反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package main

import (
"fmt"
helloworld "learngo/grpc/proto"

"github.com/golang/protobuf/proto"
)

func main() {

// 实例化一个 struct
req := helloworld.HelloRequest{
Name: "Tom",
}

// 编码
bytes, _ := proto.Marshal(&req)
fmt.Println(bytes) // [10 3 84 111 109] // 编码后自解释性差

// 解码
newBytes := helloworld.HelloRequest{}
_ = proto.Unmarshal(bytes, &newBytes) // 传入一个空 struct 作为传出参数
fmt.Println(newBytes.Name) // Tom // 成功复原
}

2.5 基本类型和默认值

(1)基本类型

https://developers.google.com/protocol-buffers/docs/proto3#scalar

.proto TypeNotesGo Type
doublefloat64
floatfloat32
int32Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead.使用可变长度编码。编码负数效率低下——如果您的字段可能具有负值,请使用 sint32。int32
int64Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead.使用可变长度编码。编码负数效率低下——如果您的字段可能具有负值,请使用 sint64。int64
uint32Uses variable-length encoding.使用可变长度编码。uint32
uint64Uses variable-length encoding.uint64
sint32Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s.使用可变长度编码。有符号的 int 值。这些比常规 int32 更有效地编码负数。int32
sint64Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s.使用可变长度编码。有符号的 int 值。这些比常规 int64 更有效地编码负数。int64
fixed32Always four bytes. More efficient than uint32 if values are often greater than 228.总是四个字节。如果值通常大于228,则比uint32更有效率。uint32
fixed64Always eight bytes. More efficient than uint64 if values are often greater than 256.总是八字节。如果值通常大于256,则比uint64更有效率。uint64
sfixed32Always four bytes.int32
sfixed64Always eight bytes.int64
boolbool
stringA string must always contain UTF-8 encoded or 7-bit ASCII text, and cannot be longer than 232.字符串必须始终包含UTF-8编码或7位ASCII文本,并且不能超过232。string
bytesMay contain any arbitrary sequence of bytes no longer than 232.可以包含任何不超过232的任意字节序列。[]byte

(2)默认值

https://developers.google.com/protocol-buffers/docs/proto3#default

当解析消息时,如果编码的消息不包含特定的 singular 元素,则解析对象中的相应字段将设置为该字段的默认值。

.proto Typedefault
string“”
byte‘’
boolfalse
数值类型0
enum默认时第一个定义的枚举值,必须为 0
message默认值根据使用的语言确定的

2.6 指定字段规则

(1)option

1
option go_package = "../common/stream/proto/v1;helloworld";
  • ../common/stream/proto/v1:指定生成 pb.go 文件所在的路径。
  • helloworld:生成 pb.go 文件的包名。
  • 注:go_package 只会对 go 进行约束,不会影响其他语言。

在开发过程中,会将 proto 文件和 pb.go 文件分开存放。文件结构如下,其中生成的 bp.go 文件可能是公共的,并且分目录和版本。

1
2
3
4
5
6
7
8
9
10
11
12
.
├── common
│   └── stream
│   └── proto
│   └── v1
│   └── stream.pb.go
├── proto
│   └── stream.proto
├── client
│   └── client.go
└── server
└── server.go

(2)singular

单数,表示字段出现 0 次或 1 次。proto3 每个字段的修饰符默认是 singular。

(3)repeated

复数,表示该字段可重复,对应的 go 中的切片。

1
2
3
message hello {
repeated int32 num = 1; // 在 go 中为 []int
}

2.7 嵌套 proto 文件

1)为什么需要嵌套 proto 文件?

  • 消息类型可能比较多,不可能只写一个 proto 文件。
  • 有些 meassage 类型可能通用,比如 protobuf 就内置了一些 message(github.com/golang/protobuf/ptypes)。

(2)实际的例子:empty message 的应用

需求:Ping 是一个测试是否连通的服务,发送请求时无需传入参数。但 service 定义时必须传入 message,此时就可以使用 Empty message。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// hello.proto

syntax = "proto3";
option go_package=".;proto";

service Greeter {
// Ping 是一个用于测试是否连通的服务,发送请求时无需传入参数。
rpc Ping(Empty) returns (Pong); // rpc 服务必须传入一个 message,但函数本意并无需传入参数,所以只能写一个 Empty message。
}

message Pong {
int64 id = 1;
}

// Empty 是一个空的 message,很多 proto 文件中都会需要。
message Empty{

}

这是一个很常见的需求,无需每个 proto 文件中都定义一个 Empty message,会冲突。

解决:定义一个公共的 proto 文件,base.proto。在 hello.proto 中 improt base.proto。

1
2
3
4
5
6
7
8
9
10
11
12
13
// base.proto

syntax = "proto3";
option go_package=".;proto";

message Pong {
int64 id = 1;
}

// Empty 是一个空的 message,很多 proto 文件中都会需要。
message Empty{

}
1
2
3
4
5
6
7
8
9
10
// hello.proto

syntax = "proto3";
import "base.proto"; // 【import 同目录下的 proto 文件】
option go_package=".;proto";

service Greeter {
// Ping 是一个用于测试是否连通的服务,发送请求时无需传入参数。
rpc Ping(Empty) returns (Pong); // rpc 服务必须传入一个 message,但函数本意并无需传入参数,所以只能写一个 Empty message。
}

(3)使用 protobuf 内置的 message

注意 proto 文件中的导入方式。improt 时用路径表示,使用时用点表示

1
2
3
4
5
6
7
8
9
10
11
12
13
// hello.proto

syntax = "proto3";
import "google/protobuf/empty.proto"; // 【improt 内置的 message】
option go_package=".;proto";

service Greeter {
rpc Ping(google.protobuf.Empty) returns (Pong);
}

message Pong {
int64 id = 1;
}

go 文件使用 Empty message 类型时,需导入empty.proto 文件中 go_package 路径。

1
2
3
4
5
6
7
8
9
// client.go

package main

import "github.com/golang/protobuf/ptypes/empty" // empty.proto 文件中 go_package 路径。

func main() {
empty.Empty{}
}

2.8 嵌套 message 对象

(1)第一种方式

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = ".;proto";

message Result {
string name = 1;
string url = 2;
}

message Reply {
string message = 1;
repeated Result data = 2; // Result 数组
}

(2)第二种方式

内部嵌套,嵌套的 message 只用一次,防止 proto 文件中定义过多 message。

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = ".;proto";

message Reply {
string message = 1;
repeated Result data = 2;

message Result {
string name = 1; // Result 的序号和 Reply 的序号有很好的隔离性,互不干扰。
string url = 2;
}
}

使用上有些特殊,pb.go 文件里会定义为 Reply_Result,所以 go 中实例化时需要使用 Reply_Result。

1
2
3
4
5
6
7
8
type Reply_Result struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"`
}

2.9 enum 类型

(1)proto 文件定义

1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";
option go_package = ".;enum";

message Reply {
string message = 1;
Gender gender = 2;
}

enum Gender { // 定义枚举类型
MALE = 0;
FEMALE = 1;
}

(2)go 文件中使用枚举类型

1
2
3
4
5
6
7
8
9
10
11
package main

import "learngo/repeated/enum" // 导入 bp.go 所在目录

func main() {
r := enum.Reply{
Message: "xxx",
Gender: enum.Gender_MALE, // 使用 枚举类型 赋值
}
fmt.Println(r)
}

2.10 map 类型

map 虽然方便,但是不要大量写 map,在传入参数和接收参数时,不知道具体字段是什么。

(1)proto 文件定义

1
2
3
4
5
6
syntax = "proto3";
option go_package = ".;enum";

message HelloReply {
map<string, string> m = 1; // 定义类型,传错类型会报错。
}

(2)go 文件中使用 map 类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package main

import (
"fmt"
mapproto "learngo/repeated/map_proto"
)

func main() {
reply := mapproto.HelloReply{
M: map[string]string{
"name": "Tom",
"company": "xiaomi",
},
}
fmt.Println(reply.M)
}

2.11 timestamp 类型

时间戳类型,protobuf 扩展的类型。

(1)proto 文件定义

timestamp 定义在 google/protobuf/timestamp.proto 文件中。

1
2
3
4
5
6
7
8
9
// hello.proto
syntax = "proto3";
option go_package = ".;hello";

import "google/protobuf/timestamp.proto";

message Reply {
google.protobuf.Timestamp requestTime= 1;
}

(2)go 文件中使用 timestamp 类型

bp.go 中是这个样定义的,RequestTime 是 timestamppb.Timestamp 指针类型。

timestamppb.Timestamp 导入的是 timestamppb "google.golang.org/protobuf/types/known/timestamppb"

1
2
3
4
5
6
7
8
// hello.pb.go
type Reply struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

RequestTime *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=requestTime,proto3" json:"requestTime,omitempty"`
}

所以在 go 中,需要导入 timestamppb "google.golang.org/protobuf/types/known/timestamppb"

具体使用如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// client.go
package main

import (
"fmt"
"time"

hello "learngo/timestamp/proto"

timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
reply := hello.Reply{
RequestTime: timestamppb.New(time.Now()), // 【获取当前的时间戳】
}
fmt.Println(reply.RequestTime)
}

3.gRPC

3.1 最小实践

目的:server 实现一个 SayHello 的函数,client 进行远程调用。

实现:

  1. 使用 proto 文件,约定传输的消息类型、函数接口。
  2. 使用 protoc 生成 stub 文件 helloworld.pb.go,生成的主要内容是:
    • 消息类型的编解码;
    • client:实例化 client 的函数、client interface(用于封装远程调用函数);
    • server:server interface(用于封装远程调用函数)、注册 server 的函数。
  3. server 端:完成具体的业务逻辑来实现接口 + 实例化 grpc server、注册、监听。
  4. client 端:拨号、实例化 client、远程调用。

(1)helloworld.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
syntax = "proto3";
option go_package = ".;helloworld";

service Greeter { // 定义服务(也就是 interface)
rpc SayHello(HelloRequest) returns (HelloResponse);
}

message HelloRequest { // 定义消息类型(也就是结构体类型)
string name = 1; // 1 是编号不是值
}

message HelloResponse {
string reply = 1;
}

(2)server.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"context" // 自带的
"net"

// 三方的
"google.golang.org/grpc"

helloworld "learngo/grpc/proto" // 自己的
)

type Server struct{}

// SayHello 是 server 的接口,虽然在 proto 文件中没有定义 Context、error,但需要强制加上。
func (s *Server) SayHello(ctx context.Context, request *helloworld.HelloRequest) (*helloworld.HelloResponse, error) { // 具体业务逻辑
return &helloworld.HelloResponse{
Reply: "hello " + request.Name,
}, nil
}

func main() {

// 1.实例化一个 grpc 的 server
g := grpc.NewServer()

// 2.注册
// RegisterGreeterServer 是自动生成的函数。利用鸭子类型类传递实例化的结构体。
helloworld.RegisterGreeterServer(g, &Server{})

//3.启动监听
listener, err := net.Listen("tcp", "0.0.0.0:8080")
if err != nil {
panic("failed to listen:" + err.Error())
}
err = g.Serve(listener)
if err != nil {
panic("failed to start: " + err.Error())
}
}

(3)client.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package main

import (
"context"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

helloworld "learngo/grpc/proto"
)

func main() {
// 1.拨号
conn, err := grpc.Dial("127.0.0.1:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic("failed to dial: " + err.Error())
}
defer func(conn *grpc.ClientConn) { // 连接用完需关闭,使用闭包的方式。
err := conn.Close()
if err != nil {
panic("failed to close: " + err.Error())
}
}(conn)

// 2.实例化 client
// NewGreeterClient 是自动生成的代码。
client := helloworld.NewGreeterClient(conn)

// 3.远程调用
response, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "Tom"})
if err != nil {
panic("failed to call: " + err.Error())
}
fmt.Println(response.Reply)
}

3.2 流模式

3.2.1 四种模式

(1)简单模式(Simple RPC)

客户端发起一次请求,服务端响应一次数据。

(2)服务端数据流模式(Server-side streaming RPC)

客户端发起一次请求,服务端返回一段连续的数据流。例如:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据持续地返回给客户端。

(3)客户端数据流模式(Client-side streaming RPC)

客户端持续向服务端发送数据流,发送结束后,服务端返回一个响应。例如:传感器向服务端上报数据。

(4)双向数据流模式(Bidirectional streaming RPC)

客户端与服务端都可以向对方发送数据流,可以同时互相发送,实现实时交互。例如聊天机器人。

3.2.2 流模式最小实践

(1)stream.proto

在 service 中,以流模式发送的数据使用 stream 关键字定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
syntax = "proto3";

option go_package=".;proto";

service Greeter {
rpc GetStream(StreamRequestData) returns (stream StreamResponseData); // 服务端流模式,返回 response 是 stream
rpc PostStream(stream StreamRequestData) returns (StreamResponseData); // 客户端流模式
rpc AllStream(stream StreamRequestData) returns ( stream StreamResponseData); // 双向流模式
}

message StreamRequestData {
string data = 1;
}

message StreamResponseData {
string data = 1;
}

(2)server.go

流模式和普通 PRC 相比,server 有三点差异:

  • 参数:使用 stream 标记的参数,在函数实现时,需使用 protoc 自动生成的 streamServer 代替。(不
  • 发送数据流:使用 streamServer 类型的参数,调用 send 函数。
  • 接收数据流:使用 streamServer 类型的参数,调用 send 函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package main

import (
"fmt"
"net"
"sync"
"time"

"learngo/streamingGrpc/proto"

"google.golang.org/grpc"
)

const PORT = ":8088"

type server struct{}

// GetStream 实现服务端流模式
func (s *server) GetStream(req *proto.StreamRequestData, res proto.Greeter_GetStreamServer) error { // 使用 流模式 传输的数据,使用 proto 中自动生成的 streamServer 代替函数中的参数。
fmt.Println("\nServer-side streaming RPC")
fmt.Println(req.Data)
i := 0
for {
i++
_ = res.Send(&proto.StreamResponseData{ // Send 函数发送数据
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
time.Sleep(time.Second)
if i > 3 {
break
}
}
return nil
}

// PostStream 实现客户端流模式
func (s *server) PostStream(clientStream proto.Greeter_PostStreamServer) error {
fmt.Println("\nClient-side streaming RPC")
for {
if recv, err := clientStream.Recv(); err != nil { // Recv 函数接收数据
fmt.Println(err)
break
} else {
fmt.Println(recv.Data)
}
}
return nil
}

// AllStream 实现双向流模式
func (s *server) AllStream(allStream proto.Greeter_AllStreamServer) error {
fmt.Println("\nBidirectional streaming RPC")
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
msg, err := allStream.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println("msg from client: " + msg.Data)
}
}()
go func() {
defer wg.Done()
i := 0
for {
i++
_ = allStream.Send(&proto.StreamResponseData{
Data: fmt.Sprintf("%v", time.Now().Unix()),
})
if i > 3 {
break
}
time.Sleep(time.Second)
}
}()

wg.Wait()
return nil
}

func main() {

// 1.实例化 grpc server
s := grpc.NewServer()

// 2.注册 server
proto.RegisterGreeterServer(s, &server{})

// 3.监听端口,启动 server
listen, err := net.Listen("tcp", PORT)
if err != nil {
panic(err)
}
_ = s.Serve(listen)

}

(3)client.go

流模式和普通 PRC 相比,client 有三点差异:

  • 参数:使用 stream 标记的参数,函数调用时,无需提供。调用函数后获得返回值为对应的 streamClient。
  • 发送数据流:使用 streamClient 类型的函数返回值,调用 send 函数。
  • 接收数据流:使用 streamClient 类型的函数返回值,调用 send 函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package main

import (
"context"
"fmt"
"learngo/streamingGrpc/proto"
"sync"
"time"

"google.golang.org/grpc/credentials/insecure"

"google.golang.org/grpc"
)

func main() {

// 1.拨号连接
conn, err := grpc.Dial(":8088", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
panic(err)
}
}(conn)

// 2.实例化一个 client。(client 结构体中变量为 conn,使用 grpc.ClientConnInterface 接收参数 conn)
client := proto.NewGreeterClient(conn)

// 3.通过 client 来分别调用不同的流模式 RPC。
// 服务端流模式
fmt.Println("\nServer-side streaming RPC")
// 返回值为 Greeter_GetStreamClient,可以调用 Recv 函数。
res, _ := client.GetStream(context.Background(), &proto.StreamRequestData{Data: "tell me time"})
for {
data, err := res.Recv()
if err != nil {
fmt.Println(err)
break
}
fmt.Println(data)
}

// 客户端流模式
fmt.Println("\nClient-side streaming RPC")
postStream, err := client.PostStream(context.Background())
i := 0
for {
i++
_ = postStream.Send(&proto.StreamRequestData{
Data: fmt.Sprintf("client send count: %d", i),
})
time.Sleep(time.Second)
if i > 3 {
break
}
}

// 双向流模式
// 为了保证双向流模式,需使用2个协程分别发送和接收。
fmt.Println("\nBidirectional streaming RPC")
allStream, err := client.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
for {
msg, _ := allStream.Recv()
fmt.Println("msg form server: " + msg.Data)
}
}()

go func() {
defer wg.Done()
i := 0
for {
i++
_ = allStream.Send(&proto.StreamRequestData{
Data: fmt.Sprintf("%d", i),
})
if i > 3 {
break
}
time.Sleep(time.Second)
}
}()
wg.Wait()
}

3.3 metadata 机制

3.4 拦截器

3.5 验证器

3.6 状态码

3.7 错误处理

3.8 超时机制


微服务基础|RPC/gRPC
https://www.aimtao.net/rpc/
Posted on
2022-05-27
Licensed under