1.RPC 1.1 RPC 是什么 1.2 RPC 需要解决的问题 将本地函数放在服务器上运行,有三个主要问题需要解决。
(1)Call 的 ID 映射
本地调用是通过函数指针来调用。
在 RPC 中,所有函数必须有自己的 ID,且 ID 在所有进程中是唯一的。
客户端与服务端分别维护一个函数与 Call ID 的映射表,二者的表不一定相同,但相同函数对应的 Call ID 必须相同。
当客户端需要进行远程调用时,查表得到 Call ID,并传给服务端;服务端通过 Call ID 查表得到所调用的函数,并执行对应的函数代码。
(2)序列化和反序列化
序列化:把对象转化为字节流,进行网络传输。
反序列化:把网络中接收的字节流转化为对象。
(3)网络传输
大部分 RPC 框架都基于 TCP/UDP 协议进行封装。
HTTP 1.x 一旦对方返回结果,就会断开连接,因此存在性能问题。
gRPC 是基于 HTTP 2.0 的,HTTP 2.0 支持长连接。
1.3 通过 HTTP 完成 Add 服务 通过 net/http 库,使用 get 请求完成远程的 add 服务。在这个过程中需要完成 RPC 的三件重要事情:
使用 http 的方式完成 RPC 的问题有两点,这是 RPC 框架需要解决的问题。
写业务逻辑比较麻烦,每个函数都需要写一个请求。
客户端和服务端需要清晰地明确参数是如何传递的。
(1)服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 func main () { http.HandleFunc("/add" , func (writer http.ResponseWriter, request *http.Request) { _ = request.ParseForm() fmt.Println("path: " , request.URL.Path) a, _ := strconv.Atoi(request.Form["a" ][0 ]) b, _ := strconv.Atoi(request.Form["b" ][0 ]) writer.Header().Set("Content-Type" , "application/json" ) bytes, _ := json.Marshal(map [string ]int { "data" : a + b, }) _, _ = writer.Write(bytes) }) _ = http.ListenAndServe(":8000" , nil ) }
(2)客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type ResponseData struct { Data int `json:"data"` }func Add (a, b int ) int { response, _ := http.Get(fmt.Sprintf("http://127.0.0.1:8000/%s?a=%d&b=%d" , "add" , a, b)) defer response.Body.Close() body, _ := ioutil.ReadAll(response.Body) responseData := ResponseData{} json.Unmarshal(body, &responseData) return responseData.Data }func main () { fmt.Println(Add(1 , 2 )) }
1.4 RPC 开发的要素 (1)RPC 开发的四大要素
client:负责发起服务调用,传递参数。
clent stub:运行在客户端机器上,负责存储要调用的服务端地址等信息,还负责将客户端的请求打包成数据包,发送给服务端。
server:有客户端要调用的方法,负责执行调用的方法。
server stub:运行在服务端机器上,负责接受客户端的数据包,并调用在 server 上的方法,并将调用结果进行数据处理,打包返回给客户端。
(2)原理图
(3)过程
client:client 想要发起远程过程调用,通过调用 client stub 的方式,传递想要调用的方法及参数。
client stub:接收到 client 的调用请求,将 client 请求调用的方法名、参数等信息序列化,打包成数据包;并查找到远程服务器的 IP 地址及端口。
socket:,通过 socket 协议,将数据包发送给服务端。
server stub:接受客户端的数据包,并通过约定好的协议进行反序列化,得到请求的方法名和参数;并调用 server 对应的方法,并传入参数。
server:执行被调用的方法,处理业务;并将结果返回给 server stub。
server stub:将 server 返回的结果按照约定的协议,进行序列化,打包成数据包。
socket:通过 socket 协议,将数据包发送给服务端。
client stub:接收到返回的数据,按照约定进行反序列化,并将调用结果传给 client。
client:得到调用结果。
至此,整个 RPC 调用完成。
(4)动态代理技术
在 client stub 和 server stub 中,会使用 动态代理技术 自动生成一段代码,这样我们就可以更专注于业务的编码,不用对每个函数调用进行都进行封装一遍。
1.5 使用 go 内置的 RPC 下面简单介绍 go 内置的 RPC,主要用到 net
、net/rpc
这两个包。
(1)服务端
实例化 server
将函数注册到 RPC 中
启动服务
RPC 主要解决了的问题:call id、序列化和反序列化(使用的 Gob 协议)。listen 和 accept 的都是 net 包完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 type HelloService struct { }func (s *HelloService) Hello(request string , reply *string ) error { *reply = "hello " + request return nil }func main () { listener, _ := net.Listen("tcp" , "localhost:8899" ) _ = rpc.RegisterName("CommonHelloService" , &HelloService{}) conn, _ := listener.Accept() rpc.ServeConn(conn) }
补充:为什么 RegisterName 的第二个参数要取地址?
这里接收 &HelloService{} 是一个 interface 变量,传给 interface 变量值还是指针,由 HelloService 在实现函数时,用的指针接受者还是值接受者决定。所以,
如果不知道为什么,建议阅读 接口的值类型
(2)客户端
建立连接
远程调用函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func main () { client, err := rpc.Dial("tcp" , "localhost:8899" ) if err != nil { panic ("connect fail." ) } var reply string err = client.Call("CommonHelloService.Hello" , "xiaomi" , &reply) if err != nil { panic ("call fail." ) } fmt.Println(reply) }
1.6 序列化格式使用 json RPC 使用的序列化协议是 Gob,而 json 是使用更加广泛的格式,为了满足跨语言的需求,一般会使用 json 格式来编解码数据。下面将其换成 json。
(1)服务端
只需要将 rpc.ServeConn
换成 rpc.ServeCodec
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type HelloService struct {}func (s *HelloService) Hello(request string , reply *string ) error { *reply = "hello " + request return nil }func main () { listener, _ := net.Listen("tcp" , "localhost:8899" ) _ = rpc.RegisterName("HelloService" , &HelloService{}) conn, _ := listener.Accept() rpc.ServeCodec(jsonrpc.NewServerCodec(conn)) }
(2)客户端
需要将 rpc.Dial
换成 net.Dial
,否则使用 rpc.Dial
会使用 Gob 的协议进行编解码。并通过 conn 生成一个 ClientCodec。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func main () { conn, err := net.Dial("tcp" , "localhost:8899" ) client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn)) if err != nil { panic ("connect fail." ) } var reply string err = client.Call("HelloService.Hello" , "xiaomi" , &reply) if err != nil { panic ("call fail." ) } fmt.Println(reply) }
1.7 监听端口使用 http 内置的 RPC 监听的是 TCP 端口,跨语言完成调用时,客户端需要使用 socket 来传递 TCP 数据报。如果监听 http 请求,客户端只需发送 Get/Post 请求即可。
(1)服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func main () { _ = rpc.RegisterName("HelloService" , &HelloService{}) http.HandleFunc("/jsonrpc" , func (writer http.ResponseWriter, request *http.Request) { var conn io.ReadWriteCloser = struct { io.Writer io.ReadCloser }{ ReadCloser: request.Body, Writer: writer, } rpc.ServeRequest(jsonrpc.NewServerCodec(conn)) }) http.ListenAndServe(":8899" , nil ) }
(2)客户端
// 待补充
1.8 封装 Stub 需求:
client 使用 client.Hello(request, &reply)
的格式优雅远程调用。
client/server 使用同一个服务命令,手动写可能出现服务名称不一致/服务名称冲突的错误。
server 可以专注业务逻辑。
sever 无序关注服务名称、结构体类型。
解决:
需求一,使用 clientStub 将 Dial 和 Call 过程进行封装。
需求二,使用共同的 handler 文件,在 handler 中将服务名称定义成常量。
需求三,将业务逻辑部分抽离出来,放到其他文件中(这里为了方便演示,放在了 handler 中,其实不能放在 handler 中,handler 是共有文件,应该新建一个文件。)。
需求四,使用 serverStub 将 rpc.RegisterName
进行封装,并使用接口接收结构体指针,使 serverStub 与结构体类型解耦。
(1)handler
1 2 3 4 5 6 7 8 9 10 11 12 13 package handlerconst HelloServiceName = "handler/CommonHelloService" type HelloService struct {}func (s *HelloService) Hello(request string , reply *string ) error { *reply = "hello " + request return nil }
(2)clientStub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package clientStubimport ( "learngo/RPCWithStub/handler" "net/rpc" )type HelloServiceStub struct { *rpc.Client }func NewHelloServiceClient (protocol, address string ) (HelloServiceStub, error ) { conn, err := rpc.Dial(protocol, address) if err != nil { panic (err) } return HelloServiceStub{conn}, nil }func (c HelloServiceStub) Hello(request string , reply *string ) error { err := c.Call(handler.HelloServiceName+".Hello" , request, reply) if err != nil { panic (err) } return nil }
(3)client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package mainimport ( "fmt" "learngo/RPCWithStub/clientStub" )func main () { client, _ := clientStub.NewHelloServiceClient("tcp" , "localhost:8899" ) var reply string err := client.Hello("xiaomi" , &reply) if err != nil { panic ("call fail." ) } fmt.Println(reply) }
(4)serverStub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package serverStubimport ( "learngo/RPCWithStub/handler" "net/rpc" )type HelloServer interface { Hello(request string , reply *string ) error }func RegisterHelloService (srv HelloServer) error { return rpc.RegisterName(handler.HelloServiceName, srv) }
(5)server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package mainimport ( "learngo/RPCWithStub/handler" "learngo/RPCWithStub/serverStub" "net" "net/rpc" )func main () { listener, _ := net.Listen("tcp" , "localhost:8899" ) _ = serverStub.RegisterHelloService(&handler.HelloService{}) conn, _ := listener.Accept() rpc.ServeConn(conn) }
1.9 总结 为什么要花篇幅来分析内置 RPC 的使用?因为以上所解决的问题,就是 gRPC 或其他 RPC 框架所解决的问题。
更重要的是 serverStub/clientStub 可以通过 protobuf 自动生成 ,并且可以生成多语言版本。
2.protobuf Reference Documents:https://developers.google.com/protocol-buffers/docs/proto3
2.1 是什么 Protocol Buffer,是 google 退出的一种轻量高效的结构化数据存储格式。和 json、xml 作用一样,但 protobuf 的性能远超 json、xml,数据压缩比比较高。
protobuf 经历了 protobuf2 和 protobuf3,目前主流版本是 protobuf3。
同类的格式还有:java 中的 dubbo/rmi/hessian,python 中的 messagepack,go 中的 gob。
2.2 protobuf 优缺点 优点:
缺点:
2.3 环境配置 需要安装 protoc ,下载 protoc-gen-go 依赖包。
1 2 3 brew install protobuf go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
2.4 最小实践 protobuf 最原始的功能就是数据编码,使用主要分三步;
使用 proto 文件约束数据类型。(也就是结构体。)
自动生成源代码。
使用 github.com/golang/protobuf/proto
对数据进行序列化/反序列化。
注意:使用 proto 只能进行数据编码,加上 grpc 插件后,可以定义一些服务。
(1)使用 proto 文件约束数据类型
注意:1 是编号不是值,编码时只认编号不认变量名。(**坑:**联调时两个人编号不一致。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 syntax = "proto3" ; option go_package = ".;proto" ; service SayHello { rpc Hello(HelloRequest) returns (HelloResponse) ; }message HelloRequest { string name = 1 ; }message HelloResponse { string reply = 1 ; }
(2)自动生成 go 源码
已经废弃的写法:
1 protoc -I . helloworld.proto --go_out=plugins=grpc:.
-I .
:表示 include 当前目录。在当前目录寻找 helloworld.proto。--go_out
:表示生成 go 的源码,--java_out
即可生成 java 的源码。等号后面可以加选项,选项使用键值对表示。plugins=grpc
:一个键值对,表示使用 grpc 插件。:.
:使用 :
将前面的键值对和路径隔开。.
表示在当前目录下生成 bp.go
文件推荐的写法:
1 protoc -I . helloworld.proto --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative
-I .
:表示 include 当前目录。在当前目录寻找 helloworld.proto。
--go_out=.
:表示生成用于 protobuf 编码的 go 源码,--java_out
即可生成 java 的源码。等号后面是生成 bp.go 的路径。
--go_opt=
:等号后面可以加选项,选项使用键值对表示。
--go-grpc_out=.
:表示生成 grpc 的源码。等号后面是生成 grpc 的 bp.go 的路径。
--go-grpc_opt=
:等号后面可以加选项,选项使用键值对表示。只对 grpc 部分代码有效。
主要有两个变化:
(4)使用 protobuf 编解码
使用 proto 序列化/反序列化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package mainimport ( "fmt" helloworld "learngo/grpc/proto" "github.com/golang/protobuf/proto" )func main () { req := helloworld.HelloRequest{ Name: "Tom" , } bytes, _ := proto.Marshal(&req) fmt.Println(bytes) newBytes := helloworld.HelloRequest{} _ = proto.Unmarshal(bytes, &newBytes) fmt.Println(newBytes.Name) }
2.5 基本类型和默认值 (1)基本类型
https://developers.google.com/protocol-buffers/docs/proto3#scalar
.proto Type Notes Go Type double float64 float float32 int32 Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead.使用可变长度编码。编码负数效率低下——如果您的字段可能具有负值,请使用 sint32。 int32 int64 Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead.使用可变长度编码。编码负数效率低下——如果您的字段可能具有负值,请使用 sint64。 int64 uint32 Uses variable-length encoding.使用可变长度编码。 uint32 uint64 Uses variable-length encoding. uint64 sint32 Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s.使用可变长度编码。有符号的 int 值。这些比常规 int32 更有效地编码负数。 int32 sint64 Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s.使用可变长度编码。有符号的 int 值。这些比常规 int64 更有效地编码负数。 int64 fixed32 Always four bytes. More efficient than uint32 if values are often greater than 228.总是四个字节。如果值通常大于228,则比uint32更有效率。 uint32 fixed64 Always eight bytes. More efficient than uint64 if values are often greater than 256.总是八字节。如果值通常大于256,则比uint64更有效率。 uint64 sfixed32 Always four bytes. int32 sfixed64 Always eight bytes. int64 bool bool string A string must always contain UTF-8 encoded or 7-bit ASCII text, and cannot be longer than 232.字符串必须始终包含UTF-8编码或7位ASCII文本,并且不能超过232。 string bytes May contain any arbitrary sequence of bytes no longer than 232.可以包含任何不超过232的任意字节序列。 []byte
(2)默认值
https://developers.google.com/protocol-buffers/docs/proto3#default
当解析消息时,如果编码的消息不包含特定的 singular 元素,则解析对象中的相应字段将设置为该字段的默认值。
.proto Type default string “” byte ‘’ bool false 数值类型 0 enum 默认时第一个定义的枚举值,必须为 0 message 默认值根据使用的语言确定的
2.6 指定字段规则 (1)option
1 option go_package = "../common/stream/proto/v1;helloworld" ;
../common/stream/proto/v1
:指定生成 pb.go 文件所在的路径。
helloworld
:生成 pb.go 文件的包名。
注:go_package 只会对 go 进行约束,不会影响其他语言。
在开发过程中,会将 proto 文件和 pb.go 文件分开存放。文件结构如下,其中生成的 bp.go 文件可能是公共的,并且分目录和版本。
1 2 3 4 5 6 7 8 9 10 11 12 . ├── common │ └── stream │ └── proto │ └── v1 │ └── stream.pb.go ├── proto │ └── stream.proto ├── client │ └── client .go └── server └── server .go
(2)singular
单数,表示字段出现 0 次或 1 次。proto3 每个字段的修饰符默认是 singular。
(3)repeated
复数,表示该字段可重复,对应的 go 中的切片。
1 2 3 message hello { repeated int32 num = 1 ; }
2.7 嵌套 proto 文件 (1)为什么需要嵌套 proto 文件?
(2)实际的例子:empty message 的应用
**需求:**Ping 是一个测试是否连通的服务,发送请求时无需传入参数。但 service 定义时必须传入 message,此时就可以使用 Empty message。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 syntax = "proto3" ;option go_package=".;proto" ;service Greeter { rpc Ping(Empty) returns (Pong) ; }message Pong { int64 id = 1 ; }message Empty { }
这是一个很常见的需求,无需每个 proto 文件中都定义一个 Empty message,会冲突。
**解决:**定义一个公共的 proto 文件,base.proto。在 hello.proto 中 improt base.proto。
1 2 3 4 5 6 7 8 9 10 11 12 13 syntax = "proto3" ;option go_package=".;proto" ;message Pong { int64 id = 1 ; }message Empty { }
1 2 3 4 5 6 7 8 9 10 syntax = "proto3" ;import "base.proto" ; option go_package=".;proto" ;service Greeter { rpc Ping(Empty) returns (Pong) ; }
(3)使用 protobuf 内置的 message
注意 proto 文件中的导入方式。improt 时用路径表示,使用时用点表示
1 2 3 4 5 6 7 8 9 10 11 12 13 syntax = "proto3" ;import "google/protobuf/empty.proto" ; option go_package=".;proto" ;service Greeter { rpc Ping(google.protobuf.Empty) returns (Pong) ; }message Pong { int64 id = 1 ; }
go 文件使用 Empty message 类型时,需导入 empty.proto 文件中 go_package 路径。
1 2 3 4 5 6 7 8 9 package mainimport "github.com/golang/protobuf/ptypes/empty" func main () { empty.Empty{} }
(4)improt 路径问题
improt 同目录下的 proto 文件可直接导入。但是导入不同目录下的 proto 文件,需要注意 import 不能使用相对路径。
解决:可以写 proto 文件名,并在 protoc 时使用 -I
参数在指定目录下寻找该 proto 文件。
比如在 3.5 验证器 小节,hello.proto 文件中需要 import validate.proto 文件。
1 protoc -I . -I $GOPATH /pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/ hello.proto --go_out=.
-I . -I $GOPATH/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/
:表示在当前目录和 $GOPATH/pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/
这两个目录下,寻找 hello.proto 和 import 的文件。
2.8 嵌套 message 对象 (1)第一种方式
1 2 3 4 5 6 7 8 9 10 11 12 syntax = "proto3" ;option go_package = ".;proto" ;message Result { string name = 1 ; string url = 2 ; }message Reply { string message = 1 ; repeated Result data = 2 ; }
(2)第二种方式
内部嵌套,嵌套的 message 只用一次,防止 proto 文件中定义过多 message。
1 2 3 4 5 6 7 8 9 10 11 12 syntax = "proto3" ;option go_package = ".;proto" ;message Reply { string message = 1 ; repeated Result data = 2 ; message Result { string name = 1 ; string url = 2 ; } }
使用上有些特殊,pb.go 文件里会定义为 Reply_Result,所以 go 中实例化时需要使用 Reply_Result。
1 2 3 4 5 6 7 8 type Reply_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` Url string `protobuf:"bytes,2,opt,name=url,proto3" json:"url,omitempty"` }
2.9 enum 类型 (1)proto 文件定义
1 2 3 4 5 6 7 8 9 10 11 12 syntax = "proto3" ;option go_package = ".;enum" ;message Reply { string message = 1 ; Gender gender = 2 ; }enum Gender { MALE = 0 ; FEMALE = 1 ; }
(2)go 文件中使用枚举类型
1 2 3 4 5 6 7 8 9 10 11 package mainimport "learngo/repeated/enum" func main () { r := enum.Reply{ Message: "xxx" , Gender: enum.Gender_MALE, } fmt.Println(r) }
2.10 map 类型 map 虽然方便,但是不要大量写 map,在传入参数和接收参数时,不知道具体字段是什么。
(1)proto 文件定义
1 2 3 4 5 6 syntax = "proto3" ;option go_package = ".;enum" ;message HelloReply { map<string , string > m = 1 ; }
(2)go 文件中使用 map 类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package mainimport ( "fmt" mapproto "learngo/repeated/map_proto" )func main () { reply := mapproto.HelloReply{ M: map [string ]string { "name" : "Tom" , "company" : "xiaomi" , }, } fmt.Println(reply.M) }
2.11 timestamp 类型 时间戳类型,protobuf 扩展的类型。
(1)proto 文件定义
timestamp 定义在 google/protobuf/timestamp.proto 文件中。
1 2 3 4 5 6 7 8 9 syntax = "proto3" ;option go_package = ".;hello" ;import "google/protobuf/timestamp.proto" ;message Reply { google.protobuf.Timestamp requestTime= 1 ; }
(2)go 文件中使用 timestamp 类型
自动生成的 bp.go 中是这样定义的,RequestTime 是 timestamppb.Timestamp
指针类型。
timestamppb.Timestamp
导入的是 timestamppb "google.golang.org/protobuf/types/known/timestamppb"
。
1 2 3 4 5 6 7 8 type Reply struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields RequestTime *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=requestTime,proto3" json:"requestTime,omitempty"` }
所以在 go 中,需要导入 timestamppb "google.golang.org/protobuf/types/known/timestamppb"
。
具体使用如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package mainimport ( "fmt" "time" hello "learngo/timestamp/proto" timestamppb "google.golang.org/protobuf/types/known/timestamppb" )func main () { reply := hello.Reply{ RequestTime: timestamppb.New(time.Now()), } fmt.Println(reply.RequestTime) }
3.gRPC 3.1 最小实践 目的:server 实现一个 SayHello 的函数,client 进行远程调用。
实现:
使用 proto 文件,约定传输的消息类型、函数接口。
使用 protoc 生成 stub 文件 helloworld.pb.go 和 helloworld_gpc.pb.go,生成的主要内容是:
消息类型的编解码; client stub:实例化 client 的函数、client interface(用于封装远程调用函数); server stub:server interface(用于封装远程调用函数)、注册 server 的函数。 server 端:完成具体的业务逻辑来实现接口 + 实例化 grpc server、注册、监听。
client 端:拨号、实例化 client、远程调用。
(1)helloworld.proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 syntax = "proto3" ; option go_package = ".;helloworld" ; service Greeter { rpc SayHello(HelloRequest) returns (HelloResponse); } message HelloRequest { string name = 1 ; } message HelloResponse { string reply = 1 ; }
(2)server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package mainimport ( "context" "net" "google.golang.org/grpc" helloworld "learngo/grpc/proto" )type Server struct {}func (s *Server) SayHello(ctx context.Context, request *helloworld.HelloRequest) (*helloworld.HelloResponse, error ) { return &helloworld.HelloResponse{ Reply: "hello " + request.Name, }, nil }func main () { g := grpc.NewServer() helloworld.RegisterGreeterServer(g, &Server{}) listener, err := net.Listen("tcp" , "0.0.0.0:8080" ) if err != nil { panic ("failed to listen:" + err.Error()) } err = g.Serve(listener) if err != nil { panic ("failed to start: " + err.Error()) } }
(3)client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package mainimport ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" helloworld "learngo/grpc/proto" )func main () { conn, err := grpc.Dial("127.0.0.1:8080" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { panic ("failed to dial: " + err.Error()) } defer func (conn *grpc.ClientConn) { err := conn.Close() if err != nil { panic ("failed to close: " + err.Error()) } }(conn) client := helloworld.NewGreeterClient(conn) response, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "Tom" }) if err != nil { panic ("failed to call: " + err.Error()) } fmt.Println(response.Reply) }
3.2 流模式 3.2.1 四种模式 (1)简单模式(Simple RPC)
客户端发起一次请求,服务端响应一次数据。
(2)服务端数据流模式(Server-side streaming RPC)
客户端发起一次请求,服务端返回一段连续的数据流。例如:客户端向服务端发送一个股票代码,服务端就把该股票的实时数据持续地返回给客户端。
(3)客户端数据流模式(Client-side streaming RPC)
客户端持续向服务端发送数据流,发送结束后,服务端返回一个响应。例如:传感器向服务端上报数据。
(4)双向数据流模式(Bidirectional streaming RPC)
客户端与服务端都可以向对方发送数据流,可以同时互相发送,实现实时交互。例如聊天机器人。
3.2.2 流模式最小实践 (1)stream.proto
在 service 中,以流模式发送的数据使用 stream 关键字定义。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 syntax = "proto3" ; option go_package=".;proto" ; service Greeter { rpc GetStream(StreamRequestData) returns (stream StreamResponseData); rpc PostStream(stream StreamRequestData) returns (StreamResponseData); rpc AllStream(stream StreamRequestData) returns ( stream StreamResponseData); } message StreamRequestData { string data = 1 ; } message StreamResponseData { string data = 1 ; }
(2)server.go
流模式和普通 PRC 相比,server 有三点差异:
参数:使用 stream 标记的参数,在函数实现时,需使用 protoc 自动生成的 streamServer 代替。(不
发送数据流:使用 streamServer 类型的参数,调用 send 函数。
接收数据流:使用 streamServer 类型的参数,调用 send 函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 package mainimport ( "fmt" "net" "sync" "time" "google.golang.org/grpc" "learngo/streamingGrpc/proto" )const PORT = ":8088" type server struct {}func (s *server) GetStream(req *proto.StreamRequestData, res proto.Greeter_GetStreamServer) error { fmt.Println("\nServer-side streaming RPC" ) fmt.Println(req.Data) i := 0 for { i++ _ = res.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("%v" , time.Now().Unix()), }) time.Sleep(time.Second) if i > 3 { break } } return nil }func (s *server) PostStream(clientStream proto.Greeter_PostStreamServer) error { fmt.Println("\nClient-side streaming RPC" ) for { if recv, err := clientStream.Recv(); err != nil { fmt.Println(err) break } else { fmt.Println(recv.Data) } } return nil }func (s *server) AllStream(allStream proto.Greeter_AllStreamServer) error { fmt.Println("\nBidirectional streaming RPC" ) wg := sync.WaitGroup{} wg.Add(2 ) go func () { defer wg.Done() for { msg, err := allStream.Recv() if err != nil { fmt.Println(err) break } fmt.Println("msg from client: " + msg.Data) } }() go func () { defer wg.Done() i := 0 for { i++ _ = allStream.Send(&proto.StreamResponseData{ Data: fmt.Sprintf("%v" , time.Now().Unix()), }) if i > 3 { break } time.Sleep(time.Second) } }() wg.Wait() return nil }func main () { s := grpc.NewServer() proto.RegisterGreeterServer(s, &server{}) listen, err := net.Listen("tcp" , PORT) if err != nil { panic (err) } _ = s.Serve(listen) }
(3)client.go
流模式和普通 PRC 相比,client 有三点差异:
参数:使用 stream 标记的参数,函数调用时,无需提供。调用函数后获得返回值为对应的 streamClient。
发送数据流:使用 streamClient 类型的函数返回值,调用 send 函数。
接收数据流:使用 streamClient 类型的函数返回值,调用 send 函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 package mainimport ( "context" "fmt" "sync" "time" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc" "learngo/streamingGrpc/proto" )func main () { conn, err := grpc.Dial(":8088" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { panic (err) } defer func (conn *grpc.ClientConn) { err := conn.Close() if err != nil { panic (err) } }(conn) client := proto.NewGreeterClient(conn) fmt.Println("\nServer-side streaming RPC" ) res, _ := client.GetStream(context.Background(), &proto.StreamRequestData{Data: "tell me time" }) for { data, err := res.Recv() if err != nil { fmt.Println(err) break } fmt.Println(data) } fmt.Println("\nClient-side streaming RPC" ) postStream, err := client.PostStream(context.Background()) i := 0 for { i++ _ = postStream.Send(&proto.StreamRequestData{ Data: fmt.Sprintf("client send count: %d" , i), }) time.Sleep(time.Second) if i > 3 { break } } fmt.Println("\nBidirectional streaming RPC" ) allStream, err := client.AllStream(context.Background()) wg := sync.WaitGroup{} wg.Add(2 ) go func () { defer wg.Done() for { msg, _ := allStream.Recv() fmt.Println("msg form server: " + msg.Data) } }() go func () { defer wg.Done() i := 0 for { i++ _ = allStream.Send(&proto.StreamRequestData{ Data: fmt.Sprintf("%d" , i), }) if i > 3 { break } time.Sleep(time.Second) } }() wg.Wait() }
(1)metadata 是什么?
在 HTTP 数据传输中,除了 get/post 的参数,还有些参数会通过 metadata 来传输。
(2)新建 metadata
注意:虽然写的的是 map[string]string
,但实际上是 map[string][]string
,每一个 key 对应的 value 都是一个 map[string]string
。
1 2 3 4 5 6 7 8 9 10 11 12 md := metadata.New(map [string ]string { "username" : "admin" , "Username" : "admin2" , "password" : "Qwer123" , }) md = metadata.Pairs( "Username" , "admin" , "Username" , "admin2" , "Password" , "Qwer123" , )
(3)发送 metadata
1 ctx := metadata.NewOutgoingContext(context.Background(), md)
(4)接收 metadata
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 md, ok := metadata.FromIncomingContext(ctx)if ok { fmt.Println("get metadata success." ) for key, val := range md { fmt.Println(key, val) } } else { fmt.Println("get metadata error." ) }
(5)完整例子
metadata.proto
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 syntax = "proto3" ;option go_package = ".;proto" ;service Greeter { rpc SayHello (HelloRequest) returns (HelloResponse) ; }message HelloRequest { string name = 1 ; }message HelloResponse { string message = 1 ; }
server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 package mainimport ( "context" "fmt" "net" "google.golang.org/grpc/metadata" "google.golang.org/grpc" "learngo/metadata/proto" )type server struct {}func (s server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error ) { md, ok := metadata.FromIncomingContext(ctx) if ok { fmt.Println("get metadata success." ) for key, val := range md { fmt.Println(key, val) } } else { fmt.Println("get metadata error." ) } response := &proto.HelloResponse{ Message: "hello, " + request.Name, } return response, nil }func main () { s := grpc.NewServer() proto.RegisterGreeterServer(s, server{}) listener, err := net.Listen("tcp" , ":8080" ) if err != nil { panic (err) } err = s.Serve(listener) if err != nil { panic (err) } }
client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 package mainimport ( "context" "fmt" "google.golang.org/grpc/metadata" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "learngo/metadata/proto" )func main () { md := metadata.New(map [string ]string { "username" : "admin" , "Username" : "admin2" , "password" : "Qwer123" , }) md = metadata.Pairs( "Username" , "admin" , "Username" , "admin2" , "Password" , "Qwer123" , ) ctx := metadata.NewOutgoingContext(context.Background(), md) conn, err := grpc.Dial("127.0.0.1:8080" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { panic (err) } defer func (conn *grpc.ClientConn) { err = conn.Close() if err != nil { panic (err) } }(conn) client := proto.NewGreeterClient(conn) request := proto.HelloRequest{ Name: "Golang" , } response, err := client.SayHello(ctx, &request) if err != nil { panic (err) } fmt.Println(response.Message) }
3.4 Interceptor 拦截器 (1)拦截器分类
grpc 的拦截器可以实现在 server 端,也可以实现在 client 端。
第三方库 go-grpc-middleware 实现了很多拦截器的功能:认证(auth)、 日志( logging)、监控(monitoring)等,可以直接使用,值得借鉴学习。
由于 unary 方法和 stream 方法参数不同,拦截器又分为一元拦截器(unary interceptor)和流拦截器(stream interceptor)。所以共有 4 种类型变量,每个变量都是一个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 type UnaryServerInterceptor func (ctx context.Context, req interface {}, info *UnaryServerInfo, handler UnaryHandler) (resp interface {}, err error )type StreamServerInterceptor func (srv interface {}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error type UnaryClientInterceptor func (ctx context.Context, method string , req, reply interface {}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error type StreamClientInterceptor func (ctx context.Context, desc *StreamDesc, cc *ClientConn, method string , streamer Streamer, opts ...CallOption) (ClientStream, error )
(2)拦截器流程
主要流程是:
定义拦截器
预处理 pre-processing 调用RPC方法 invoking RPC method 后处理 post-processing 合适的时机指定拦截器
server:实例化 grpc server 时,作为 ServerOption 传入。 client:拨号时,作为 DialOption 传入。 (3) server 端的 unary 拦截器
如何找到该实现的函数:
看 grpc.UnaryInterceptor
定义源码,需传入参数 i UnaryServerInterceptor
。
查看参数 UnaryServerInterceptor
定义源码,复制具体的函数,进行实现。
需求:在处理请求前后,打印两句 log。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package mainimport ( "context" "fmt" "net" "google.golang.org/grpc" "learngo/interceptor/proto" )type server struct {}func (s *server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error ) { return &proto.HelloResponse{ Message: "hello," + request.Name, }, nil }func main () { interceptor := func (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { fmt.Println("Request is coming." ) resp, err = handler(ctx, req) fmt.Println("The request has been processed." ) return resp, err } opt := grpc.UnaryInterceptor(interceptor) s := grpc.NewServer(opt) proto.RegisterGreeterServer(s, &server{}) listener, err := net.Listen("tcp" , "0.0.0.0:8899" ) if err != nil { panic (err) } err = s.Serve(listener) if err != nil { panic (err) } }
除了上述写法,也可以将 UnaryServerInterceptor 拦截器方法单独拿出来,如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func main () { opt := grpc.UnaryInterceptor(interceptor) s := grpc.NewServer(opt) proto.RegisterGreeterServer(s, &server{}) listener, err := net.Listen("tcp" , "0.0.0.0:8899" ) if err != nil { panic (err) } err = s.Serve(listener) if err != nil { panic (err) } }func interceptor (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { fmt.Println("Request is coming." ) resp, err = handler(ctx, req) fmt.Println("The request has been processed." ) return resp, err }
(4)client 端的 unary 拦截器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package mainimport ( "context" "fmt" "time" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc" "learngo/interceptor/proto" )func main () { interceptor := func (ctx context.Context, method string , req, reply interface {}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() err := invoker(ctx, method, req, reply, cc, opts...) duration := time.Since(start) fmt.Println("duration: " , duration) return err } opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(interceptor), } conn, err := grpc.Dial("127.0.0.1:8899" , opts...) if err != nil { panic (err) } defer func (conn *grpc.ClientConn) { err = conn.Close() if err != nil { panic (err) } }(conn) client := proto.NewGreeterClient(conn) request := proto.HelloRequest{ Name: "Golang" , } response, err := client.SayHello(context.Background(), &request) if err != nil { panic (err) } fmt.Println(response.Message) }
(5)使用 metadata 和 interceptor 实现 auth 认证
在 client interceptor 中,使用 metadata 传递 appid/appkey;
在 server interceptor 中验证 appid/appkey 是否存在/合法;
如果不存在/不合法,使用 grpc 自带的 status.Error
返回状态码和错误信息。
server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func interceptor (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { fmt.Println("Request is coming." ) md, ok := metadata.FromIncomingContext(ctx) if ok { appid := md["appid" ][0 ] appkey := md["appkey" ][0 ] if appid != "123" || appkey != "456" { err = status.Error(codes.InvalidArgument, "invalid token." ) return resp, err } } else { status.Error(codes.Unauthenticated, "Token no exist." ) return resp, err } resp, err = handler(ctx, req) fmt.Println("The request has been processed." ) return resp, err }
client.go
1 2 3 4 5 6 7 8 9 10 11 interceptor := func (ctx context.Context, method string , req, reply interface {}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { md := metadata.New(map [string ]string { "appid" : "123" , "appkey" : "456" , }) ctx = metadata.NewOutgoingContext(ctx, md) start := time.Now() err := invoker(ctx, method, req, reply, cc, opts...) duration := time.Since(start) fmt.Println("duration: " , duration) }
(6)grpc 内置的自定义认证接口
WithPerRPCCredentials
1 2 3 4 5 6 7 func WithPerRPCCredentials (creds credentials.PerRPCCredentials) DialOption { return newFuncDialOption(func (o *dialOptions) { o.copts.PerRPCCredentials = append (o.copts.PerRPCCredentials, creds) }) }
client 无需在拦截器中设置 appid/appkey, WithPerRPCCredentials 可以将传入的 credentials.PerRPCCredentials
转化为 DialOption
。查看 credentials.PerRPCCredentials
源码可以看到,PerRPCCredentials 是一个接口,实现其两个方法,就可以实现该接口。
client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package mainimport ( "context" "fmt" "time" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "learngo/auth/proto" )type Credential struct {}func (c Credential) GetRequestMetadata(ctx context.Context, uri ...string ) (map [string ]string , error ) { return map [string ]string { "appid" : "123" , "appkey" : "456" , }, nil }func (c Credential) RequireTransportSecurity() bool { return false }func main () { interceptor := func (ctx context.Context, method string , req, reply interface {}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { start := time.Now() err := invoker(ctx, method, req, reply, cc, opts...) duration := time.Since(start) fmt.Println("duration: " , duration) return err } opts := []grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithUnaryInterceptor(interceptor), grpc.WithPerRPCCredentials(Credential{}), } conn, err := grpc.Dial("127.0.0.1:8899" , opts...) if err != nil { panic (err) } defer func (conn *grpc.ClientConn) { err = conn.Close() if err != nil { panic (err) } }(conn) client := proto.NewGreeterClient(conn) request := proto.HelloRequest{ Name: "Golang" , } response, err := client.SayHello(context.Background(), &request) if err != nil { panic (err) } fmt.Println(response.Message) }
3.5 验证器 (1)一个开源项目
protoc-gen-validate ,该项目目前处于 alpha 阶段,功能是对于 protobuf 中的 message,进行数据验证。
下面简单跑通一下。
(2)原理:
定义规则:proto 中定义了 HelloRequest 这种 message 类型,并在其中标记数据规则。
生成验证规则的代码:protoc-gen-validate 会为 HelloRequest 类型,生成多个结构体方法,其中有一个 Validate 方法。在 Validata 方法,根据设置的数据规则,验证数据。
使用:在实际使用中,HelloRequest.Validate 就可以验证数据。
(3)安装 protoc-gen-validate 二进制文件
注意看 README 文件,可能有更新。
1 2 3 4 5 6 7 8 go get -d github.com/envoyproxy/protoc-gen-validate git clone https://github.com/bufbuild/protoc-gen-validate.gitcd protoc-gen-validate && make build
在 README 文件里有这样一段话,用以说明为什么使用上面的方式下载源码和构建二进制文件。
Yes, our go module path is github.com/envoyproxy/protoc-gen-validate
not bufbuild
this is intentional.
Changing the module path is effectively creating a new, independent module. We would prefer not to break our users. The Go team are working on better cmd/go
support for modules that change paths, but progress is slow. Until then, we will continue to use the bufbuild
module path.
补充说明:实际使用过程中,需要部分源码,也需要二进制文件 protoc-gen-validate。源码使用 go get -d
下载在 $GOPATH/pkg/mod/github.com/envoyproxy/
目录下,二进制文件在新的文件夹里编译后放在 $GOPATH/bin
目录下。
(4)导入 proto 文件并生成 bp.go 文件
hello.proto
在 message 中设置验证条件。Constraint Rules
import "validate.proto";
可能会显示找不到文件。没关系,后面使用 protoc 生成代码时,指定该文件的目录。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 syntax = "proto3" ;option go_package = ".;proto" ;import "validate.proto" ;service Greeter { rpc SayHello (HelloRequest) returns (HelloResponse) ; }message HelloRequest { uint64 id = 1 [(validate.rules).uint64 .gt = 999 ]; string email = 2 [(validate.rules).string .email = true ]; }message HelloResponse { string message = 1 ; }
生成代码命令
注意使用 -I
指定在哪个目录中找 validate.proto 。使用 go get -d 下载的代码,会在 $GOPATH/pkg/mod/
目录下。
1 2 3 4 5 6 7 protoc -I . \ -I $GOPATH /pkg/mod/github.com/envoyproxy/protoc-gen-validate@v0.9.1/validate/ \ --go_out=. \ --go-grpc_out=. \ --validate_out="lang=go:." \ --go-grpc_opt=require_unimplemented_servers=false \ hello.proto
server.go
在拦截器中验证数据,注意这里的类型转化。
传入的时 interface{}
类型,不能直接调用 Validate 函数,需要转化为具体的类型(这个类型实现了 Validate 方法)。
为了扩展更多的类型,拦截器中又不能转换为 HelloRequest(虽然也能跑,但是这个拦截器就只能验证 HelloRequest 这一种数据了),而应该转换为一个接口类型。
只要这个接口内包含 Validate 方法就可以,所以我们声明一个 Validator 类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package mainimport ( "context" "net" "strconv" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "learngo/validate/proto" )type server struct {}func (s *server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error ) { return &proto.HelloResponse{ Message: "NO." + strconv.FormatUint(request.Id, 10 ) + " email is " + request.Email, }, nil }func main () { option := grpc.UnaryInterceptor(interceptor) s := grpc.NewServer(option) proto.RegisterGreeterServer(s, &server{}) listener, err := net.Listen("tcp" , "0.0.0.0:8899" ) if err != nil { panic (err) } err = s.Serve(listener) if err != nil { panic (err) } }type Validator interface { Validate() error }func interceptor (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { if validator, ok := req.(Validator); ok { if err := validator.Validate(); err != nil { return nil , status.Error(codes.InvalidArgument, err.Error()) } } if err != nil { return nil , err } return handler(ctx, req) }
client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package mainimport ( "context" "fmt" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "learngo/validate/proto" )func main () { conn, err := grpc.Dial("127.0.0.1:8899" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { panic (err) } defer func (clientConn *grpc.ClientConn) { err := clientConn.Close() if err != nil { panic (err) } }(conn) client := proto.NewGreeterClient(conn) response, err := client.SayHello(context.Background(), &proto.HelloRequest{ Id: 1000 , Email: "1234@gmail.com" , Mobile: "130000000000" , }) if err != nil { panic (err) } fmt.Println(response.Message) }
3.6 状态码 grpc 官方定义了状态码:https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
3.7 错误处理 go 中的错误处理主要是 status 包内。Status 类型中封装了 Error 类型。status.Error()
内部就是 new 一个 status 类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package mainimport ( "fmt" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" )func main () { err := status.Error(codes.NotFound, "未找到记录" ) panic (err) err = status.New(codes.NotFound, "未找到记录" ).Err() panic (err) err = status.Errorf(codes.NotFound, "未找到%s记录" , "Tom" ) panic (err) err = status.Newf(codes.NotFound, "未找到%s记录" , "Tom" ).Err() panic (err) if fromError, ok := status.FromError(err); ok { fmt.Println(fromError.Code()) fmt.Println(fromError.Message()) } }
3.8 超时机制 使用 context.WithTimeout(context.Background(), time.Second*3)
生成带有 deadline 功能的 contex,在远程调用时使用。
server.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "context" "net" "time" "google.golang.org/grpc" "learngo/deadline/proto" )type server struct {}func (*server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloResponse, error ) { return &proto.HelloResponse{ Reply: "hello, " + request.Name, }, nil }func main () { opt := grpc.UnaryInterceptor(func (ctx context.Context, req interface {}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface {}, err error ) { time.Sleep(time.Second * 5 ) return handler(ctx, req) }) s := grpc.NewServer(opt) proto.RegisterGreeterServer(s, &server{}) listen, err := net.Listen("tcp" , ":8899" ) if err != nil { panic (err) } err = s.Serve(listen) if err != nil { panic (err) } }
client.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "context" "fmt" "time" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc" "learngo/deadline/proto" )func main () { conn, err := grpc.Dial("127.0.0.1:8899" , grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { panic (err) } defer func (clientConn *grpc.ClientConn) { err := clientConn.Close() if err != nil { panic (err) } }(conn) client := proto.NewGreeterClient(conn) ctx, _ := context.WithTimeout(context.Background(), time.Second*3 ) response, err := client.SayHello(ctx, &proto.HelloRequest{Name: "Golang" }) if err != nil { panic (err) } fmt.Println(response.Reply) }
3.8 server/client stub 解析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 syntax = "proto3" ;option go_package=".;proto" ;service Greeter { rpc SayHello(HelloRequest) returns (HelloResponse) ; }message HelloRequest { string name = 1 ; }message HelloResponse { string reply = 1 ; }
以上面的 hello.proto 为例,protoc 生成的 go 源码是 hello_grpc.pb.go 文件。
(1)server stub
1 2 3 type GreeterServer interface { SayHello(context.Context, *HelloRequest) (*HelloResponse, error ) }
1 2 3 func RegisterGreeterServer(s grpc .ServiceRegistrar, srv GreeterServer) { s.RegisterService(&Greeter_ServiceDesc, srv ) }
(2)client stub
1 2 3 type GreeterClient interface { SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error ) }
1 2 3 4 5 6 7 8 9 10 11 12 type greeterClient struct { cc grpc.ClientConnInterface }func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error ) { out := new (HelloResponse) err := c.cc.Invoke(ctx, "/Greeter/SayHello" , in, out, opts...) if err != nil { return nil , err } return out, nil }
声明 greeterClient 结构体,里面包含着 grpc.ClientConnInterface 变量,也就是可以使用 grpc.ClientConnInterface 的方法。
并为 greeterClient 结构体实现了 SayHello 方法,所以 greeterClient 结构体实现了 GreeterClient 接口。
为什么 greeterClient 结构体里面要包含一个 grpc.ClientConnInterface 变量?
远程调用 server 端 SayHello 时,实际上调用的是 grpc.ClientConnInterface 的 Invoke 方法。 在 Invoke 方法的参数中,有 context、路径(call ID)、传入参数、穿出参数、远程调用的选项。 1 2 3 func NewGreeterClient (cc grpc.ClientConnInterface) GreeterClient { return &greeterClient{cc} }
Reference grpc API:https://pkg.go.dev/google.golang.org/grpc