从零实现系列|RPC

0.序言

为什么需要 RPC

最直观得是,客户端可以像调用本地程序一样,进行远程调用,使用者无需关注内部的实现细节。

另外一种广泛使用的调用方式是基于 HTTP 协议得 Restful API,让 gpt 总结一下。

对比维度Restful APIRPC
协议基于 HTTP 协议(如 HTTPS)通常使用自定义协议(如 TCP 或高效二进制协议)
通信方式基于 HTTP 的请求-响应模型(如 GET/POST)类似本地方法调用的请求-响应模型(对开发者透明)
报文格式文本格式(JSON/XML),冗余较多二进制编码(如 Protobuf),精简高效
性能较低(文本解析、冗余数据)较高(二进制压缩、高效序列化)
使用场景通用性强,适合跨语言、对外的开放接口高性能要求高,适合内部服务间通信
可扩展性扩展依赖网关等中间件,功能相对固定原生支持注册中心、负载均衡、超时处理等扩展功能
抽象模型面向资源的抽象,通过 URI 唯一标识资源,通过 HTTP 方法定义操作面向过程的抽象,客户端直接调用服务端的函数或方法

RPC 框架需要解决哪些问题

  • 通信协议:如何选择传输协议(TCP/HTTP/Unix Socket)?如何设计协议格式(如报文头、体结构)?

  • 编码方式:如何高效编码/解码数据?如何压缩报文?

  • 可用性问题

    • 超时处理:超时控制(如客户端连接超时、服务端处理超时)避免资源阻塞。
    • 请求管理:如何支持并发请求、异步请求?
  • 服务治理组件

    • 注册中心:服务动态注册与发现,健康检查(心跳机制)
    • 负载均衡:如何分配请求到多个服务实例?

以上种种,业务之外的公共能力,RPC 框架均需具备。

市面上的 RPC 框架 有哪些?

框架名称特点
net/rpcGo 标准库,轻量级,支持 TCP/HTTP 协议,默认使用 Gob 编码
gRPCGoogle 开源,基于 HTTP/2 和 Protobuf,跨语言,高性能,支持流式通信
rpcx高性能、支持多种编码(JSON/Protobuf),集成注册中心、负载均衡
go-micro微服务框架,包含 RPC 模块,支持插件化(注册中心、编码协议等)

本文如何从零实现 RPC 框架?

  1. 从零实现标准库 net/rpc
  2. 新增了协议交换(protocol exchange)、注册中心(registry)、服务发现(service discovery)、负载均衡(load balance)、超时处理(timeout processing)等特性

1.服务端与消息编码

如何设计一个 RPC 请求

这个问题也可以问成:一个 RPC 请求,应该传入传出什么数据,选择使用什么数据结构?

举个例子:一个典型的 RPC 调用。

1
err = client.Call("Arith.Multiply", args, &reply)
  • Arith.Multiply:服务名 Arith 和方法名 Multipy
  • args:传入参数
  • reply:传出参数

我们将服务名、方法名放在请求的 header 中,传输传出参数放在 body 中。

其中 header 定义为:

1
2
3
4
5
6
// codec/codec.go
type Header struct {
ServiceMethod string // 服务名和方法名
Seq uint64 // 请求序号
Error string // 客户端置为空,服务端如何出现错误,将错误信息写入Error
}

用什么编解码方式进行编解码

编码方式可以选择 json、gob、protobuf 等等,为了更好的兼容性,我们抽象出来 Codec 的类型。

1
2
3
4
5
6
7
// Codec 接口:对消息体进行编码,比如 Gob、Json 会实现该接口,代表两种编码方式
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}

之后想使用哪个编码方式都可以,只要实现了 Codec 接口即可。

将各个编解码方式名称硬编码保存下来,并使用 map 保存各个编解码方式的构造函数。所以使用的流程是,通过编解码方式名称获取到该编解码方式的构造函数,调用该构造函数,创建编解码的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type Type string
const (
GobType Type = "application/gob"
JsonType Type = "application/json"
)

// NewCodecFunc Codec 接口类型的构造函数
type NewCodecFunc func(io.ReadWriteCloser) Codec

// NewCodecFuncMap 通过 type 来选择对应的 codec 的构造函数
var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec // 赋值为 Gob codec 的构造函数(NewGobCodec 为 Gob codec 的构造函数,在 codec/gob.go 实现。)
//NewCodecFuncMap[JsonType] = NewJsonCodec // 赋值为 Json codec 的构造函数, 暂不实现
}

如何实现编解码方式的接口

下面将以 Gob 为例,说明如何实现 Codec 接口。先定义 Gob 编解码的结构体 GobCodec 。

1
2
3
4
5
6
type GobCodec struct {
conn io.ReadWriteCloser
buf *bufio.Writer // 使用缓冲流来提高性能,防止阻塞
dec *gob.Decoder
enc *gob.Encoder
}

实现 Codec 的接口函数,主要是封装 encoding/gob 的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// ReadHeader 封装了 encoding/gob 的 Decode 方法,从连接中读取 Header 信息
func (c *GobCodec) ReadHeader(header *Header) error {
return c.dec.Decode(header)
}

// ReadBody 封装了 encoding/gob 的 Decode 方法,从连接中读取 Body 信息
func (c *GobCodec) ReadBody(body interface{}) error {
return c.dec.Decode(body)
}

// Write 封装了 encoding/gob 的 Encode 方法,将 Header 和 Body 信息写入连接中
func (c *GobCodec) Write(header *Header, body interface{}) (err error) {
defer func() { // 关闭前,先将 buf 中的数据写入连接中
_ = c.buf.Flush()
if err != nil {
_ = c.Close()
}
}()

if err = c.enc.Encode(header); err != nil {
log.Panicln("rpc codec: gob error encoding header: ", err)
return err
}
if err = c.enc.Encode(body); err != nil {
log.Panicln("rpc codec: gob error encoding body: ", err)
return err
}
return nil
}

// Close 封装了 io.ReadWriteCloser 的 Close 方法,关闭连接
// io.ReadWriteCloser 包含 io.Closer 类型,io.Closer 类型包含 Close 方法
func (c *GobCodec) Close() error {
return c.conn.Close()
}

最后再实现 GobCodec 的构造函数,方便初始化时,穿给 Codec 接口变量。

1
2
3
4
5
6
7
8
9
func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}

P.S. 为什么 GobCodec 需要加一个 buf ?

用于缓冲写入操作,减少系统调用次数。如果没有 buf,每次小数据写入直接触发系统调用,增加延迟;频繁的 I/O 操作,影响性能。

通信过程如何协商编码方式

以 HTTP 报文为例,HTTP 报文分为 header 和 body 两个部分。客户端和服务端收发消息时,只需要先解析 header 部分,就知道 body 的格式 Content-Type、长度 Content-Length。

在 RPC 协议的报文里,为了提升性能,仅在报文最开始规划固定的字节,来协商编码方式。

1
2
3
4
5
// Option 定义 Option 结构体,封装了 MagicNumber 和 CodecType 字段,从 conn 中解析出 Option 的信息,表示 RPC 消息的编码方式
type Option struct {
MagicNumber int // 验证连接的合法性,确保客户端和服务端使用同一协议
CodecType codec.Type // 编码方式的名称
}

所以实际的报文是这样的。

1
| Option | Header1 | Body1 | Header2 | Body2 | ...

那 Option 使用什么编码方式呢?

为了方便,我们可以规定,Option 使用 JSON 编码,后面的 header、body 使用 Option 规定的编解码方式编解码。

1
2
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ Option 固定使用 JSON 编码 ------> | <------- Header/Body 编码方式由 CodeType 决定 ------->|

如何实现一个服务端

先看一下,服务端要做什么,要实现哪些功能。

  • 建立连接:实现 Accept 方法,等待 socket 连接,并开启协程处理请求。
  • 处理请求:
    • 解析 Option 信息,检查 Option.MagicNumber 是匹配,根据 Option.CodecType 实例化编解码器。
    • 读取请求:使用编解码器实例解码 header 和 body。
    • 处理请求。
    • 回复请求。

搞清楚需要实现哪些功能,就可以开始具体实现了。(可以先看主要的调用流程,具体实现实现细节晚点再看)

  • 服务器实例的构造函数。
1
2
3
4
5
6
7
8
const MagicNumber = 0x3bef5c

// Server 定义 Server 结构体,封装了 Accept、ServeConn、serveCodec 方法
type Server struct{} // 肚子里没什么要放的

func NewServer() *Server {
return &Server{}
}
  • Accept 处理连接:建立 socket 连接,使用 goroutine 处理连接
1
2
3
4
5
6
7
8
9
10
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept() // 建立 socket 连接
if err != nil {
log.Println("rpc server: accept error: ", err)
return
}
go server.ServeConn(conn) // 使用 goroutine 处理连接
}
}
  • ServeConn 处理消息:解析出 Option 信息,根据 CodecType 选择对应的 codec,调用 serveCodec 方法处理剩下的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() {
_ = conn.Close()
}()

var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil { // opt 是传出参数,读到 RPC 前面的 JSON 数据,这包含了 option 信息,表示 RPC 消息的编码方式
log.Println("rpc server: options error: ", err)
return
}
if opt.MagicNumber != MagicNumber { // 验证 MagicNumber,证明请求合法
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
f := codec.NewCodecFuncMap[opt.CodecType] // 获取编解码器的构造函数
if f == nil {
log.Printf("rpc server: invalid codec type %s", opt.CodecType)
return
}
server.serveCodec(f(conn))
}
  • serveCodec 处理请求:调用 readRequest 方法读取请求,调用 handleRequest 方法处理请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (server *Server) serveCodec(cc codec.Codec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)

for {
req, err := server.readRequest(cc)
if err != nil {
break
}

wg.Add(1)
go server.handleRequest(cc, req, sending, wg)
}
wg.Wait()
_ = cc.Close()
}
  • readRequest 读取请求:调用 readRequestHeader 方法读取请求头,调用 ReadBody 方法读取请求参数,返回 request 结构体
    • 这里定义了请求体的结构体,h 储存 header,argv 储存传入参数,replyValue 储存传出参数,也就是返回值。
    • argv 和 replyValue 为什么都是 reflect.Value 类型呢?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type request struct {
h *codec.Header
argv, replyValue reflect.Value
}

func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc) // 获取请求头
if err != nil {
return nil, err
}
req := request{
h: h,
}

req.argv = reflect.New(reflect.TypeOf(""))
if err = cc.ReadBody(req.argv.Interface()); err != nil {
log.Println("rpc server: read body error: ", err)
}

return &req, nil
}


// readRequestHeader 读取请求头,实际调用 编解码器 的 ReadHeader 函数即可。
func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
if err := cc.ReadHeader(&h); err != nil {
if err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
log.Println("rpc server: read header error: ", err)
}
return nil, err
}
return &h, nil
}

如何实现一个客户端

完整代码

2.高性能客户端

3.服务注册

这一章是干什么的?

服务端的主要工作:

  • 监听端口
  • 响应请求
    • 解析请求
    • 处理请求
      • 从请求中获取方法名、参数
      • 获取方法
      • 调用方法
    • 返回结果

这一章主要完成处理请求部分,其中,从请求中获取方法名、参数在上一章已完成。client 在发送请求时,方法名在请求头里 client.header.ServiceMethod,参数在请求体里 call.Args

所以我们重点关注

  • 获取方法
  • 调用方法

如何获取方法

服务端通过方法名获取实际的方法,需要两步:

  1. 在启动服务端服务时,我们可以将所有可调用的方法都加到一个 map 中。

  2. 当 client 调用 hello 时,我们从 map 中找到对应的方法皆可。

由于不同类型的结构体,可能有同名的结构体方法。我们的方法名,使用 结构体实例名.方法名 来作为方法名。比如上一章中的,结构体类型 Foo ,有 Sum 方法,客户端实例化 foo 对象后,远程调用 Client.Call 时,传入的就是 foo.Sum

如何方法注册到 map 中

首先,先说数据存储方式,简单来说是这样的,

flowchart LR
    A[Server.serviceMap] -->|map储存多个service| B[Service<br>一个 service 表示一个变量类型]
    B -->|service包含一个 map 变量| C[service.method]
    C -->|map 储存多个方法| D[methodType<br>一个 methodType 表示一个该变量类型的方法]
 

具体数据存储方式设计如下:

服务端保存着一个 serviceMap。使用 serviceMap 保存变量类型及其方法。key 是变量类型名,value 是 service 类型实例。

1
2
3
4
5
// server.go
// Server 定义 Server 结构体,封装了 Accept、ServeConn、serveCodec 方法
type Server struct {
serviceMap sync.Map
}

service 类型定义如下,一个 service 表示一个变量类型。其中,也包含一个 map,保存着这个变量的方法。key 是方法名,value 是 methodType 类型实例。

1
2
3
4
5
6
7
8
// service.go
// 一个 service 表示一个变量类型
type service struct {
name string // 变量类型名,用来打 log,例如字符串 "Foo"
typ reflect.Type // 变量类型,通过变量的类型,可以直接获取变量的方法,例如 Foo 类型,可获取到方法 Sum
rcvr reflect.Value // 变量的值,通过变量的值,可以调用变量的方法,例如 &foo,调用 Sum
method map[string]*methodType // map 储存变量的方法中所有可调用的方法
}

methodType 类型定义如下,一个 methodType 表示一个方法。

1
2
3
4
5
6
7
8
// service.go
// 一个 methodType 表示一个方法
type methodType struct {
method reflect.Method // 方法本身,用于调用方法,例如 Foo.Sum
ArgType reflect.Type // 参数的类型,用于判断参数是否正确
ReplyType reflect.Type // 返回的类型,用于判断返回值是否正确
numCalls uint64 // 方法调用次数,用于统计方法调用次数
}

最后,下面将以一个实际例子,演示如何将方法注册到 map 中。

graph LR
    A[startServer, 启动服务器] --> B[Server.Register , 为 Foo 类型注册服务]
    B --> C[newService, 为 Foo 类型创建 service 结构体]
    C --> D[获取 rcvr 值, 类型名和类型]
    C --> G[service.registerMethods, 获取方法列表]
    G --> K[检查方法的参数和返回值]
    K --> L[保存方法到 map 变量 service.method]

  1. 启动服务,将 Foo 类型注册到 rpc server 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// main.go

type Foo int // Foo 类型,实现了 Sum 方法
type Args struct{ Num1, Num2 int }
func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}

func startServer(addr chan string) {

var foo Foo // 实例化 Foo 类型的对象
if err := geerpc.Register(&foo); err != nil { // 注册 Foo 类型的对象,注册的是 Foo 类型的对象,不是 Foo 类型的方法
log.Fatal("register error:", err)
}

l, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}
  1. 为 Foo 类型创建 service 对象,并保存到 server 的 map 中。
1
2
3
4
5
6
7
8
9
10
11
// server.go
var DefaultServer = NewServer()
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

func (server *Server) Register(rcvr interface{}) error {
s := newService(rcvr) // 为 rcvr 变量的类型创建 service 结构体
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup { // 调用 serviceMap.LoadOrStore 将 service 结构体保存到 map 中
return errors.New("rpc: service already defined: " + s.name)
}
return nil
}
  1. 创建 service 对象、获取对象的方法、检查方法参数和返回值,最后将方法保存到 map 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// service.go
// newService 通过发射获取 rcvr 变量的类型及其方法
func newService(rcvr interface{}) *service {
s := new(service)
s.rcvr = reflect.ValueOf(rcvr) // 获取 rcvr 变量的值,例如 &foo
s.name = reflect.Indirect(s.rcvr).Type().Name() // 获取 rcvr 变量的类型名,例如 "Foo"
s.typ = reflect.TypeOf(rcvr) // 获取 rcvr 变量的类型,例如 Foo
if !ast.IsExported(s.name) {
log.Fatalf("rpc server: %s is not a valid service name", s.name)
}
s.registerMethods() // 获取 rcvr 变量的方法列表,例如 Foo.Sum,并将其保存到 service 的方法 map 中
return s
}

// registerMethods 获取 rcvr 变量的方法列表,例如 Foo.Sum,并将其保存到 service 的方法 map 中
func (s *service) registerMethods() {
s.method = make(map[string]*methodType)

for i := 0; i < s.typ.NumMethod(); i++ { // 遍历 rcvr 变量的方法列表
method := s.typ.Method(i) // 获取 rcvr 变量,第 i 个方法
mType := method.Type // 获取 rcvr 变量的方法的类型

// 检查方法的参数是否正确。rpc 的方法必须满足以下条件:
// 1. 方法有 3 个参数,第 1 个参数是 rcvr 变量(相当于 python 的 self,java 的 this),第 2 个参数是传入参数,第 3 个参数是传出参数,指针类型
// 2. 第 2 个参数和第 3 个参数都是导出的类型
// 3. 返回值只有一个,是 error 类型
// 例如这样 func (t *T) MethodName(argType T1, replyType *T2) error
if mType.NumIn() != 3 || mType.NumOut() != 1 { // 检查参数个数
continue
}
if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() { // 检测返回值个数和类型
continue
}
argType, replyType := mType.In(1), mType.In(2)
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) { // 检查参数类型是否是导出的类型或者内置类型
continue
}

// 检查完毕,将方法保存到 service 的方法 map 中
s.method[method.Name] = &methodType{
method: method,
ArgType: argType,
ReplyType: replyType,
}
log.Printf("rpc server: register %s.%s\n", s.name, method.Name)
}
}

// isExportedOrBuiltinType 检查类型是否是导出的类型或者内置类型
func isExportedOrBuiltinType(t reflect.Type) bool {
return ast.IsExported(t.Name()) || t.PkgPath() == ""
}

如何从 map 获取方法

在 readRequest 时,从请求头中,获取方法名;根据方法名在对应的 service 的 map 中获取到方法、传入参数、传出参数。

graph LR
    A[Server.readRequest] -->|解析请求头| B[Server.readRequestHeader]
    B-->|返回请求头,包含方法名| A
    A -->|获取方法| C[Server.findService]
    A -->|获取方法传入参数,传出参数| D[GobCodec.ReadBody]
    E[request]
    C-->|方法保存到 request| E
    D-->|传入参数,传出参数保存到 request| E
  1. request 储存了一次远程调用请求的信息,方法、传入参数、传出参数指针。它的定义如下。
1
2
3
4
5
6
7
8
// server.go
// request 表示一次调用的所有信息
type request struct {
h *codec.Header // 请求头
svc *service // 请求对应的服务,使用 svc.call 调用对应的方法
mtype *methodType // 请求对应的方法,是 svc.call 的第一个参数
argv, replyv reflect.Value // 方法的传入参数和传出参数,是 svc.call 的第二个和第三个参数
}
  1. readRequest 获取方法名、方法、传入参数、传出参数,并储存在 request 实例中,为方法调用做准备。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// server.go
// readRequest 读取请求:调用 readRequestHeader 方法读取请求头,调用 ReadBody 方法读取请求参数,返回 request 结构体
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
// 读取请求头
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}

// 初始化请求结构体
req := &request{h: h}

// 根据请求头中的 ServiceMethod 字段找到对应的服务和方法类型
req.svc, req.mtype, err = server.findService(h.ServiceMethod)
if err != nil {
return req, err
}

// 创建传入参数和传出参数的反射对象
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()

// 检查请求传入参数的类型是否为指针类型,如果不是,则使用 Addr() 方法将 req.argv 转换为指针类型
// 为什么?
// 因为如果传入值是值类型,传入后,是值拷贝,不会修改传入变量的原值,所以需要使用 Addr() 获取地址后传入。
argvi := req.argv.Interface() // 使用 interface() 方法将 req.argv 转换为 interface{} 类型,这样可以传入任意类型的参数
if req.argv.Type().Kind() != reflect.Ptr {
argvi = req.argv.Addr().Interface()
}

// ReadBody 方法会将请求参数解码到 argvi 中储存
if err = cc.ReadBody(argvi); err != nil {
log.Println("rpc server: read body err:", err)
return req, err
}
return req, nil
}

另外,需要注意一下,

  • 创建传入参数时,需区分值类型还是指针类型。
  • 创建传出参数时,需对 map、slice 特殊处理。(因为 reflect.New 初始化时,map、slice 都是 nil)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// service.go
// newArgv 创建一个参数变量
func (m *methodType) newArgv() reflect.Value {
var argv reflect.Value
// 参数可能是指针类型也可能是值类型
if m.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(m.ArgType.Elem()) // 指针类型,则创建指针类型变量
} else {
argv = reflect.New(m.ArgType).Elem() // 值类型,则创建值类型变量
}
return argv
}

// newReplyv 创建一个传出参数变量
func (m *methodType) newReplyv() reflect.Value {
// 传出参数必须是指针类型
replyv := reflect.New(m.ReplyType.Elem())

// 为什么需要对 map 和 slice 特殊处理?
// reflect.New 创建的 map 和 slice 都是 nil,需要先初始化,再使用。
// 为什么 newArgv 方法不需要对 map 和 slice 特殊处理?
// 因为 argv 是用于接收调用方传入的参数,这些参数由调用方提供且已经初始化。
switch m.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))
}
return replyv
}

如何调用方法

在 handleRequest 时,通过 service.call 利用反射来调用函数。这里主要学习这个用法。

graph LR
    A[Server.serveCodec]-->|获取请求信息| B[Server.readRequest] 
    A -->|调用方法| C[Server.handleRequest]
    C --> D[request.svc.call, 也就是 Service.call]
    D -->|使用反射调用| E[methodType.method.Func.Call]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// server.go
// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用

if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// service.go
// call 调用 rcvr 变量的方法,例如 Foo.Sum
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {
atomic.AddUint64(&m.numCalls, 1)
f := m.method.Func
// 用反射的方式调用方法
// 第一个参数是 rcvr 变量,例如 &foo,类似于 java 的 this,python 的 self
// 第 2 个参数是传入参数,第 3 个参数是传出参数,指针类型
returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv})
if errInter := returnValues[0].Interface(); errInter != nil {
return errInter.(error)
}
return nil
}

更深入了解反射

请看这篇 https://www.aimtao.net/go#11-reflect

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
# 代码结构
version_3_service
├── client.go
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go
├── server.go
├── service.go
└── service_test.go

4.超时处理

哪些地方需要超时处理

简单来看整个流程:

  • 客户端:

    1. 拨号连接 ✓

    2. 远程调用(发送数据)✓

    3. 等待服务端处理 ✓

    4. 接收返回的结果(接收数据)✓

  • 服务端:

    1. 端口监听

    2. 接收请求信息(接收数据)✓

    3. 调用方法处理请求信息(处理数据)✓

    4. 返回请求结果(发送数据)✓

以上打 ✓ 的步骤,均可能出现超时,主要为,

  • 客户端建立连接超时
  • 发送数据
    • 客户端/服务端写报文时超时
  • 处理数据
    • 服务端调用方法超时
  • 接收数据
    • 客户端等待服务端响应超时
    • 客户端/服务端读取报文超时

基于这个超时的情景,我们可以在以下三个地方设置超时处理机制。

客户端:

  • 建立连接
  • Client.call() 的整个过程(包含发送数据、等待处理、接收数据)

服务端:

  • Server.handleRequest() 的整个过程(包括处理数据、发送数据,接收数据先不管了)

建立连接的超时处理

启动一个 goroutine 来拨号建立连接,使用 select 设置超时器,阻塞等待拨号结果,如果在接收拨号结果之前,先收到了超时信号,则进行超时处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// client.go
type NewClientFunc func(conn net.Conn, opt *Option) (*Client, error)

func Dial(network, address string, opts ...*Option) (client *Client, err error) {
return dialTimeout(NewClient, network, address, opts...)
}

type dialResult struct {
client *Client
err error
}

func dialTimeout(f NewClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
// 默认使用 Gob 编码
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}

// 声明一个通道,用于传输拨号建立连接的结果
result := make(chan dialResult)

// 当发生错误时,保证 client 为 nil
defer func() {
if err != nil {
client = nil
}
}()

// 启动一个 goroutine 连接服务器,连接成功后,调用 f 创建 Client 实例,并将结果发送到 result 通道中
go func() {
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
if err != nil {
result <- dialResult{client: nil, err: err}
return
}
client, err = f(conn, opt)
result <- dialResult{client: client, err: err}
}()

// 超时处理,阻塞等待,等待超时或收到结果
select {
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: %v", opt.ConnectTimeout)
case result := <-result:
return result.client, result.err
}
}

超时的时长,设置在 Option struct 内,便于用户自定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// server.go

// Option 定义 Option 结构体,封装了 MagicNumber 和 CodecType 字段,从 conn 中解析出 Option 的信息,表示 RPC 消息的编码方式
type Option struct {
MagicNumber int
CodecType codec.Type
ConnectTimeout time.Duration // Client 建立连接的超时时间
HandleTimeout time.Duration // Client.Call() 整个过程的超时时间
}

var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
ConnectTimeout: time.Second * 10,
//HandleTimeout: time.Second * 10, // 默认为 0,不设置超时时间
}

Call 的超时处理

把选择权交给用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// client.go

// Call 同步调用,阻塞等待响应结果
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))

// 等待响应结果/超时
select {
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:
return call.Error
}
}

用户如何使用?

1
2
3
ctx, _ := context.WithTimeout(context.Background(), time.Second)
var reply int
err := client.Call(ctx, "Foo.Sum", &Args{1, 2}, &reply)

handleRequest 超时处理

这里先不管读取数据了,仅对 handleRequest 做超时处理。handleRequest 其中又分为两个步骤,处理数据、发送数据,仅对处理数据做超时处理。

设置两个 channel,一个表示 req.svc.call 完成,一个表示 server.sendResponse 完成。当 req.svc.call 完成后,就不用管超时时间是否到达,继续让 server.sendResponse 发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// server.go

// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
defer wg.Done()

called := make(chan struct{}, 1) // 设置缓冲区为 1,防止在超时后,无人接收 channel 数据,导致 channel 发送时阻塞,导致 goroutine 泄漏
sent := make(chan struct{}, 1) // 设置缓冲区为 1,防止在超时后,无人接收 channel 数据,导致 channel 发送时阻塞,导致 goroutine 泄漏


go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用
called <- struct{}{} // 调用完成, 不管是否超时,继续发送数据
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
sent <- struct{}{}
}()

if timeout == 0 { // 没有超时时间,直接等待
<-called
<-sent
return
}

// 有超时时间,使用 select 等待超时或调用完成
select {
case <-time.After(timeout):
server.sendResponse(cc, req.h, invalidRequest, sending)
case <-called: // 如果调用完成,则不管超时时间,等待 sent(仅对 req.svc.call 做超时处理)
<-sent
}
}

测试代码

测试客户端处理连接超时的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// client_test.go
func TestClient_dialTimeout(t *testing.T) {
t.Parallel() // 设置测试项并行执行

f := func(conn net.Conn, opt *Option) (client *Client, err error) {
_ = conn.Close()
time.Sleep(time.Second * 2)
return nil, nil // 模拟了一个没有错误的客户端连接
}

l, _ := net.Listen("tcp", ":0")

// 测试客户端连接有超时处理的情况
t.Run("connect timeout", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: time.Second})
_assert(err != nil && strings.Contains(err.Error(), "connect timeout"), "expect a timeout error")
})

// 测试客户端连接没有超时处理的情况
t.Run("0", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: 0})
_assert(err == nil, "0 means no limit")
})
}

测试客户端处理调用超时和服务端处理超时的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// server_test.go
type ServiceTemp int

// ServiceTemp 有一个方法 Timeout,该方法耗时2s
func (s ServiceTemp) Timeout(args int, reply *int) error {
time.Sleep(time.Second * time.Duration(args))
*reply = 0
return nil
}

func TestClient_Call(t *testing.T) {
t.Parallel()

addrCh := make(chan string)
go func(chan string) { // 启动一个服务器,监听 0 端口,注册 ServiceTemp 类型的对象,然后启动 Accept 方法,等待客户端连接
var s ServiceTemp
_ = Register(&s)
l, _ := net.Listen("tcp", ":0")
addrCh <- l.Addr().String()
Accept(l)
}(addrCh)
addr := <-addrCh

// 测试客户端处理调用超时的情况
t.Run("client call timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr)

ctx, _ := context.WithTimeout(context.Background(), time.Second) // 创建一个超时的 context,如果 1s 内没有返回结果,context.Done() 会传出信号 struct{}{}
var reply int
err := client.Call(ctx, "ServiceTemp.Timeout", 20, &reply) // 调用 ServiceTemp.Timeout 方法,Call 发生超时
_assert(err != nil && strings.Contains(err.Error(), ctx.Err().Error()), "expect a timeout error")
})

// 测试服务端处理超时的情况
t.Run("server handle timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr, &Option{HandleTimeout: time.Second})
var reply int
err := client.Call(context.Background(), "ServiceTemp.Timeout", 20, &reply) // 调用 ServiceTemp.Timeout 方法,服务端处理超时
_assert(err != nil && strings.Contains(err.Error(), "handle timeout"), "expect a timeout error")
})
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
# 代码结构
version_4_timeout
├── client.go
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go
├── server.go
├── service.go
└── service_test.go

5.支持 HTTP 协议

为什么需要支持 HTTP 协议

当前我们 RPC 框架是基于 TCP 协议的。

1
2
// main.go
l, _ := net.Listen("tcp", ":9999") // 使用 TCP 协议

为什么需要支持 HTTP 协议?

  • 兼容性:很多应用是基于 HTTP 协议的,RPC 框架支持 HTTP,便于兼容。
  • 方便调试:通过 HTTP 提供调试界面,可以实时监控RPC服务的状态和调用统计,方便开发和运维。
  • 安全性:HTTP 可以和 TLS/SSL 结合,提供安全的通信方式。

如何支持 HTTP 协议

基于通信流程梳理需要做哪些事情:

  1. 客户端向 RPC 服务器,发送 HTTP CONNECT 方法请求
  2. 服务端响应 HTTP CONNECT 方法请求,表示建立连接
  3. 客户端使用建立的连接,发送 RPC 报文(先发送 option,再发送 RPC 报文)
  4. 服务端处理 RPC 请求,并响应。

客户端发送请求

服务端响应请求

在哪里监听?


从零实现系列|RPC
https://www.aimtao.net/7days-rpc/
Posted on
2024-10-18
Licensed under