从零实现系列|RPC

0.序言

为什么需要 RPC

最直观得是,客户端可以像调用本地程序一样,进行远程调用,使用者无需关注内部的实现细节。

另外一种广泛使用的调用方式是基于 HTTP 协议得 Restful API,让 gpt 总结一下。

对比维度Restful APIRPC
协议基于 HTTP 协议(如 HTTPS)通常使用自定义协议(如 TCP 或高效二进制协议)
通信方式基于 HTTP 的请求-响应模型(如 GET/POST)类似本地方法调用的请求-响应模型(对开发者透明)
报文格式文本格式(JSON/XML),冗余较多二进制编码(如 Protobuf),精简高效
性能较低(文本解析、冗余数据)较高(二进制压缩、高效序列化)
使用场景通用性强,适合跨语言、对外的开放接口高性能要求高,适合内部服务间通信
可扩展性扩展依赖网关等中间件,功能相对固定原生支持注册中心、负载均衡、超时处理等扩展功能
抽象模型面向资源的抽象,通过 URI 唯一标识资源,通过 HTTP 方法定义操作面向过程的抽象,客户端直接调用服务端的函数或方法

RPC 框架需要解决哪些问题

  • 通信协议:如何选择传输协议(TCP/HTTP/Unix Socket)?如何设计协议格式(如报文头、体结构)?

  • 编码方式:如何高效编码/解码数据?如何压缩报文?

  • 可用性问题

    • 超时处理:超时控制(如客户端连接超时、服务端处理超时)避免资源阻塞。
    • 请求管理:如何支持并发请求、异步请求?
  • 服务治理组件

    • 注册中心:服务动态注册与发现,健康检查(心跳机制)
    • 负载均衡:如何分配请求到多个服务实例?

以上种种,业务之外的公共能力,RPC 框架均需具备。

市面上的 RPC 框架 有哪些

框架名称特点
net/rpcGo 标准库,轻量级,支持 TCP/HTTP 协议,默认使用 Gob 编码
gRPCGoogle 开源,基于 HTTP/2 和 Protobuf,跨语言,高性能,支持流式通信
rpcx高性能、支持多种编码(JSON/Protobuf),集成注册中心、负载均衡
go-micro微服务框架,包含 RPC 模块,支持插件化(注册中心、编码协议等)

本文如何从零实现 RPC 框架

  1. 从零实现标准库 net/rpc
  2. 新增了协议交换(protocol exchange)、注册中心(registry)、服务发现(service discovery)、负载均衡(load balance)、超时处理(timeout processing)等特性

1.服务端与消息编码

如何设计一个 RPC 请求

这个问题也可以问成:一个 RPC 请求,应该传入传出什么数据,选择使用什么数据结构?

举个例子:一个典型的 RPC 调用。

1
err = client.Call("Arith.Multiply", args, &reply)
  • Arith.Multiply:服务名 Arith 和方法名 Multipy
  • args:传入参数
  • reply:传出参数

我们将服务名、方法名放在请求的 header 中,传输传出参数放在 body 中。

其中 header 定义为:

1
2
3
4
5
6
7
// codec/codec.go

type Header struct {
ServiceMethod string // 服务名和方法名
Seq uint64 // 请求序号
Error string // 客户端置为空,服务端如何出现错误,将错误信息写入Error
}

用什么编解码方式进行编解码

编码方式可以选择 json、gob、protobuf 等等,为了更好的兼容性,我们抽象出来 Codec 的类型。

1
2
3
4
5
6
7
8
9
// codec/codec.go

// Codec 接口:对消息体进行编码,比如 Gob、Json 会实现该接口,代表两种编码方式
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}

之后想使用哪个编码方式都可以,只要实现了 Codec 接口即可。

将各个编解码方式名称硬编码保存下来,并使用 map 保存各个编解码方式的构造函数。所以使用的流程是,通过编解码方式名称获取到该编解码方式的构造函数,调用该构造函数,创建编解码的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// codec/codec.go

type Type string
const (
GobType Type = "application/gob"
JsonType Type = "application/json"
)

// NewCodecFunc Codec 接口类型的构造函数
type NewCodecFunc func(io.ReadWriteCloser) Codec

// NewCodecFuncMap 通过 type 来选择对应的 codec 的构造函数
var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec // 赋值为 Gob codec 的构造函数(NewGobCodec 为 Gob codec 的构造函数,在 codec/gob.go 实现。)
//NewCodecFuncMap[JsonType] = NewJsonCodec // 赋值为 Json codec 的构造函数, 暂不实现
}

如何实现编解码方式的接口

下面将以 Gob 为例,说明如何实现 Codec 接口。先定义 Gob 编解码的结构体 GobCodec 。

1
2
3
4
5
6
7
8
// codec/gob.go

type GobCodec struct {
conn io.ReadWriteCloser
buf *bufio.Writer // 使用缓冲流来提高性能,防止阻塞
dec *gob.Decoder
enc *gob.Encoder
}

实现 Codec 的接口函数,主要是封装 encoding/gob 的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// codec/gob.go

// ReadHeader 封装了 encoding/gob 的 Decode 方法,从连接中读取 Header 信息
func (c *GobCodec) ReadHeader(header *Header) error {
return c.dec.Decode(header)
}

// ReadBody 封装了 encoding/gob 的 Decode 方法,从连接中读取 Body 信息
func (c *GobCodec) ReadBody(body interface{}) error {
return c.dec.Decode(body)
}

// Write 封装了 encoding/gob 的 Encode 方法,将 Header 和 Body 信息写入连接中
func (c *GobCodec) Write(header *Header, body interface{}) (err error) {
defer func() { // 关闭前,先将 buf 中的数据写入连接中
_ = c.buf.Flush()
if err != nil {
_ = c.Close()
}
}()

if err = c.enc.Encode(header); err != nil {
log.Panicln("rpc codec: gob error encoding header: ", err)
return err
}
if err = c.enc.Encode(body); err != nil {
log.Panicln("rpc codec: gob error encoding body: ", err)
return err
}
return nil
}

// Close 封装了 io.ReadWriteCloser 的 Close 方法,关闭连接
// io.ReadWriteCloser 包含 io.Closer 类型,io.Closer 类型包含 Close 方法
func (c *GobCodec) Close() error {
return c.conn.Close()
}

最后再实现 GobCodec 的构造函数,方便初始化时,传给 Codec 接口变量。

1
2
3
4
5
6
7
8
9
10
11
// codec/gob.go

func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}

P.S. 为什么 GobCodec 需要加一个 buf ?

用于缓冲写入操作,减少系统调用次数。如果没有 buf,每次小数据写入直接触发系统调用,增加延迟;频繁的 I/O 操作,影响性能。

通信过程如何协商编码方式

以 HTTP 报文为例,HTTP 报文分为 header 和 body 两个部分。客户端和服务端收发消息时,只需要先解析 header 部分,就知道 body 的格式 Content-Type、长度 Content-Length。

在 RPC 协议的报文里,为了提升性能,仅在报文最开始规划固定的字节,来协商编码方式。

1
2
3
4
5
6
7
// server.go

// Option 定义 Option 结构体,封装了 MagicNumber 和 CodecType 字段,从 conn 中解析出 Option 的信息,表示 RPC 消息的编码方式
type Option struct {
MagicNumber int // 验证连接的合法性,确保客户端和服务端使用同一协议
CodecType codec.Type // 编码方式的名称
}

所以实际的报文是这样的。

1
| Option | Header1 | Body1 | Header2 | Body2 | ...

那 Option 使用什么编码方式呢?

为了方便,我们可以规定,Option 使用 JSON 编码,后面的 header、body 使用 Option 规定的编解码方式编解码。

1
2
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ Option 固定使用 JSON 编码 ------> | <------- Header/Body 编码方式由 CodeType 决定 ------->|

如何实现一个服务端

先看一下,服务端要做什么,要实现哪些功能。

  • 建立连接:实现 Accept 方法,等待 socket 连接,并开启协程处理请求。
  • 处理请求:
    • 解析 Option 信息,检查 Option.MagicNumber 是匹配,根据 Option.CodecType 实例化编解码器。
    • 读取请求:使用编解码器实例解码 header 和 body。
    • 处理请求。
    • 回复请求。

搞清楚需要实现哪些功能,就可以开始具体实现了。(可以先看主要的调用流程,具体实现实现细节晚点再看)

  • 服务器实例的构造函数。
1
2
3
4
5
6
7
8
9
10
// server.go

const MagicNumber = 0x3bef5c

// Server 定义 Server 结构体,封装了 Accept、ServeConn、serveCodec 方法
type Server struct{} // 肚子里没什么要放的

func NewServer() *Server {
return &Server{}
}
  • Accept 处理连接:建立 socket 连接,使用 goroutine 处理连接
1
2
3
4
5
6
7
8
9
10
11
12
// server.go

func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept() // 建立 socket 连接
if err != nil {
log.Println("rpc server: accept error: ", err)
return
}
go server.ServeConn(conn) // 使用 goroutine 处理连接
}
}
  • ServeConn 处理消息:解析出 Option 信息,根据 CodecType 选择对应的 codec,调用 serveCodec 方法处理剩下的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// server.go

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() {
_ = conn.Close()
}()

var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil { // opt 是传出参数,读到 RPC 前面的 JSON 数据,这包含了 option 信息,表示 RPC 消息的编码方式
log.Println("rpc server: options error: ", err)
return
}
if opt.MagicNumber != MagicNumber { // 验证 MagicNumber,证明请求合法
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
f := codec.NewCodecFuncMap[opt.CodecType] // 获取编解码器的构造函数
if f == nil {
log.Printf("rpc server: invalid codec type %s", opt.CodecType)
return
}
server.serveCodec(f(conn))
}
  • serveCodec 处理请求:调用 readRequest 方法读取请求,调用 handleRequest 方法处理请求。
    • 使用协程来并发处理请求。
    • 使用 sync.WaitGroup 保证即使因为读请求出错退出循环,也等待所有协程请求处理完成再关闭连接,优雅终止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// serveCodec 处理请求:调用 readRequest 方法读取请求,调用 handleRequest 方法处理请求
func (server *Server) serveCodec(cc codec.Codec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)

for {
req, err := server.readRequest(cc)
if err != nil {
break
}

wg.Add(1)
go server.handleRequest(cc, req, sending, wg) // 使用协程来并发处理请求。
}
wg.Wait() // 即使因为读请求出错退出循环,也需要等待所有协程请求处理完成再关闭连接,优雅终止。
_ = cc.Close()
}
  • readRequest 读取请求:调用 readRequestHeader 方法读取请求头,再调用 ReadBody 方法读取请求参数,返回 request 结构体

    • 这里定义了请求体的结构体 request,其中,h 储存 header,argv 储存传入参数,replyValue 储存传出参数,也就是返回值。
    • readRequestHeader 就是进一步调用 编解码器 的 ReadHeader 方法,这里的编解码器是 gob。gob 的 ReadHeader 方法是封装的 encoding/gob 的 Decode 方法。ReadBody 同理。
  • 什么是 reflect.Value 类型?

    • 两个前置知识:具体看 reflect
      • 每个 interface{} 类型的变量都包含一对值 (type,value),type 表示变量的类型信息,value 表示变量的值信息。
      • 反射就是把 interface{} 类型变量转化为 reflect.Value 或 reflect.Type 类型变量,随后用 reflect 包中的方法对它们进行各种操作
    • reflect.Value 是反射值,是描述值的一个容器。
  • argv 和 replyValue 为什么都是 reflect.Value 类型呢?

    • 因为对于服务端来说,他并不知道客户端会调用哪个具体的方法、要传入什么类型的参数、要传出什么类型的参数。
    • 所以需要在运行时,动态地获取参数类型和返回值类型,并取指和赋值。
  • req.argv = reflect.New(reflect.TypeOf("")) 的含义?

    • reflect.TypeOf("") 获取空字符串的类型信息,得到 reflect.Type 类型变量,描述 string 类型。
    • reflect.New(t) 创建一个该类型 t 的指针值,返回一个 reflect.Value 对象,实际内部是一个指针类型,指针类型为 *t,这里也就是描述 *string 类型的容器。(相当于 new(string)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 举个例子
    v := reflect.New(reflect.TypeOf("")) // v 是 reflect.Value, 类型是 *string
    fmt.Println(v.Type()) // 输出 *string
    fmt.Println(v.Elem().Type()) // 输出 string
    v.Elem().SetString("hello") // 设置值
    fmt.Println(v.Elem().Interface()) // 输出 hello

    // 等价于:
    var s string // 声明字符串变量
    ptr := &s // 获取指针
    v = reflect.ValueOf(ptr) // 包装为 reflect.Value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// server.go

type request struct {
h *codec.Header // 读到的请求头
argv, replyValue reflect.Value // argv 储存传入参数,replyValue 储存传出参数。reflect.Value 是反射值,是描述值的一个容器。
}

// readRequest 读取请求:调用 readRequestHeader 方法读取请求头,调用 ReadBody 方法读取请求参数,返回 request 结构体
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := request{
h: h,
}

req.argv = reflect.New(reflect.TypeOf("")) // 初始化传入参数,使用 New 方法创建一个表示 *string 类型值的反射值。reflect.Value 是描述值的一个容器。
if err = cc.ReadBody(req.argv.Interface()); err != nil { // 读取请求参数,ReadBody 函数的参数类型是 interface{},需要使用 Interface() 方法,将 reflect.Value 转化为 interface{} 类型
log.Println("rpc server: read body error: ", err)
}

return &req, nil
}

func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
if err := cc.ReadHeader(&h); err != nil {
if err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
log.Println("rpc server: read header error: ", err)
}
return nil, err
}
return &h, nil
}
  • handleRequest 处理请求:这部分目前处理的比较简单,主要是根据请求,构造请求的响应信息,并通过 sendResponse 发送给请求方。
    • 为什么要加锁?处理请求可以并发的读,但是发送数据不能并发写入链接,否则多个回复报文交织在一起,影响客户端解析。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// server.go

// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

log.Println("handleRequest: ", req.h, req.argv.Elem())
req.replyValue = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq)) // 构造请求的响应信息
server.sendResponse(cc, req.h, req.replyValue.Interface(), sending) // 发送响应
}

func (server *Server) sendResponse(cc codec.Codec, header *codec.Header, body interface{}, sending *sync.Mutex) {
sending.Lock() // 加锁,防止并发写
defer sending.Unlock()
if err := cc.Write(header, body); err != nil {
log.Println("rpc server: write response error: ", err)
}
}

如何实现一个客户端

这里在 main 函数中实现一个简易的客户端,验证一下 server 的功能。

客户端主要做的事是:

  • 连接到服务器
  • 构造 header 和 body,发送请求
  • 接收请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// main/main.go

func main() {
addr := make(chan string)
// 启动服务端
go startServer(addr)

// 以下是客户端的逻辑
addrString := <-addr
fmt.Println(addrString)
conn, _ := net.Dial("tcp", addrString) // 建立到服务端的连接
defer func() {
_ = conn.Close()
}()

time.Sleep(time.Second)

_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption) // 解析 conn 的前 8 个字节,解析 Option 信息,储存到 DefaultOption JSON 中
cc := codec.NewCodecFuncMap[geerpc.DefaultOption.CodecType](conn) // 根据 DefaultOption 中的 CodecType 选择对应的 codec

// 发送 5 个请求并接收响应
for i := 0; i < 5; i++ {
h := &codec.Header{ // 构造请求头
ServiceMethod: "Foo.Sum",
Seq: uint64(i),
}

// 发送 header 和 body,其中 body 是一个字符串,格式为 "geerpc req %d"
_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))

var replyHeader codec.Header
_ = cc.ReadHeader(&replyHeader) // 读取响应头
var reply string
_ = cc.ReadBody(&reply) // 读取响应体
log.Println("main: reply: ", replyHeader, reply)
}
}

func startServer(addr chan string) {
// 监听随机可以用的端口
listen, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error", err)
}
log.Println("startServer: start rpc server on", listen.Addr())

// 将监听地址返回给主协程
addr <- listen.Addr().String()
geerpc.Accept(listen) // 开始处理连接
}

/* output
[::]:43017
handleRequest: &{Foo.Sum 0 } geerpc req 0
main: reply: {Foo.Sum 0 } geerpc resp 0
handleRequest: &{Foo.Sum 1 } geerpc req 1
main: reply: {Foo.Sum 1 } geerpc resp 1
handleRequest: &{Foo.Sum 2 } geerpc req 2
main: reply: {Foo.Sum 2 } geerpc resp 2
handleRequest: &{Foo.Sum 3 } geerpc req 3
main: reply: {Foo.Sum 3 } geerpc resp 3
handleRequest: &{Foo.Sum 4 } geerpc req 4
main: reply: {Foo.Sum 4 } geerpc resp 4
*/

完整代码

1
2
3
4
5
6
7
8
9
# 代码结构
version_1_codec
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go
└── server.go

2.高性能客户端

虽然这章节叫高性能客户端,实际上具体来说,应该是 client stub,为真正的 client (具体的业务代码)提供基础设施。

客户端需要提供哪些能力

首先我们期望用户可以怎么用我们这个框架。

1
2
3
4
5
6
// 初始化客户端,连接到服务器
client, err := geerpc.Dial("tcp", "127.0.0.1:8080")

// 发送请求
var args, reply string
err = client.Call("methodName", args, &reply)

所以肯定需要提供 Dial、Call 这两个方法给用户使用。但在这背后的能力,应该有这些:

  • 连接管理的能力:

    • 网络连接建立:通过 Dial 方法连接到服务器。
    • 协议协商:在建立连接后,完成协议交换(通过发送 option 协商)。
    • 连接状态管理:当用户主动关闭连接或者程序出现异常时,关闭连接并释放资源。
  • 请求管理的能力:

    • 请求封装:将单次的请求信息封装在一个结构体中。
    • 唯一标识:为每个请求生成一个唯一的序例号。
  • 通信能力:

    • 请求发送:
      • 序例化 header 和 body。
      • 发送请求(通过锁来保证原子性)。
      • 并发的处理多个请求。
    • 响应接收:
      • 持续接收响应。
      • 异常处理:服务出错、请求不存在(已移除)。

虽然这些能力看着很多,但大多是细节。接下来,我们自顶向下地实现 Dial、Call 这两个核心方法。

如何实现 Dial 方法

从主流程来看,Dial 有三件事:

  • 确定编码方式
  • 连接服务器
  • 返回一个 Client 实例,用户使用这个 Client 实例进行远程调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// client.go

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*geerpc.Option) (client *Client, err error) {

// 默认使用 Gob 编码
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}

// 真正拨号建立连接
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
defer func() {
if client == nil {
_ = conn.Close()
}
}()

// 返回 Client 实例
return NewClient(conn, opt)
}

确定编码方式:根据用户传入的 Option 来实例化 Codec,如果没有传入,则默认使用 gob 编码,所以这里是可选参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// client.go

func parseOptions(opts ...*geerpc.Option) (*geerpc.Option, error) {
// if opts is nil or pass nil as parameter
if len(opts) == 0 || opts[0] == nil {
return geerpc.DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("number of options is more than 1")
}
opt := opts[0]
opt.MagicNumber = geerpc.DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = geerpc.DefaultOption.CodecType
}
return opt, nil
}

连接服务器:使用 net 包得 Dial 方法,获得一个 net.Conn 接口类型,表示面向流的网络连接。 GobCodec 会对 conn 进一步封装,向外提供读/写/关闭的能力。

返回 client 实例:这里是 Dail 的重中之重,很多特性都需要通过 client 的设计来实现。

如何抽象一个请求

用户拿着 Client 实例,可以和服务端进行多次请求的发送和接收,也就是 Client 需要管理多个请求。所以在设计 Client 之前,需要将请求抽象出来。

  • Args、Reply:请求的参数是 interface{} 类型,因为 codec 读写方法参数都是 interface{}。
  • Done:是一个 channel,请求结束时,调用 done 方法,给 Done chanel 写值,Client 拿着这个 channel 就可以知道请求结束了。(正常结束或者异常结束)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// client.go

// Call 实例表示一次 RPC 调用请求
type Call struct {
Seq uint64 // 请求的序号
ServiceMethod string // 请求的方法名
Args interface{} // 请求的参数
Reply interface{} // 请求的响应信息
Error error
Done chan *Call // 当调用结束后,会通过 Done 通知调用者 // 这个写法有意思,channel 的类型是 *Call
}

func (call *Call) done() {
call.Done <- call
}

如何设计一个 Client

(1)基础的收发消息

1
2
cc       codec.Codec      // 消息编解码器,用于序列化请求和反序列化响应
opt *geerpc.Option // 客户端配置,比如编码方式和协议参数。

(2)发送请求提前准备的信息:多个请求共用一个 header,放在 Client 里,避免每次构造。

1
header   codec.Header     // 每个请求,共用这个同一消息头

(3)处理并发请求的工具:因为请求时并发的,会存在两个问题:不知道什么时候收到响应?不知道收到的响应是哪个请求的?

  • seq:Client 发的每个请求和接收的每个响应的 header 中,都有一个序号,保证请求的顺序和唯一性。
  • pending:未处理完的请求,比如还未发送的请求、已经发送还未回复的请求。
    • 对于未处理完的请求,调用者一直在阻塞,等待响应。当我收到响应后,可以根据序号,拿到该请求的 channel,告知调用者已经收到结果,并将响应结果储存在对应 call 的 reply 变量中。
    • 如果没有这个 map,一无法通知,二也不知道通知谁,结果储存给谁。
1
2
seq      uint64           // 用于给每个请求分配一个编号,用于区分不同的请求。(每个请求间没有顺序要求)
pending map[uint64]*Call // 每个请求对应一个 Call 实例。未处理完的请求会被保存在该字段中

(4)保证并发读写的锁

  • sending:保证每次发送给服务器,只有一个请求在发,否则服务端交叉收到不好解析。
  • mu:保护 seq、pending 、shutdown、closing 字段,防止并发读写。
1
2
sending  sync.Mutex       // 互斥锁,用于确保在同一时间只有一个请求被发送
mu sync.Mutex // 互斥锁,保护 seq、pending、shutdown、closing 字段,防止并发读写

(5)异常处理的标志位:为什么要设计两个字段?

  • closing:表示用户正在主动关闭 Client,避免多次关闭客户端,还可以做优雅退出。
  • shutdown:表示因为异常错误,Client 无法发送接收了,快速失败,对于没处理完的请求返回错误,避免调用者长时间阻塞等待响应。
1
2
closing  bool             // 是否正在关闭连接
shutdown bool // 客户端是否已经关闭

整体如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client.go

// Client 表示一个 RPC 客户端,一个客户端可以完成多个请求(Call 实例)的发送和接收
// 管理连接、请求和响应,可同时被多个协程并发使用
// 提供 Dial 方法,用于建立连接;提供 Call 方法,用于发送请求并等待响应结果
type Client struct {
cc codec.Codec // 消息编解码器,用于序列化请求和反序列化响应
opt *geerpc.Option // 客户端配置,比如编码方式和协议参数。
sending sync.Mutex // 互斥锁,用于确保在同一时间只有一个请求被发送
header codec.Header // 每个请求,共用这个同一消息头
mu sync.Mutex // 互斥锁,保护 seq、pending 、shutdown字段,防止并发读写
seq uint64 // 用于给每个请求分配一个编号,用于区分不同的请求。(每个请求间没有顺序要求)
pending map[uint64]*Call // 每个请求对应一个 Call 实例。未处理完的请求会被保存在该字段中
closing bool // 是否正在关闭连接
shutdown bool // 客户端是否已经关闭
}

基于上面这些,我们要提供几个基础的方法。

(1)创建客户端

  • 确定编码方式
  • 根据编码方式实例化 Codec
  • 根据 Codec 实例化 Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// client.go

func NewClient(conn net.Conn, opt *geerpc.Option) (*Client, error) {

// 用 JSON 数据通知服务器,客户端的编码方式
// json.NewEncoder(conn) 创建一个 JSON Encoder 对象,Encode 方法将 opt 编码为 JSON 数据, JSON Encoder 对象将 Json 数据写入到 conn 中,也就是发给服务器
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
_ = conn.Close()
return nil, err
}

newCodecFunc := codec.NewCodecFuncMap[opt.CodecType]
if newCodecFunc == nil {
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
return newClientCodec(newCodecFunc(conn), opt), nil
}

func newClientCodec(cc codec.Codec, opt *geerpc.Option) *Client {
client := &Client{
seq: 1, // seq starts with 1, 0 means invalid call
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
}
go client.receive() // 启动协程持续接收响应
return client
}

(2)启动协程持续接收响应

  • 接收请求,先读取 header,其中有序号。
  • 请求已经收到响应,表示已经处理完了,该序号的请求就可以在 pending 中移出,并拿到该请求的实例 call。
  • 读取 body,并将响应储存在 call.Reply 中。(在此之前要处理两种错误)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// client.go

func (client *Client) receive() {
var err error
for err == nil { // 这个写法,会在 err 不为 nil 时退出循环,所以只会处理一次错误
// 读取请求头
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}

// 根据 h.Seq 找到对应的 Call 实例,并从 pending 中移除。
call := client.removeCall(h.Seq)

// 三种处理响应的情况
switch {
case call == nil: // Call 实例不存在,(可能客户端已经取消请求,但服务器还是在响应请求),忽略该请求
err = client.cc.ReadBody(nil)
case h.Error != "": // Call 实例存在,但服务器返回了错误
// 将错误信息写入 call.Error 中,调用 call.done() 通知调用方
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default: // Call 实例存在,服务器正常响应
// 读取响应体,将响应信息写入 call.Reply 中,调用 call.done() 通知调用方
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
//fmt.Println(call.Reply)
}
}
// 如果发生错误(如连接断开),调用 terminateCalls 方法: 将所有未完成的调用(pending 中的所有调用)标记为错误状态。通知所有调用方,释放资源。
client.terminateCalls(err)
}

(3)异常处理,快速失败

如果发生错误(如连接断开),调用 terminateCalls 方法: 将所有未完成的调用(pending 中的所有调用)标记为错误状态。通知所有调用方,释放资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client.go

// terminateCalls 方法用于在客户端关闭时,终止所有未完成的调用,并通知调用者发生了错误
func (client *Client) terminateCalls(err error) {
client.sending.Lock()
defer client.sending.Unlock()

client.mu.Lock()
defer client.mu.Unlock()

client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
}
}

(4)优雅退出

提供 Close 方法,给用户主动关闭 Client。

1
2
3
4
5
6
7
8
9
10
11
12
// client.go

func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()

if client.closing {
return ErrShutdown
}
client.closing = true
return client.cc.Close()
}

(5)可用性

暴露给用户。

1
2
3
4
5
6
7
// client.go

func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}

(6)注册/移出请求

主要是将请求保存在 pending 中管理,根据 seq 快速拿到请求实例 call。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// client.go

// registerCall 方法用于注册一个 Call 实例,并返回该实例的序号。
func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()

if client.closing || client.shutdown {
return 0, ErrShutdown
}

call.Seq = client.seq
client.pending[call.Seq] = call
client.seq++
return call.Seq, nil
}

// removeCall 方法用于从 pending 中移除一个 Call 实例,表示该请求已处理完成或已取消,并返回该实例。
func (client *Client) removeCall(seq uint64) *Call {
client.mu.Lock()
defer client.mu.Unlock()

call := client.pending[seq]
delete(client.pending, seq)
return call
}

如何设计异步/同步调用方法

基础设施已经有了,万事俱备,就差发送请求了。

**异步调用方法:**流程很明确,主要有三步。

  • 构造 Call 请求实例
  • registerCall 注册请求
  • Codec.Write 发送请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// client.go

// Go 异步调用,不阻塞等待响应结果
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}

// 为本次调用请求创建一个 Call 实例
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}

// 将 Call 实例发送到客户端
client.send(call)
return call
}

// send 发送请求到服务器
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()

// register this call.
seq, err := client.registerCall(call)
if err != nil {
call.Error = err
call.done()
return
}

// prepare request header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""

// encode and send the request
if err := client.cc.Write(&client.header, call.Args); err != nil {
call := client.removeCall(seq)
// call 可能为 nil
// 比如由于网络或者某种错误,客户端在 receive() 中已经将该请求从 pending 中移除,此时 call 为 nil
if call != nil {
call.Error = err
call.done()
}
}
}

这个调用的方式是异步的,调完就完了,也不需要等待结果,因为有 receive 协程专门在接收数据。

举个例子,用户可以批量的调用,再统一的等待结果。

1
2
3
4
5
done1 := client.Go("Arith.Add", args1, &reply1, make(chan *Call, 1))
done2 := client.Go("Arith.Mul", args2, &reply2, make(chan *Call, 1))

call1 := <-done1
call2 := <-done2

也可以不关心返回的结果。(比如打日志、异步上报)

1
client.Go("LogService.Log", args, nil, nil) // 不需要 reply

同步调用方法

用户想调用完,等待结果再继续。我们可以在 Client.Go 方法的基础上,简单封装一个同步调用的方法。

1
2
3
4
5
6
7
// client.go

// Call 同步调用,阻塞等待响应结果
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done // 读 Done 的数据,阻塞等待 receive() 处理完响应结果,对 Done 写值
return call.Error
}

完整代码

1
2
3
4
5
6
7
8
9
10
# 代码结构
version_2_client
├── client.go
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go
└── server.go

3.服务注册

这一章是干什么的

服务端的主要工作:

  • 监听端口
  • 响应请求
    • 解析请求
    • 处理请求
      • 从请求中获取方法名、参数
      • 获取方法
      • 调用方法
    • 返回结果

这一章主要完成处理请求部分,其中,从请求中获取方法名、参数在上一章已完成。client 在发送请求时,方法名在请求头里 client.header.ServiceMethod,参数在请求体里 call.Args

所以我们重点关注

  • 获取方法
  • 调用方法

如何获取方法

服务端通过方法名获取实际的方法,需要两步:

  1. 在启动服务端服务时,我们可以将所有可调用的方法都加到一个 map 中。

  2. 当 client 调用 hello 时,我们从 map 中找到对应的方法皆可。

由于不同类型的结构体,可能有同名的结构体方法。我们的方法名,使用 结构体实例名.方法名 来作为方法名。比如上一章中的,结构体类型 Foo ,有 Sum 方法,客户端实例化 foo 对象后,远程调用 Client.Call 时,传入的就是 foo.Sum

如何方法注册到 map 中

首先,先说数据存储方式,简单来说是这样的,

flowchart LR
    A[Server.serviceMap] -->|map储存多个service| B[Service<br>一个 service 表示一个变量类型]
    B -->|service包含一个 map 变量| C[service.method]
    C -->|map 储存多个方法| D[methodType<br>一个 methodType 表示一个该变量类型的方法]
 

具体数据存储方式设计如下:

服务端保存着一个 serviceMap。使用 serviceMap 保存变量类型及其方法。key 是变量类型名,value 是 service 类型实例。

1
2
3
4
5
// server.go
// Server 定义 Server 结构体,封装了 Accept、ServeConn、serveCodec 方法
type Server struct {
serviceMap sync.Map
}

service 类型定义如下,一个 service 表示一个变量类型。其中,也包含一个 map,保存着这个变量的方法。key 是方法名,value 是 methodType 类型实例。

1
2
3
4
5
6
7
8
// service.go
// 一个 service 表示一个变量类型
type service struct {
name string // 变量类型名,用来打 log,例如字符串 "Foo"
typ reflect.Type // 变量类型,通过变量的类型,可以直接获取变量的方法,例如 Foo 类型,可获取到方法 Sum
rcvr reflect.Value // 变量的值,通过变量的值,可以调用变量的方法,例如 &foo,调用 Sum
method map[string]*methodType // map 储存变量的方法中所有可调用的方法
}

methodType 类型定义如下,一个 methodType 表示一个方法。

1
2
3
4
5
6
7
8
// service.go
// 一个 methodType 表示一个方法
type methodType struct {
method reflect.Method // 方法本身,用于调用方法,例如 Foo.Sum
ArgType reflect.Type // 参数的类型,用于判断参数是否正确
ReplyType reflect.Type // 返回的类型,用于判断返回值是否正确
numCalls uint64 // 方法调用次数,用于统计方法调用次数
}

最后,下面将以一个实际例子,演示如何将方法注册到 map 中。

graph LR
    A[startServer, 启动服务器] --> B[Server.Register , 为 Foo 类型注册服务]
    B --> C[newService, 为 Foo 类型创建 service 结构体]
    C --> D[获取 rcvr 值, 类型名和类型]
    C --> G[service.registerMethods, 获取方法列表]
    G --> K[检查方法的参数和返回值]
    K --> L[保存方法到 map 变量 service.method]

  1. 启动服务,将 Foo 类型注册到 rpc server 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// main.go

type Foo int // Foo 类型,实现了 Sum 方法
type Args struct{ Num1, Num2 int }
func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}

func startServer(addr chan string) {

var foo Foo // 实例化 Foo 类型的对象
if err := geerpc.Register(&foo); err != nil { // 注册 Foo 类型的对象,注册的是 Foo 类型的对象,不是 Foo 类型的方法
log.Fatal("register error:", err)
}

l, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}
  1. 为 Foo 类型创建 service 对象,并保存到 server 的 map 中。
1
2
3
4
5
6
7
8
9
10
11
// server.go
var DefaultServer = NewServer()
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

func (server *Server) Register(rcvr interface{}) error {
s := newService(rcvr) // 为 rcvr 变量的类型创建 service 结构体
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup { // 调用 serviceMap.LoadOrStore 将 service 结构体保存到 map 中
return errors.New("rpc: service already defined: " + s.name)
}
return nil
}
  1. 创建 service 对象、获取对象的方法、检查方法参数和返回值,最后将方法保存到 map 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// service.go
// newService 通过发射获取 rcvr 变量的类型及其方法
func newService(rcvr interface{}) *service {
s := new(service)
s.rcvr = reflect.ValueOf(rcvr) // 获取 rcvr 变量的值,例如 &foo
s.name = reflect.Indirect(s.rcvr).Type().Name() // 获取 rcvr 变量的类型名,例如 "Foo"
s.typ = reflect.TypeOf(rcvr) // 获取 rcvr 变量的类型,例如 Foo
if !ast.IsExported(s.name) {
log.Fatalf("rpc server: %s is not a valid service name", s.name)
}
s.registerMethods() // 获取 rcvr 变量的方法列表,例如 Foo.Sum,并将其保存到 service 的方法 map 中
return s
}

// registerMethods 获取 rcvr 变量的方法列表,例如 Foo.Sum,并将其保存到 service 的方法 map 中
func (s *service) registerMethods() {
s.method = make(map[string]*methodType)

for i := 0; i < s.typ.NumMethod(); i++ { // 遍历 rcvr 变量的方法列表
method := s.typ.Method(i) // 获取 rcvr 变量,第 i 个方法
mType := method.Type // 获取 rcvr 变量的方法的类型

// 检查方法的参数是否正确。rpc 的方法必须满足以下条件:
// 1. 方法有 3 个参数,第 1 个参数是 rcvr 变量(相当于 python 的 self,java 的 this),第 2 个参数是传入参数,第 3 个参数是传出参数,指针类型
// 2. 第 2 个参数和第 3 个参数都是导出的类型
// 3. 返回值只有一个,是 error 类型
// 例如这样 func (t *T) MethodName(argType T1, replyType *T2) error
if mType.NumIn() != 3 || mType.NumOut() != 1 { // 检查参数个数
continue
}
if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() { // 检测返回值个数和类型
continue
}
argType, replyType := mType.In(1), mType.In(2)
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) { // 检查参数类型是否是导出的类型或者内置类型
continue
}

// 检查完毕,将方法保存到 service 的方法 map 中
s.method[method.Name] = &methodType{
method: method,
ArgType: argType,
ReplyType: replyType,
}
log.Printf("rpc server: register %s.%s\n", s.name, method.Name)
}
}

// isExportedOrBuiltinType 检查类型是否是导出的类型或者内置类型
func isExportedOrBuiltinType(t reflect.Type) bool {
return ast.IsExported(t.Name()) || t.PkgPath() == ""
}

如何从 map 获取方法

在 readRequest 时,从请求头中,获取方法名;根据方法名在对应的 service 的 map 中获取到方法、传入参数、传出参数。

graph LR
    A[Server.readRequest] -->|解析请求头| B[Server.readRequestHeader]
    B-->|返回请求头,包含方法名| A
    A -->|获取方法| C[Server.findService]
    A -->|获取方法传入参数,传出参数| D[GobCodec.ReadBody]
    E[request]
    C-->|方法保存到 request| E
    D-->|传入参数,传出参数保存到 request| E
  1. request 储存了一次远程调用请求的信息,方法、传入参数、传出参数指针。它的定义如下。
1
2
3
4
5
6
7
8
// server.go
// request 表示一次调用的所有信息
type request struct {
h *codec.Header // 请求头
svc *service // 请求对应的服务,使用 svc.call 调用对应的方法
mtype *methodType // 请求对应的方法,是 svc.call 的第一个参数
argv, replyv reflect.Value // 方法的传入参数和传出参数,是 svc.call 的第二个和第三个参数
}
  1. readRequest 获取方法名、方法、传入参数、传出参数,并储存在 request 实例中,为方法调用做准备。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// server.go
// readRequest 读取请求:调用 readRequestHeader 方法读取请求头,调用 ReadBody 方法读取请求参数,返回 request 结构体
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
// 读取请求头
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}

// 初始化请求结构体
req := &request{h: h}

// 根据请求头中的 ServiceMethod 字段找到对应的服务和方法类型
req.svc, req.mtype, err = server.findService(h.ServiceMethod)
if err != nil {
return req, err
}

// 创建传入参数和传出参数的反射对象
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()

// 检查请求传入参数的类型是否为指针类型,如果不是,则使用 Addr() 方法将 req.argv 转换为指针类型
// 为什么?
// 因为如果传入值是值类型,传入后,是值拷贝,不会修改传入变量的原值,所以需要使用 Addr() 获取地址后传入。
argvi := req.argv.Interface() // 使用 interface() 方法将 req.argv 转换为 interface{} 类型,这样可以传入任意类型的参数
if req.argv.Type().Kind() != reflect.Ptr {
argvi = req.argv.Addr().Interface()
}

// ReadBody 方法会将请求参数解码到 argvi 中储存
if err = cc.ReadBody(argvi); err != nil {
log.Println("rpc server: read body err:", err)
return req, err
}
return req, nil
}

另外,需要注意一下,

  • 创建传入参数时,需区分值类型还是指针类型。
  • 创建传出参数时,需对 map、slice 特殊处理。(因为 reflect.New 初始化时,map、slice 都是 nil)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// service.go
// newArgv 创建一个参数变量
func (m *methodType) newArgv() reflect.Value {
var argv reflect.Value
// 参数可能是指针类型也可能是值类型
if m.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(m.ArgType.Elem()) // 指针类型,则创建指针类型变量
} else {
argv = reflect.New(m.ArgType).Elem() // 值类型,则创建值类型变量
}
return argv
}

// newReplyv 创建一个传出参数变量
func (m *methodType) newReplyv() reflect.Value {
// 传出参数必须是指针类型
replyv := reflect.New(m.ReplyType.Elem())

// 为什么需要对 map 和 slice 特殊处理?
// reflect.New 创建的 map 和 slice 都是 nil,需要先初始化,再使用。
// 为什么 newArgv 方法不需要对 map 和 slice 特殊处理?
// 因为 argv 是用于接收调用方传入的参数,这些参数由调用方提供且已经初始化。
switch m.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))
}
return replyv
}

如何调用方法

在 handleRequest 时,通过 service.call 利用反射来调用函数。这里主要学习这个用法。

graph LR
    A[Server.serveCodec]-->|获取请求信息| B[Server.readRequest] 
    A -->|调用方法| C[Server.handleRequest]
    C --> D[request.svc.call, 也就是 Service.call]
    D -->|使用反射调用| E[methodType.method.Func.Call]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// server.go
// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用

if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// service.go
// call 调用 rcvr 变量的方法,例如 Foo.Sum
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {
atomic.AddUint64(&m.numCalls, 1)
f := m.method.Func
// 用反射的方式调用方法
// 第一个参数是 rcvr 变量,例如 &foo,类似于 java 的 this,python 的 self
// 第 2 个参数是传入参数,第 3 个参数是传出参数,指针类型
returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv})
if errInter := returnValues[0].Interface(); errInter != nil {
return errInter.(error)
}
return nil
}

更深入了解反射

请看这篇 https://www.aimtao.net/go#11-reflect

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
# 代码结构
version_3_service
├── client.go
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go
├── server.go
├── service.go
└── service_test.go

4.超时处理

哪些地方需要超时处理

简单来看整个流程:

  • 客户端:

    1. 拨号连接 ✓

    2. 远程调用(发送数据)✓

    3. 等待服务端处理 ✓

    4. 接收返回的结果(接收数据)✓

  • 服务端:

    1. 端口监听

    2. 接收请求信息(接收数据)✓

    3. 调用方法处理请求信息(处理数据)✓

    4. 返回请求结果(发送数据)✓

以上打 ✓ 的步骤,均可能出现超时,主要为,

  • 客户端建立连接超时
  • 发送数据
    • 客户端/服务端写报文时超时
  • 处理数据
    • 服务端调用方法超时
  • 接收数据
    • 客户端等待服务端响应超时
    • 客户端/服务端读取报文超时

基于这个超时的情景,我们可以在以下三个地方设置超时处理机制。

客户端:

  • 建立连接
  • Client.call() 的整个过程(包含发送数据、等待处理、接收数据)

服务端:

  • Server.handleRequest() 的整个过程(包括处理数据、发送数据,接收数据先不管了)

建立连接的超时处理

启动一个 goroutine 来拨号建立连接,使用 select 设置超时器,阻塞等待拨号结果,如果在接收拨号结果之前,先收到了超时信号,则进行超时处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// client.go
type NewClientFunc func(conn net.Conn, opt *Option) (*Client, error)

func Dial(network, address string, opts ...*Option) (client *Client, err error) {
return dialTimeout(NewClient, network, address, opts...)
}

type dialResult struct {
client *Client
err error
}

func dialTimeout(f NewClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
// 默认使用 Gob 编码
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}

// 声明一个通道,用于传输拨号建立连接的结果
result := make(chan dialResult)

// 当发生错误时,保证 client 为 nil
defer func() {
if err != nil {
client = nil
}
}()

// 启动一个 goroutine 连接服务器,连接成功后,调用 f 创建 Client 实例,并将结果发送到 result 通道中
go func() {
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
if err != nil {
result <- dialResult{client: nil, err: err}
return
}
client, err = f(conn, opt)
result <- dialResult{client: client, err: err}
}()

// 超时处理,阻塞等待,等待超时或收到结果
select {
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: %v", opt.ConnectTimeout)
case result := <-result:
return result.client, result.err
}
}

超时的时长,设置在 Option struct 内,便于用户自定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// server.go

// Option 定义 Option 结构体,封装了 MagicNumber 和 CodecType 字段,从 conn 中解析出 Option 的信息,表示 RPC 消息的编码方式
type Option struct {
MagicNumber int
CodecType codec.Type
ConnectTimeout time.Duration // Client 建立连接的超时时间
HandleTimeout time.Duration // Client.Call() 整个过程的超时时间
}

var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
ConnectTimeout: time.Second * 10,
//HandleTimeout: time.Second * 10, // 默认为 0,不设置超时时间
}

Call 的超时处理

把选择权交给用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// client.go

// Call 同步调用,阻塞等待响应结果
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))

// 等待响应结果/超时
select {
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:
return call.Error
}
}

用户如何使用?

1
2
3
ctx, _ := context.WithTimeout(context.Background(), time.Second)
var reply int
err := client.Call(ctx, "Foo.Sum", &Args{1, 2}, &reply)

handleRequest 超时处理

这里先不管读取数据了,仅对 handleRequest 做超时处理。handleRequest 其中又分为两个步骤,处理数据、发送数据,仅对处理数据做超时处理。

设置两个 channel,一个表示 req.svc.call 完成,一个表示 server.sendResponse 完成。当 req.svc.call 完成后,就不用管超时时间是否到达,继续让 server.sendResponse 发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// server.go

// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
defer wg.Done()

called := make(chan struct{}, 1) // 设置缓冲区为 1,防止在超时后,无人接收 channel 数据,导致 channel 发送时阻塞,导致 goroutine 泄漏
sent := make(chan struct{}, 1) // 设置缓冲区为 1,防止在超时后,无人接收 channel 数据,导致 channel 发送时阻塞,导致 goroutine 泄漏


go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用
called <- struct{}{} // 调用完成, 不管是否超时,继续发送数据
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
sent <- struct{}{}
}()

if timeout == 0 { // 没有超时时间,直接等待
<-called
<-sent
return
}

// 有超时时间,使用 select 等待超时或调用完成
select {
case <-time.After(timeout):
server.sendResponse(cc, req.h, invalidRequest, sending)
case <-called: // 如果调用完成,则不管超时时间,等待 sent(仅对 req.svc.call 做超时处理)
<-sent
}
}

测试代码

测试客户端处理连接超时的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// client_test.go
func TestClient_dialTimeout(t *testing.T) {
t.Parallel() // 设置测试项并行执行

f := func(conn net.Conn, opt *Option) (client *Client, err error) {
_ = conn.Close()
time.Sleep(time.Second * 2)
return nil, nil // 模拟了一个没有错误的客户端连接
}

l, _ := net.Listen("tcp", ":0")

// 测试客户端连接有超时处理的情况
t.Run("connect timeout", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: time.Second})
_assert(err != nil && strings.Contains(err.Error(), "connect timeout"), "expect a timeout error")
})

// 测试客户端连接没有超时处理的情况
t.Run("0", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: 0})
_assert(err == nil, "0 means no limit")
})
}

测试客户端处理调用超时和服务端处理超时的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// server_test.go
type ServiceTemp int

// ServiceTemp 有一个方法 Timeout,该方法耗时2s
func (s ServiceTemp) Timeout(args int, reply *int) error {
time.Sleep(time.Second * time.Duration(args))
*reply = 0
return nil
}

func TestClient_Call(t *testing.T) {
t.Parallel()

addrCh := make(chan string)
go func(chan string) { // 启动一个服务器,监听 0 端口,注册 ServiceTemp 类型的对象,然后启动 Accept 方法,等待客户端连接
var s ServiceTemp
_ = Register(&s)
l, _ := net.Listen("tcp", ":0")
addrCh <- l.Addr().String()
Accept(l)
}(addrCh)
addr := <-addrCh

// 测试客户端处理调用超时的情况
t.Run("client call timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr)

ctx, _ := context.WithTimeout(context.Background(), time.Second) // 创建一个超时的 context,如果 1s 内没有返回结果,context.Done() 会传出信号 struct{}{}
var reply int
err := client.Call(ctx, "ServiceTemp.Timeout", 20, &reply) // 调用 ServiceTemp.Timeout 方法,Call 发生超时
_assert(err != nil && strings.Contains(err.Error(), ctx.Err().Error()), "expect a timeout error")
})

// 测试服务端处理超时的情况
t.Run("server handle timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr, &Option{HandleTimeout: time.Second})
var reply int
err := client.Call(context.Background(), "ServiceTemp.Timeout", 20, &reply) // 调用 ServiceTemp.Timeout 方法,服务端处理超时
_assert(err != nil && strings.Contains(err.Error(), "handle timeout"), "expect a timeout error")
})
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
# 代码结构
version_4_timeout
├── client.go
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go
├── server.go
├── service.go
└── service_test.go

5.支持 HTTP 协议

为什么需要支持 HTTP 协议

当前我们 RPC 框架是基于 TCP 协议的。

1
2
// main.go
l, _ := net.Listen("tcp", ":9999") // 使用 TCP 协议

为什么需要支持 HTTP 协议?

  • 兼容性:很多应用是基于 HTTP 协议的,RPC 框架支持 HTTP,便于兼容。
  • 方便调试:通过 HTTP 提供调试界面,可以实时监控RPC服务的状态和调用统计,方便开发和运维。
  • 安全性:HTTP 可以和 TLS/SSL 结合,提供安全的通信方式。

如何支持 HTTP 协议

基于通信流程梳理需要做哪些事情:

  1. 客户端向 RPC 服务器,发送 HTTP CONNECT 方法请求
  2. 服务端响应 HTTP CONNECT 方法请求,表示建立连接
  3. 客户端使用建立的连接,发送 RPC 报文(先发送 option,再发送 RPC 报文)
  4. 服务端处理 RPC 请求,并响应。

客户端发送请求

服务端响应请求

在哪里监听?


从零实现系列|RPC
https://www.aimtao.net/7days-rpc/
Posted on
2024-10-18
Licensed under