从零实现系列|RPC

0.序言

为什么需要 RPC

最直观得是,客户端可以像调用本地程序一样,进行远程调用,使用者无需关注内部的实现细节。

另外一种广泛使用的调用方式是基于 HTTP 协议得 Restful API,让 gpt 总结一下。

对比维度Restful APIRPC
协议基于 HTTP 协议(如 HTTPS)通常使用自定义协议(如 TCP 或高效二进制协议)
通信方式基于 HTTP 的请求-响应模型(如 GET/POST)类似本地方法调用的请求-响应模型(对开发者透明)
报文格式文本格式(JSON/XML),冗余较多二进制编码(如 Protobuf),精简高效
性能较低(文本解析、冗余数据)较高(二进制压缩、高效序列化)
使用场景通用性强,适合跨语言、对外的开放接口高性能要求高,适合内部服务间通信
可扩展性扩展依赖网关等中间件,功能相对固定原生支持注册中心、负载均衡、超时处理等扩展功能
抽象模型面向资源的抽象,通过 URI 唯一标识资源,通过 HTTP 方法定义操作面向过程的抽象,客户端直接调用服务端的函数或方法

RPC 框架需要解决哪些问题

  • 通信协议:如何选择传输协议(TCP/HTTP/Unix Socket)?如何设计协议格式(如报文头、体结构)?

  • 编码方式:如何高效编码/解码数据?如何压缩报文?

  • 可用性问题

    • 超时处理:超时控制(如客户端连接超时、服务端处理超时)避免资源阻塞。
    • 请求管理:如何支持并发请求、异步请求?
  • 服务治理组件

    • 注册中心:服务动态注册与发现,健康检查(心跳机制)
    • 负载均衡:如何分配请求到多个服务实例?

以上种种,业务之外的公共能力,RPC 框架均需具备。

注意:RPC 框架主要实现的是 client stub、server stub,为用户封装细节。

对于 stub 不了解,可以看了解:RPC 开发的四大要素

市面上的 RPC 框架 有哪些

框架名称特点
net/rpcGo 标准库,轻量级,支持 TCP/HTTP 协议,默认使用 Gob 编码
gRPCGoogle 开源,基于 HTTP/2 和 Protobuf,跨语言,高性能,支持流式通信
rpcx高性能、支持多种编码(JSON/Protobuf),集成注册中心、负载均衡
go-micro微服务框架,包含 RPC 模块,支持插件化(注册中心、编码协议等)

本文如何从零实现 RPC 框架

  1. 从零实现标准库 net/rpc
  2. 新增了协议交换(protocol exchange)、注册中心(registry)、服务发现(service discovery)、负载均衡(load balance)、超时处理(timeout processing)等特性

1.服务端与消息编码

如何设计一个 RPC 请求

这个问题也可以问成:一个 RPC 请求,应该传入传出什么数据,选择使用什么数据结构?

举个例子:一个典型的 RPC 调用。

1
err = client.Call("Arith.Multiply", args, &reply)
  • Arith.Multiply:服务名 Arith 和方法名 Multipy
  • args:传入参数
  • reply:传出参数

我们将服务名、方法名放在请求的 header 中,传输传出参数放在 body 中。

其中 header 定义为:

1
2
3
4
5
6
7
// codec/codec.go

type Header struct {
ServiceMethod string // 服务名和方法名
Seq uint64 // 请求序号
Error string // 客户端置为空,服务端如何出现错误,将错误信息写入Error
}

用什么编解码方式进行编解码

编码方式可以选择 json、gob、protobuf 等等,为了更好的兼容性,我们抽象出来 Codec 的类型。

1
2
3
4
5
6
7
8
9
// codec/codec.go

// Codec 接口:对消息体进行编码,比如 Gob、Json 会实现该接口,代表两种编码方式
type Codec interface {
io.Closer
ReadHeader(*Header) error
ReadBody(interface{}) error
Write(*Header, interface{}) error
}

之后想使用哪个编码方式都可以,只要实现了 Codec 接口即可。

将各个编解码方式名称硬编码保存下来,并使用 map 保存各个编解码方式的构造函数。所以使用的流程是,通过编解码方式名称获取到该编解码方式的构造函数,调用该构造函数,创建编解码的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// codec/codec.go

type Type string
const (
GobType Type = "application/gob"
JsonType Type = "application/json"
)

// NewCodecFunc Codec 接口类型的构造函数
type NewCodecFunc func(io.ReadWriteCloser) Codec

// NewCodecFuncMap 通过 type 来选择对应的 codec 的构造函数
var NewCodecFuncMap map[Type]NewCodecFunc

func init() {
NewCodecFuncMap = make(map[Type]NewCodecFunc)
NewCodecFuncMap[GobType] = NewGobCodec // 赋值为 Gob codec 的构造函数(NewGobCodec 为 Gob codec 的构造函数,在 codec/gob.go 实现。)
//NewCodecFuncMap[JsonType] = NewJsonCodec // 赋值为 Json codec 的构造函数, 暂不实现
}

如何实现编解码方式的接口

下面将以 Gob 为例,说明如何实现 Codec 接口。先定义 Gob 编解码的结构体 GobCodec 。

1
2
3
4
5
6
7
8
// codec/gob.go

type GobCodec struct {
conn io.ReadWriteCloser
buf *bufio.Writer // 使用缓冲流来提高性能,防止阻塞
dec *gob.Decoder
enc *gob.Encoder
}

实现 Codec 的接口函数,主要是封装 encoding/gob 的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// codec/gob.go

// ReadHeader 封装了 encoding/gob 的 Decode 方法,从连接中读取 Header 信息
func (c *GobCodec) ReadHeader(header *Header) error {
return c.dec.Decode(header)
}

// ReadBody 封装了 encoding/gob 的 Decode 方法,从连接中读取 Body 信息
func (c *GobCodec) ReadBody(body interface{}) error {
return c.dec.Decode(body)
}

// Write 封装了 encoding/gob 的 Encode 方法,将 Header 和 Body 信息写入连接中
func (c *GobCodec) Write(header *Header, body interface{}) (err error) {
defer func() { // 关闭前,先将 buf 中的数据写入连接中
_ = c.buf.Flush()
if err != nil {
_ = c.Close()
}
}()

if err = c.enc.Encode(header); err != nil {
log.Panicln("rpc codec: gob error encoding header: ", err)
return err
}
if err = c.enc.Encode(body); err != nil {
log.Panicln("rpc codec: gob error encoding body: ", err)
return err
}
return nil
}

// Close 封装了 io.ReadWriteCloser 的 Close 方法,关闭连接
// io.ReadWriteCloser 包含 io.Closer 类型,io.Closer 类型包含 Close 方法
func (c *GobCodec) Close() error {
return c.conn.Close()
}

最后再实现 GobCodec 的构造函数,方便初始化时,传给 Codec 接口变量。

1
2
3
4
5
6
7
8
9
10
11
// codec/gob.go

func NewGobCodec(conn io.ReadWriteCloser) Codec {
buf := bufio.NewWriter(conn)
return &GobCodec{
conn: conn,
buf: buf,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
}
}

P.S. 为什么 GobCodec 需要加一个 buf ?

用于缓冲写入操作,减少系统调用次数。如果没有 buf,每次小数据写入直接触发系统调用,增加延迟;频繁的 I/O 操作,影响性能。

通信过程如何协商编码方式

以 HTTP 报文为例,HTTP 报文分为 header 和 body 两个部分。客户端和服务端收发消息时,只需要先解析 header 部分,就知道 body 的格式 Content-Type、长度 Content-Length。

在 RPC 协议的报文里,为了提升性能,仅在报文最开始规划固定的字节,来协商编码方式。

1
2
3
4
5
6
7
// server.go

// Option 定义 Option 结构体,封装了 MagicNumber 和 CodecType 字段,从 conn 中解析出 Option 的信息,表示 RPC 消息的编码方式
type Option struct {
MagicNumber int // 验证连接的合法性,确保客户端和服务端使用同一协议
CodecType codec.Type // 编码方式的名称
}

所以实际的报文是这样的。

1
| Option | Header1 | Body1 | Header2 | Body2 | ...

那 Option 使用什么编码方式呢?

为了方便,我们可以规定,Option 使用 JSON 编码,后面的 header、body 使用 Option 规定的编解码方式编解码。

1
2
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ Option 固定使用 JSON 编码 ------> | <------- Header/Body 编码方式由 CodeType 决定 ------->|

如何实现一个服务端

先看一下,服务端要做什么,要实现哪些功能。

  • 建立连接:实现 Accept 方法,等待 socket 连接,并开启协程处理请求。
  • 处理请求:
    • 解析 Option 信息,检查 Option.MagicNumber 是匹配,根据 Option.CodecType 实例化编解码器。
    • 读取请求:使用编解码器实例解码 header 和 body。
    • 处理请求。
    • 回复请求。

搞清楚需要实现哪些功能,就可以开始具体实现了。(可以先看主要的调用流程,具体实现实现细节晚点再看)

  • 服务器实例的构造函数。
1
2
3
4
5
6
7
8
9
10
// server.go

const MagicNumber = 0x3bef5c

// Server 定义 Server 结构体,封装了 Accept、ServeConn、serveCodec 方法
type Server struct{} // 肚子里没什么要放的

func NewServer() *Server {
return &Server{}
}
  • Accept 处理连接:建立 socket 连接,使用 goroutine 处理连接
1
2
3
4
5
6
7
8
9
10
11
12
// server.go

func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept() // 建立 socket 连接
if err != nil {
log.Println("rpc server: accept error: ", err)
return
}
go server.ServeConn(conn) // 使用 goroutine 处理连接
}
}
  • ServeConn 处理消息:解析出 Option 信息,根据 CodecType 选择对应的 codec,调用 serveCodec 方法处理剩下的消息

    • json.NewDecoder(conn).Decode(&opt) 的含义?
      • json.NewDecoder(conn) 创建一个从连接读取JSON数据的解码器 *Decoder
      • *Decoder.Decode(&opt) 将JSON数据解码到 opt 结构体中,一次只读一个 JSON。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// server.go

func (server *Server) ServeConn(conn io.ReadWriteCloser) {
defer func() {
_ = conn.Close()
}()

var opt Option
if err := json.NewDecoder(conn).Decode(&opt); err != nil { // opt 是传出参数,读到 RPC 前面的 JSON 数据,这包含了 option 信息,表示 RPC 消息的编码方式
log.Println("rpc server: options error: ", err)
return
}
if opt.MagicNumber != MagicNumber { // 验证 MagicNumber,证明请求合法
log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)
return
}
f := codec.NewCodecFuncMap[opt.CodecType] // 获取编解码器的构造函数
if f == nil {
log.Printf("rpc server: invalid codec type %s", opt.CodecType)
return
}
server.serveCodec(f(conn))
}
  • serveCodec 处理请求:调用 readRequest 方法读取请求,调用 handleRequest 方法处理请求。
    • 使用协程来并发处理请求。
    • 使用 sync.WaitGroup 保证即使因为读请求出错退出循环,也等待所有协程请求处理完成再关闭连接,优雅终止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// serveCodec 处理请求:调用 readRequest 方法读取请求,调用 handleRequest 方法处理请求
func (server *Server) serveCodec(cc codec.Codec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)

for {
req, err := server.readRequest(cc)
if err != nil {
break
}

wg.Add(1)
go server.handleRequest(cc, req, sending, wg) // 使用协程来并发处理请求。
}
wg.Wait() // 即使因为读请求出错退出循环,也需要等待所有协程请求处理完成再关闭连接,优雅终止。
_ = cc.Close()
}
  • readRequest 读取请求:调用 readRequestHeader 方法读取请求头,再调用 ReadBody 方法读取请求参数,返回 request 结构体

    • 这里定义了请求体的结构体 request,其中,h 储存 header,argv 储存传入参数,replyValue 储存传出参数,也就是返回值。
    • readRequestHeader 就是进一步调用 编解码器 的 ReadHeader 方法,这里的编解码器是 gob。gob 的 ReadHeader 方法是封装的 encoding/gob 的 Decode 方法。ReadBody 同理。
  • 什么是 reflect.Value 类型?

    • 两个前置知识:具体看 reflect
      • 每个 interface{} 类型的变量都包含一对值 (type,value),type 表示变量的类型信息,value 表示变量的值信息。
      • 反射就是把 interface{} 类型变量转化为 reflect.Value 或 reflect.Type 类型变量,随后用 reflect 包中的方法对它们进行各种操作
    • reflect.Value 是反射值,是描述值的一个容器。
  • argv 和 replyValue 为什么都是 reflect.Value 类型呢?

    • 因为对于服务端来说,他并不知道客户端会调用哪个具体的方法、要传入什么类型的参数、要传出什么类型的参数。
    • 所以需要在运行时,动态地获取参数类型和返回值类型,并取指和赋值。
  • req.argv = reflect.New(reflect.TypeOf("")) 的含义?

    • reflect.TypeOf("") 获取空字符串的类型信息,得到 reflect.Type 类型变量,描述 string 类型。
    • reflect.New(t) 创建一个该类型 t 的指针值,返回一个 reflect.Value 对象,实际内部是一个指针类型,指针类型为 *t,这里也就是描述 *string 类型的容器。(相当于 new(string)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 举个例子
    v := reflect.New(reflect.TypeOf("")) // v 是 reflect.Value, 类型是 *string
    fmt.Println(v.Type()) // 输出 *string
    fmt.Println(v.Elem().Type()) // 输出 string
    v.Elem().SetString("hello") // 设置值
    fmt.Println(v.Elem().Interface()) // 输出 hello

    // 等价于:
    var s string // 声明字符串变量
    ptr := &s // 获取指针
    v = reflect.ValueOf(ptr) // 包装为 reflect.Value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// server.go

type request struct {
h *codec.Header // 读到的请求头
argv, replyValue reflect.Value // argv 储存传入参数,replyValue 储存传出参数。reflect.Value 是反射值,是描述值的一个容器。
}

// readRequest 读取请求:调用 readRequestHeader 方法读取请求头,调用 ReadBody 方法读取请求参数,返回 request 结构体
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}
req := request{
h: h,
}

req.argv = reflect.New(reflect.TypeOf("")) // 初始化传入参数,使用 New 方法创建一个表示 *string 类型值的反射值。reflect.Value 是描述值的一个容器。
if err = cc.ReadBody(req.argv.Interface()); err != nil { // 读取请求参数,ReadBody 函数的参数类型是 interface{},需要使用 Interface() 方法,将 reflect.Value 转化为 interface{} 类型
log.Println("rpc server: read body error: ", err)
}

return &req, nil
}

func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {
var h codec.Header
if err := cc.ReadHeader(&h); err != nil {
if err != io.EOF && !errors.Is(err, io.ErrUnexpectedEOF) {
log.Println("rpc server: read header error: ", err)
}
return nil, err
}
return &h, nil
}
  • handleRequest 处理请求:这部分目前处理的比较简单,主要是根据请求,构造请求的响应信息,并通过 sendResponse 发送给请求方。
    • 为什么要加锁?处理请求可以并发的读,但是发送数据不能并发写入链接,否则多个回复报文交织在一起,影响客户端解析。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// server.go

// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

log.Println("handleRequest: ", req.h, req.argv.Elem())
req.replyValue = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq)) // 构造请求的响应信息
server.sendResponse(cc, req.h, req.replyValue.Interface(), sending) // 发送响应
}

func (server *Server) sendResponse(cc codec.Codec, header *codec.Header, body interface{}, sending *sync.Mutex) {
sending.Lock() // 加锁,防止并发写
defer sending.Unlock()
if err := cc.Write(header, body); err != nil {
log.Println("rpc server: write response error: ", err)
}
}

如何实现一个客户端

这里在 main 函数中实现一个简易的客户端,验证一下 server 的功能。

客户端主要做的事:

  • 连接到服务器
  • 构造 header 和 body,发送请求
  • 接收请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// main/main.go

func main() {
addr := make(chan string)
// 启动服务端
go startServer(addr)

// 以下是客户端的逻辑
addrString := <-addr
fmt.Println(addrString)
conn, _ := net.Dial("tcp", addrString) // 建立到服务端的连接
defer func() {
_ = conn.Close()
}()

time.Sleep(time.Second)

_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption) // 将 DefaultOption 以 JSON 格式编码,并发给服务端,用于协商 RPC 通信参数
cc := codec.NewCodecFuncMap[geerpc.DefaultOption.CodecType](conn) // 根据 DefaultOption 中的 CodecType 选择对应的 codec

// 发送 5 个请求并接收响应
for i := 0; i < 5; i++ {
h := &codec.Header{ // 构造请求头
ServiceMethod: "Foo.Sum",
Seq: uint64(i),
}

// 发送 header 和 body,其中 body 是一个字符串,格式为 "geerpc req %d"
_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))

var replyHeader codec.Header
_ = cc.ReadHeader(&replyHeader) // 读取响应头
var reply string
_ = cc.ReadBody(&reply) // 读取响应体
log.Println("main: reply: ", replyHeader, reply)
}
}

func startServer(addr chan string) {
// 监听随机可以用的端口
listen, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error", err)
}
log.Println("startServer: start rpc server on", listen.Addr())

// 将监听地址返回给主协程
addr <- listen.Addr().String()
geerpc.Accept(listen) // 开始处理连接
}

/* output
[::]:43017
handleRequest: &{Foo.Sum 0 } geerpc req 0
main: reply: {Foo.Sum 0 } geerpc resp 0
handleRequest: &{Foo.Sum 1 } geerpc req 1
main: reply: {Foo.Sum 1 } geerpc resp 1
handleRequest: &{Foo.Sum 2 } geerpc req 2
main: reply: {Foo.Sum 2 } geerpc resp 2
handleRequest: &{Foo.Sum 3 } geerpc req 3
main: reply: {Foo.Sum 3 } geerpc resp 3
handleRequest: &{Foo.Sum 4 } geerpc req 4
main: reply: {Foo.Sum 4 } geerpc resp 4
*/

完整代码

1
2
3
4
5
6
7
8
9
# 代码结构
version_1_codec
├── codec
│   ├── codec.go # 编码方式的抽象接口
│   └── gob.go # 实现 Codec 接口
├── go.mod
├── main
│   └── main.go # 简易客户端,发送请求
└── server.go # 服务端 stub,接受请求

2.高性能客户端

虽然这章节叫高性能客户端,实际上具体来说,应该是 client stub,为真正的 client (具体的业务代码)提供基础设施。

客户端需要提供哪些能力

首先我们期望用户可以怎么用我们这个框架。

1
2
3
4
5
6
// 初始化客户端,连接到服务器
client, err := geerpc.Dial("tcp", "127.0.0.1:8080")

// 发送请求
var args, reply string
err = client.Call("methodName", args, &reply)

所以肯定需要提供 Dial、Call 这两个方法给用户使用。但在这背后的能力,应该有这些:

  • 连接管理的能力:

    • 网络连接建立:通过 Dial 方法连接到服务器。
    • 协议协商:在建立连接后,完成协议交换(通过发送 option 协商)。
    • 连接状态管理:当用户主动关闭连接或者程序出现异常时,关闭连接并释放资源。
  • 请求管理的能力:

    • 请求封装:将单次的请求信息封装在一个结构体中。
    • 唯一标识:为每个请求生成一个唯一的序例号。
  • 通信能力:

    • 请求发送:
      • 序例化 header 和 body。
      • 发送请求(通过锁来保证原子性)。
      • 并发的处理多个请求。
    • 响应接收:
      • 持续接收响应。
      • 异常处理:服务出错、请求不存在(已移除)。

虽然这些能力看着很多,但大多是细节。接下来,我们自顶向下地实现 Dial、Call 这两个核心方法。

如何实现 Dial 方法

从主流程来看,Dial 有三件事:

  • 确定编码方式
  • 连接服务器
  • 返回一个 Client 实例,用户使用这个 Client 实例进行远程调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// client.go

// Dial connects to an RPC server at the specified network address
func Dial(network, address string, opts ...*geerpc.Option) (client *Client, err error) {

// 默认使用 Gob 编码
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}

// 真正拨号建立连接
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
defer func() {
if client == nil {
_ = conn.Close()
}
}()

// 返回 Client 实例
return NewClient(conn, opt)
}

确定编码方式:根据用户传入的 Option 来实例化 Codec,如果没有传入,则默认使用 gob 编码,所以这里是可选参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// client.go

func parseOptions(opts ...*geerpc.Option) (*geerpc.Option, error) {
// if opts is nil or pass nil as parameter
if len(opts) == 0 || opts[0] == nil {
return geerpc.DefaultOption, nil
}
if len(opts) != 1 {
return nil, errors.New("number of options is more than 1")
}
opt := opts[0]
opt.MagicNumber = geerpc.DefaultOption.MagicNumber
if opt.CodecType == "" {
opt.CodecType = geerpc.DefaultOption.CodecType
}
return opt, nil
}

连接服务器:使用 net 包得 Dial 方法,获得一个 net.Conn 接口类型,表示面向流的网络连接。 GobCodec 会对 conn 进一步封装,向外提供读/写/关闭的能力。

返回 client 实例:这里是 Dail 的重中之重,很多特性都需要通过 client 的设计来实现。

如何抽象一个请求

用户拿着 Client 实例,可以和服务端进行多次请求的发送和接收,也就是 Client 需要管理多个请求。所以在设计 Client 之前,需要将请求抽象出来。

  • Args、Reply:请求的参数是 interface{} 类型,因为 codec 读写方法参数都是 interface{}。
  • Done:是一个 channel,请求结束时,调用 done 方法,给 Done chanel 写值,Client 拿着这个 channel 就可以知道请求结束了。(正常结束或者异常结束)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// client.go

// Call 实例表示一次 RPC 调用请求
type Call struct {
Seq uint64 // 请求的序号
ServiceMethod string // 请求的方法名
Args interface{} // 请求的参数
Reply interface{} // 请求的响应信息
Error error
Done chan *Call // 当调用结束后,会通过 Done 通知调用者 // 这个写法有意思,channel 的类型是 *Call
}

func (call *Call) done() {
call.Done <- call
}

如何设计一个 Client

(1)基础的收发消息

1
2
cc       codec.Codec      // 消息编解码器,用于序列化请求和反序列化响应
opt *geerpc.Option // 客户端配置,比如编码方式和协议参数。

(2)发送请求提前准备的信息:多个请求共用一个 header,放在 Client 里,避免每次构造。

1
header   codec.Header     // 每个请求,共用这个同一消息头

(3)处理并发请求的工具:因为请求时并发的,会存在两个问题:不知道什么时候收到响应?不知道收到的响应是哪个请求的?

  • seq:Client 发的每个请求和接收的每个响应的 header 中,都有一个序号,保证请求的顺序和唯一性。
  • pending:未处理完的请求,比如还未发送的请求、已经发送还未回复的请求。
    • 对于未处理完的请求,调用者一直在阻塞,等待响应。当我收到响应后,可以根据序号,拿到该请求的 channel,告知调用者已经收到结果,并将响应结果储存在对应 call 的 reply 变量中。
    • 如果没有这个 map,一无法通知,二也不知道通知谁,结果储存给谁。
1
2
seq      uint64           // 用于给每个请求分配一个编号,用于区分不同的请求。(每个请求间没有顺序要求)
pending map[uint64]*Call // 每个请求对应一个 Call 实例。未处理完的请求会被保存在该字段中

(4)保证并发读写的锁

  • sending:保证每次发送给服务器,只有一个请求在发,否则服务端交叉收到不好解析。
  • mu:保护 seq、pending 、shutdown、closing 字段,防止并发读写。
1
2
sending  sync.Mutex       // 互斥锁,用于确保在同一时间只有一个请求被发送
mu sync.Mutex // 互斥锁,保护 seq、pending、shutdown、closing 字段,防止并发读写

(5)异常处理的标志位:为什么要设计两个字段?

  • closing:表示用户正在主动关闭 Client,避免多次关闭客户端,还可以做优雅退出。
  • shutdown:表示因为异常错误,Client 无法发送接收了,快速失败,对于没处理完的请求返回错误,避免调用者长时间阻塞等待响应。
1
2
closing  bool             // 是否正在关闭连接
shutdown bool // 客户端是否已经关闭

整体如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client.go

// Client 表示一个 RPC 客户端,一个客户端可以完成多个请求(Call 实例)的发送和接收
// 管理连接、请求和响应,可同时被多个协程并发使用
// 提供 Dial 方法,用于建立连接;提供 Call 方法,用于发送请求并等待响应结果
type Client struct {
cc codec.Codec // 消息编解码器,用于序列化请求和反序列化响应
opt *geerpc.Option // 客户端配置,比如编码方式和协议参数。
sending sync.Mutex // 互斥锁,用于确保在同一时间只有一个请求被发送
header codec.Header // 每个请求,共用这个同一消息头
mu sync.Mutex // 互斥锁,保护 seq、pending 、shutdown字段,防止并发读写
seq uint64 // 用于给每个请求分配一个编号,用于区分不同的请求。(每个请求间没有顺序要求)
pending map[uint64]*Call // 每个请求对应一个 Call 实例。未处理完的请求会被保存在该字段中
closing bool // 是否正在关闭连接
shutdown bool // 客户端是否已经关闭
}

基于上面这些,我们要提供几个基础的方法。

(1)创建客户端

  • 确定编码方式
  • 根据编码方式实例化 Codec
  • 根据 Codec 实例化 Client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// client.go

func NewClient(conn net.Conn, opt *geerpc.Option) (*Client, error) {

// 用 JSON 数据通知服务器,客户端的编码方式
// json.NewEncoder(conn) 创建一个 JSON Encoder 对象,Encode 方法将 opt 编码为 JSON 数据, JSON Encoder 对象将 Json 数据写入到 conn 中,也就是发给服务器
if err := json.NewEncoder(conn).Encode(opt); err != nil {
log.Println("rpc client: options error: ", err)
_ = conn.Close()
return nil, err
}

newCodecFunc := codec.NewCodecFuncMap[opt.CodecType]
if newCodecFunc == nil {
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
log.Println("rpc client: codec error:", err)
return nil, err
}
return newClientCodec(newCodecFunc(conn), opt), nil
}

func newClientCodec(cc codec.Codec, opt *geerpc.Option) *Client {
client := &Client{
seq: 1, // seq starts with 1, 0 means invalid call
cc: cc,
opt: opt,
pending: make(map[uint64]*Call),
}
go client.receive() // 启动协程持续接收响应
return client
}

(2)启动协程持续接收响应

  • 接收请求,先读取 header,其中有序号。
  • 请求已经收到响应,表示已经处理完了,该序号的请求就可以在 pending 中移出,并拿到该请求的实例 call。
  • 读取 body,并将响应储存在 call.Reply 中。(在此之前要处理两种错误)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// client.go

func (client *Client) receive() {
var err error
for err == nil { // 这个写法,会在 err 不为 nil 时退出循环,所以只会处理一次错误
// 读取请求头
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}

// 根据 h.Seq 找到对应的 Call 实例,并从 pending 中移除。
call := client.removeCall(h.Seq)

// 三种处理响应的情况
switch {
case call == nil: // Call 实例不存在,(可能客户端已经取消请求,但服务器还是在响应请求),忽略该请求
err = client.cc.ReadBody(nil)
case h.Error != "": // Call 实例存在,但服务器返回了错误
// 将错误信息写入 call.Error 中,调用 call.done() 通知调用方
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default: // Call 实例存在,服务器正常响应
// 读取响应体,将响应信息写入 call.Reply 中,调用 call.done() 通知调用方
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
//fmt.Println(call.Reply)
}
}
// 如果发生错误(如连接断开),调用 terminateCalls 方法: 将所有未完成的调用(pending 中的所有调用)标记为错误状态。通知所有调用方,释放资源。
client.terminateCalls(err)
}

(3)异常处理,快速失败

如果发生错误(如连接断开),调用 terminateCalls 方法: 将所有未完成的调用(pending 中的所有调用)标记为错误状态。通知所有调用方,释放资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client.go

// terminateCalls 方法用于在客户端关闭时,终止所有未完成的调用,并通知调用者发生了错误
func (client *Client) terminateCalls(err error) {
client.sending.Lock()
defer client.sending.Unlock()

client.mu.Lock()
defer client.mu.Unlock()

client.shutdown = true
for _, call := range client.pending {
call.Error = err
call.done()
}
}

(4)优雅退出

提供 Close 方法,给用户主动关闭 Client。

1
2
3
4
5
6
7
8
9
10
11
12
// client.go

func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()

if client.closing {
return ErrShutdown
}
client.closing = true
return client.cc.Close()
}

(5)可用性

暴露给用户。

1
2
3
4
5
6
7
// client.go

func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}

(6)注册/移出请求

主要是将请求保存在 pending 中管理,根据 seq 快速拿到请求实例 call。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// client.go

// registerCall 方法用于注册一个 Call 实例,并返回该实例的序号。
func (client *Client) registerCall(call *Call) (uint64, error) {
client.mu.Lock()
defer client.mu.Unlock()

if client.closing || client.shutdown {
return 0, ErrShutdown
}

call.Seq = client.seq
client.pending[call.Seq] = call
client.seq++
return call.Seq, nil
}

// removeCall 方法用于从 pending 中移除一个 Call 实例,表示该请求已处理完成或已取消,并返回该实例。
func (client *Client) removeCall(seq uint64) *Call {
client.mu.Lock()
defer client.mu.Unlock()

call := client.pending[seq]
delete(client.pending, seq)
return call
}

如何设计异步/同步调用方法

基础设施已经有了,万事俱备,就差发送请求了。

异步调用方法:流程很明确,主要有三步。

  • 构造 Call 请求实例
  • registerCall 注册请求
  • Codec.Write 发送请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// client.go

// Go 异步调用,不阻塞等待响应结果
func (client *Client) Go(serviceMethod string, args, reply interface{}, done chan *Call) *Call {
if done == nil {
done = make(chan *Call, 10)
} else if cap(done) == 0 {
log.Panic("rpc client: done channel is unbuffered")
}

// 为本次调用请求创建一个 Call 实例
call := &Call{
ServiceMethod: serviceMethod,
Args: args,
Reply: reply,
Done: done,
}

// 将 Call 实例发送到客户端
client.send(call)
return call
}

// send 发送请求到服务器
func (client *Client) send(call *Call) {
// make sure that the client will send a complete request
client.sending.Lock()
defer client.sending.Unlock()

// register this call.
seq, err := client.registerCall(call)
if err != nil {
call.Error = err
call.done()
return
}

// prepare request header
client.header.ServiceMethod = call.ServiceMethod
client.header.Seq = seq
client.header.Error = ""

// encode and send the request
if err := client.cc.Write(&client.header, call.Args); err != nil {
call := client.removeCall(seq)
// call 可能为 nil
// 比如由于网络或者某种错误,客户端在 receive() 中已经将该请求从 pending 中移除,此时 call 为 nil
if call != nil {
call.Error = err
call.done()
}
}
}

这个调用的方式是异步的,调完就完了,也不需要等待结果,因为有 receive 协程专门在接收数据。

举个例子,用户可以批量的调用,再统一的等待结果。

1
2
3
4
5
done1 := client.Go("Arith.Add", args1, &reply1, make(chan *Call, 1))
done2 := client.Go("Arith.Mul", args2, &reply2, make(chan *Call, 1))

call1 := <-done1
call2 := <-done2

也可以不关心返回的结果。(比如打日志、异步上报)

1
client.Go("LogService.Log", args, nil, nil) // 不需要 reply

同步调用方法

用户想调用完,等待结果再继续。我们可以在 Client.Go 方法的基础上,简单封装一个同步调用的方法。

1
2
3
4
5
6
7
// client.go

// Call 同步调用,阻塞等待响应结果
func (client *Client) Call(serviceMethod string, args, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done // 读 Done 的数据,阻塞等待 receive() 处理完响应结果,对 Done 写值
return call.Error
}

完整代码

1
2
3
4
5
6
7
8
9
10
# 代码结构
version_2_client
├── client.go # 新增:封装 RPC 请求,设计 client stub
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main # 改动:使用 client stub 发起连接和请求
│   └── main.go
└── server.go

3.服务注册

前面两章,我们都在假装调用 Foo 类型的 Sum 方法,获取到了方法名和参数返回了,没有真正去调用方法。要想真正可以调用,那就需要 client stub 把 Foo 类型(其实也是一种服务)注册到 server stub 中,这就是服务注册,下面来看细节。

这一章是干什么的

服务端的主要工作:

  • 监听端口
  • 响应请求
    • 解析请求
    • 处理请求
      • 从请求中获取方法名、参数
      • 获取方法
      • 调用方法
    • 返回结果

这一章主要完成处理请求部分,其中,从请求中获取方法名、参数在上一章已完成。client 在发送请求时,方法名在请求头里 client.header.ServiceMethod,参数在请求体里 call.Args

所以我们重点关注

  • 获取方法(获取方法的重点在于,事前将可调用的方法注册到服务中)
  • 调用方法

如何获取方法

服务端通过方法名获取实际的方法,需要两步:

  1. 在启动服务端服务时,我们可以将所有可调用的方法都加到一个 map 中。
  2. 当 client 调用 hello 时,我们从 map 中找到对应的方法皆可。

由于不同类型的结构体,可能有同名的结构体方法。我们的方法名,使用 结构体实例名.方法名 来作为方法名。比如上一章中的,结构体类型 Foo ,有 Sum 方法,客户端实例化 foo 对象后,远程调用 Client.Call 时,传入的就是 foo.Sum

如何方法注册到 map 中

首先,先说数据存储方式,简单来说是这样的,

graph LR
    A[Server.serviceMap] -->|map储存多个service| B[Service<br>一个 service 表示一个变量类型]
    B -->|service包含一个 map 变量| C[service.method]
    C -->|map 储存多个方法| D[methodType<br>一个 methodType 表示一个该变量类型的方法]
 

具体数据存储方式设计如下:

服务端保存着一个 serviceMap。使用 serviceMap 保存变量类型及其方法。key 是变量类型名,value 是 service 类型实例。

1
2
3
4
5
// server.go
// Server 定义 Server 结构体,封装了 Accept、ServeConn、serveCodec 方法
type Server struct {
serviceMap sync.Map
}

service 类型定义如下,一个 service 表示一个变量类型。其中,也包含一个 map,保存着这个变量的方法。key 是方法名,value 是 methodType 类型实例。

1
2
3
4
5
6
7
8
// service.go
// 一个 service 表示一个变量类型
type service struct {
name string // 变量类型名,用来打 log,例如字符串 "Foo"
typ reflect.Type // 变量类型,通过变量的类型,可以直接获取变量的方法,例如 Foo 类型,可获取到方法 Sum
rcvr reflect.Value // 变量的值,通过变量的值,可以调用变量的方法,例如 &foo,调用 Sum
method map[string]*methodType // map 储存变量的方法中所有可调用的方法
}

methodType 类型定义如下,一个 methodType 表示一个方法。

1
2
3
4
5
6
7
8
// service.go
// 一个 methodType 表示一个方法
type methodType struct {
method reflect.Method // 方法本身,用于调用方法,例如 Foo.Sum
ArgType reflect.Type // 参数的类型,用于判断参数是否正确
ReplyType reflect.Type // 返回的类型,用于判断返回值是否正确
numCalls uint64 // 方法调用次数,用于统计方法调用次数
}

最后,下面将以一个实际例子,演示如何将方法注册到 map 中。

graph LR
    A[startServer, 启动服务器] --> B[Server.Register , 为 Foo 类型注册服务]
    B --> C[newService, 为 Foo 类型创建 service 结构体]
    C --> D[获取 rcvr 值, 类型名和类型]
    C --> G[service.registerMethods, 获取方法列表]
    G --> K[检查方法的参数和返回值]
    K --> L[保存方法到 map 变量 service.method]

  1. 启动服务,将 Foo 类型注册到 rpc server 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// main.go

type Foo int // Foo 类型,实现了 Sum 方法
type Args struct{ Num1, Num2 int }
func (f Foo) Sum(args Args, reply *int) error {
*reply = args.Num1 + args.Num2
return nil
}

func startServer(addr chan string) {

var foo Foo // 实例化 Foo 类型的对象
if err := geerpc.Register(&foo); err != nil { // 注册 Foo 类型的对象,注册的是 Foo 类型的对象,不是 Foo 类型的方法
log.Fatal("register error:", err)
}

l, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
addr <- l.Addr().String()
geerpc.Accept(l)
}
  1. 为 Foo 类型创建 service 对象,并保存到 server 的 map 中。
1
2
3
4
5
6
7
8
9
10
11
// server.go
var DefaultServer = NewServer()
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

func (server *Server) Register(rcvr interface{}) error {
s := newService(rcvr) // 为 rcvr 变量的类型创建 service 结构体
if _, dup := server.serviceMap.LoadOrStore(s.name, s); dup { // 调用 serviceMap.LoadOrStore 将 service 结构体保存到 map 中
return errors.New("rpc: service already defined: " + s.name)
}
return nil
}
  1. 创建 service 对象、获取对象的方法、检查方法参数和返回值,最后将方法保存到 map 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// service.go
// newService 通过发射获取 rcvr 变量的类型及其方法
func newService(rcvr interface{}) *service {
s := new(service)
s.rcvr = reflect.ValueOf(rcvr) // 获取 rcvr 变量的值,例如 &foo
s.name = reflect.Indirect(s.rcvr).Type().Name() // 获取 rcvr 变量的类型名,例如 "Foo"
s.typ = reflect.TypeOf(rcvr) // 获取 rcvr 变量的类型,例如 Foo
if !ast.IsExported(s.name) {
log.Fatalf("rpc server: %s is not a valid service name", s.name)
}
s.registerMethods() // 获取 rcvr 变量的方法列表,例如 Foo.Sum,并将其保存到 service 的方法 map 中
return s
}

// registerMethods 获取 rcvr 变量的方法列表,例如 Foo.Sum,并将其保存到 service 的方法 map 中
func (s *service) registerMethods() {
s.method = make(map[string]*methodType)

for i := 0; i < s.typ.NumMethod(); i++ { // 遍历 rcvr 变量的方法列表
method := s.typ.Method(i) // 获取 rcvr 变量,第 i 个方法
mType := method.Type // 获取 rcvr 变量的方法的类型

// 检查方法的参数是否正确。rpc 的方法必须满足以下条件:
// 1. 方法有 3 个参数,第 1 个参数是 rcvr 变量(相当于 python 的 self,java 的 this),第 2 个参数是传入参数,第 3 个参数是传出参数,指针类型
// 2. 第 2 个参数和第 3 个参数都是导出的类型
// 3. 返回值只有一个,是 error 类型
// 例如这样 func (t *T) MethodName(argType T1, replyType *T2) error
if mType.NumIn() != 3 || mType.NumOut() != 1 { // 检查参数个数
continue
}
if mType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() { // 检测返回值个数和类型
continue
}
argType, replyType := mType.In(1), mType.In(2)
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) { // 检查参数类型是否是导出的类型或者内置类型
continue
}

// 检查完毕,将方法保存到 service 的方法 map 中
s.method[method.Name] = &methodType{
method: method,
ArgType: argType,
ReplyType: replyType,
}
log.Printf("rpc server: register %s.%s\n", s.name, method.Name)
}
}

// isExportedOrBuiltinType 检查类型是否是导出的类型或者内置类型
func isExportedOrBuiltinType(t reflect.Type) bool {
return ast.IsExported(t.Name()) || t.PkgPath() == ""
}

如何从 map 获取方法

在 readRequest 时,从请求头中,获取方法名;根据方法名在对应的 service 的 map 中获取到方法、传入参数、传出参数。

graph LR
    A[Server.readRequest] -->|解析请求头| B[Server.readRequestHeader]
    B-->|返回请求头,包含方法名| A
    A -->|获取方法| C[Server.findService]
    A -->|获取方法传入参数,传出参数| D[GobCodec.ReadBody]
    E[request]
    C-->|方法保存到 request| E
    D-->|传入参数,传出参数保存到 request| E
  1. request 储存了一次远程调用请求的信息,方法、传入参数、传出参数指针。它的定义如下。
1
2
3
4
5
6
7
8
// server.go
// request 表示一次调用的所有信息
type request struct {
h *codec.Header // 请求头
svc *service // 请求对应的服务,使用 svc.call 调用对应的方法
mtype *methodType // 请求对应的方法,是 svc.call 的第一个参数
argv, replyv reflect.Value // 方法的传入参数和传出参数,是 svc.call 的第二个和第三个参数
}
  1. readRequest 获取方法名、方法、传入参数、传出参数,并储存在 request 实例中,为方法调用做准备。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// server.go
// readRequest 读取请求:调用 readRequestHeader 方法读取请求头,调用 ReadBody 方法读取请求参数,返回 request 结构体
func (server *Server) readRequest(cc codec.Codec) (*request, error) {
// 读取请求头
h, err := server.readRequestHeader(cc)
if err != nil {
return nil, err
}

// 初始化请求结构体
req := &request{h: h}

// 根据请求头中的 ServiceMethod 字段找到对应的服务和方法类型
req.svc, req.mtype, err = server.findService(h.ServiceMethod)
if err != nil {
return req, err
}

// 创建传入参数和传出参数的反射对象
req.argv = req.mtype.newArgv()
req.replyv = req.mtype.newReplyv()

// 检查请求传入参数的类型是否为指针类型,如果不是,则使用 Addr() 方法将 req.argv 转换为指针类型
// 为什么?
// 因为如果传入值是值类型,传入后,是值拷贝,不会修改传入变量的原值,所以需要使用 Addr() 获取地址后传入。
argvi := req.argv.Interface() // 使用 interface() 方法将 req.argv 转换为 interface{} 类型,这样可以传入任意类型的参数
if req.argv.Type().Kind() != reflect.Ptr {
argvi = req.argv.Addr().Interface()
}

// ReadBody 方法会将请求参数解码到 argvi 中储存
if err = cc.ReadBody(argvi); err != nil {
log.Println("rpc server: read body err:", err)
return req, err
}
return req, nil
}

另外,需要注意一下,

  • 创建传入参数时,需区分值类型还是指针类型。
  • 创建传出参数时,需对 map、slice 特殊处理。(因为 reflect.New 初始化时,map、slice 都是 nil)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// service.go
// newArgv 创建一个参数变量
func (m *methodType) newArgv() reflect.Value {
var argv reflect.Value
// 参数可能是指针类型也可能是值类型
if m.ArgType.Kind() == reflect.Ptr {
argv = reflect.New(m.ArgType.Elem()) // 指针类型,则创建指针类型变量
} else {
argv = reflect.New(m.ArgType).Elem() // 值类型,则创建值类型变量
}
return argv
}

// newReplyv 创建一个传出参数变量
func (m *methodType) newReplyv() reflect.Value {
// 传出参数必须是指针类型
replyv := reflect.New(m.ReplyType.Elem())

// 为什么需要对 map 和 slice 特殊处理?
// reflect.New 创建的 map 和 slice 都是 nil,需要先初始化,再使用。
// 为什么 newArgv 方法不需要对 map 和 slice 特殊处理?
// 因为 argv 是用于接收调用方传入的参数,这些参数由调用方提供且已经初始化。
switch m.ReplyType.Elem().Kind() {
case reflect.Map:
replyv.Elem().Set(reflect.MakeMap(m.ReplyType.Elem()))
case reflect.Slice:
replyv.Elem().Set(reflect.MakeSlice(m.ReplyType.Elem(), 0, 0))
}
return replyv
}

如何调用方法

在 handleRequest 时,通过 service.call 利用反射来调用函数。这里主要学习这个用法。

graph LR
    A[Server.serveCodec]-->|获取请求信息| B[Server.readRequest] 
    A -->|调用方法| C[Server.handleRequest]
    C --> D[request.svc.call, 也就是 Service.call]
    D -->|使用反射调用| E[methodType.method.Func.Call]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// server.go
// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {
defer wg.Done()

err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用

if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// service.go
// call 调用 rcvr 变量的方法,例如 Foo.Sum
func (s *service) call(m *methodType, argv, replyv reflect.Value) error {
atomic.AddUint64(&m.numCalls, 1)
f := m.method.Func
// 用反射的方式调用方法
// 第一个参数是 rcvr 变量,例如 &foo,类似于 java 的 this,python 的 self
// 第 2 个参数是传入参数,第 3 个参数是传出参数,指针类型
returnValues := f.Call([]reflect.Value{s.rcvr, argv, replyv})
if errInter := returnValues[0].Interface(); errInter != nil {
return errInter.(error)
}
return nil
}

更深入了解反射

请看这篇 https://www.aimtao.net/go#11-reflect

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
# 代码结构
version_3_service
├── client.go
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go # 改动:增加真正将服务注册到 server stub 的步骤
├── server.go # 改动:增加 serviceMap 保存所有可以调用的方法,并对用户暴露服务注册的方法
├── service.go # 新增:将每个变量抽象成一个 service,并储存该变量的方法
└── service_test.go # 新增:测试 service 方法

4.超时处理

哪些地方需要超时处理

简单来看整个流程:

  • 客户端:

    1. 拨号连接 ✓
    2. 远程调用(发送数据)✓
    3. 等待服务端处理 ✓
    4. 接收返回的结果(接收数据)✓
  • 服务端:

    1. 端口监听
    2. 接收请求信息(接收数据)✓
    3. 调用方法处理请求信息(处理数据)✓
    4. 返回请求结果(发送数据)✓

以上打 ✓ 的步骤,均可能出现超时,主要为,

  • 客户端建立连接超时
  • 发送数据
    • 客户端/服务端写报文时超时
  • 处理数据
    • 服务端调用方法超时
  • 接收数据
    • 客户端等待服务端响应超时
    • 客户端/服务端读取报文超时

基于这个超时的情景,我们可以在以下三个地方设置超时处理机制。

客户端:

  • 建立连接
  • Client.call() 的整个过程(包含发送数据、等待处理、接收数据)

服务端:

  • Server.handleRequest() 的整个过程(包括处理数据、发送数据,接收数据先不管了)

建立连接的超时处理

启动一个 goroutine 来拨号建立连接,使用 select 设置超时器,阻塞等待拨号结果,如果在接收拨号结果之前,先收到了超时信号,则进行超时处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// client.go
type NewClientFunc func(conn net.Conn, opt *Option) (*Client, error)

func Dial(network, address string, opts ...*Option) (client *Client, err error) {
return dialTimeout(NewClient, network, address, opts...)
}

type dialResult struct {
client *Client
err error
}

func dialTimeout(f NewClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
// 默认使用 Gob 编码
opt, err := parseOptions(opts...)
if err != nil {
return nil, err
}

// 声明一个通道,用于传输拨号建立连接的结果
result := make(chan dialResult)

// 当发生错误时,保证 client 为 nil
defer func() {
if err != nil {
client = nil
}
}()

// 启动一个 goroutine 连接服务器,连接成功后,调用 f 创建 Client 实例,并将结果发送到 result 通道中
go func() {
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
if err != nil {
result <- dialResult{client: nil, err: err}
return
}
client, err = f(conn, opt)
result <- dialResult{client: client, err: err}
}()

// 超时处理,阻塞等待,等待超时或收到结果
select {
case <-time.After(opt.ConnectTimeout):
return nil, fmt.Errorf("rpc client: connect timeout: %v", opt.ConnectTimeout)
case result := <-result:
return result.client, result.err
}
}

超时的时长,设置在 Option struct 内,便于用户自定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// server.go

// Option 定义 Option 结构体,封装了 MagicNumber 和 CodecType 字段,从 conn 中解析出 Option 的信息,表示 RPC 消息的编码方式
type Option struct {
MagicNumber int
CodecType codec.Type
ConnectTimeout time.Duration // Client 建立连接的超时时间
HandleTimeout time.Duration // Client.Call() 整个过程的超时时间
}

var DefaultOption = &Option{
MagicNumber: MagicNumber,
CodecType: codec.GobType,
ConnectTimeout: time.Second * 10,
//HandleTimeout: time.Second * 10, // 默认为 0,不设置超时时间
}

Call 的超时处理

把选择权交给用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// client.go

// Call 同步调用,阻塞等待响应结果
func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))

// 等待响应结果/超时
select {
case <-ctx.Done():
client.removeCall(call.Seq)
return errors.New("rpc client: call failed: " + ctx.Err().Error())
case call := <-call.Done:
return call.Error
}
}

用户如何使用?

1
2
3
ctx, _ := context.WithTimeout(context.Background(), time.Second)
var reply int
err := client.Call(ctx, "Foo.Sum", &Args{1, 2}, &reply)

handleRequest 超时处理

这里先不管读取数据了,仅对 handleRequest 做超时处理。handleRequest 其中又分为两个步骤,处理数据、发送数据,仅对处理数据做超时处理。

设置两个 channel,一个表示 req.svc.call 完成,一个表示 server.sendResponse 完成。当 req.svc.call 完成后,就不用管超时时间是否到达,继续让 server.sendResponse 发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// server.go

// handleRequest 处理请求:构造请求响应信息,调用 sendResponse 方法发送响应
func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
defer wg.Done()

called := make(chan struct{}, 1) // 设置缓冲区为 1,防止在超时后,无人接收 channel 数据,导致 channel 发送时阻塞,导致 goroutine 泄漏
sent := make(chan struct{}, 1) // 设置缓冲区为 1,防止在超时后,无人接收 channel 数据,导致 channel 发送时阻塞,导致 goroutine 泄漏


go func() {
err := req.svc.call(req.mtype, req.argv, req.replyv) // 调用
called <- struct{}{} // 调用完成, 不管是否超时,继续发送数据
if err != nil {
req.h.Error = err.Error()
server.sendResponse(cc, req.h, invalidRequest, sending)
sent <- struct{}{}
return
}
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
sent <- struct{}{}
}()

if timeout == 0 { // 没有超时时间,直接等待
<-called
<-sent
return
}

// 有超时时间,使用 select 等待超时或调用完成
select {
case <-time.After(timeout):
server.sendResponse(cc, req.h, invalidRequest, sending)
case <-called: // 如果调用完成,则不管超时时间,等待 sent(仅对 req.svc.call 做超时处理)
<-sent
}
}

测试代码

测试客户端处理连接超时的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// client_test.go
func TestClient_dialTimeout(t *testing.T) {
t.Parallel() // 设置测试项并行执行

f := func(conn net.Conn, opt *Option) (client *Client, err error) {
_ = conn.Close()
time.Sleep(time.Second * 2)
return nil, nil // 模拟了一个没有错误的客户端连接
}

l, _ := net.Listen("tcp", ":0")

// 测试客户端连接有超时处理的情况
t.Run("connect timeout", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: time.Second})
_assert(err != nil && strings.Contains(err.Error(), "connect timeout"), "expect a timeout error")
})

// 测试客户端连接没有超时处理的情况
t.Run("0", func(t *testing.T) {
_, err := dialTimeout(f, "tcp", l.Addr().String(), &Option{ConnectTimeout: 0})
_assert(err == nil, "0 means no limit")
})
}

测试客户端处理调用超时和服务端处理超时的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// server_test.go
type ServiceTemp int

// ServiceTemp 有一个方法 Timeout,该方法耗时2s
func (s ServiceTemp) Timeout(args int, reply *int) error {
time.Sleep(time.Second * time.Duration(args))
*reply = 0
return nil
}

func TestClient_Call(t *testing.T) {
t.Parallel()

addrCh := make(chan string)
go func(chan string) { // 启动一个服务器,监听 0 端口,注册 ServiceTemp 类型的对象,然后启动 Accept 方法,等待客户端连接
var s ServiceTemp
_ = Register(&s)
l, _ := net.Listen("tcp", ":0")
addrCh <- l.Addr().String()
Accept(l)
}(addrCh)
addr := <-addrCh

// 测试客户端处理调用超时的情况
t.Run("client call timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr)

ctx, _ := context.WithTimeout(context.Background(), time.Second) // 创建一个超时的 context,如果 1s 内没有返回结果,context.Done() 会传出信号 struct{}{}
var reply int
err := client.Call(ctx, "ServiceTemp.Timeout", 20, &reply) // 调用 ServiceTemp.Timeout 方法,Call 发生超时
_assert(err != nil && strings.Contains(err.Error(), ctx.Err().Error()), "expect a timeout error")
})

// 测试服务端处理超时的情况
t.Run("server handle timeout", func(t *testing.T) {
client, _ := Dial("tcp", addr, &Option{HandleTimeout: time.Second})
var reply int
err := client.Call(context.Background(), "ServiceTemp.Timeout", 20, &reply) // 调用 ServiceTemp.Timeout 方法,服务端处理超时
_assert(err != nil && strings.Contains(err.Error(), "handle timeout"), "expect a timeout error")
})
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
# 代码结构
version_4_timeout
├── client.go # 改动:增加客户端端超时处理的逻辑
├── codec
│   ├── codec.go
│   └── gob.go
├── go.mod
├── main
│   └── main.go
├── server.go # 改动:增加服务端超时处理的逻辑
├── service.go
└── service_test.go

5.支持 HTTP 协议

什么是支持 HTTP 协议

当前我们 RPC 框架是基于 TCP 协议的。通过监听端口,建立 TCP socket 连接。

1
2
3
// main.go
l, _ := net.Listen("tcp", ":9999") // 使用 TCP 协议
geerpc.Accept(l) // 等待建立连接
1
2
3
4
5
6
7
8
9
10
11
// server.go
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept() // 建立 socket 连接
if err != nil {
log.Println("rpc server: accept error: ", err)
return
}
go server.ServeConn(conn) // 使用 goroutine 处理连接
}
}

这种情况下,客户端只能通过建立 socket 连接与服务端通信。如果支持了 HTTP 协议是什么样子呢?

用户在浏览器访问 http://ip:port/xxx 就可以访问服务端的服务。

为什么需要支持 HTTP 协议

  • 兼容性:很多应用是基于 HTTP 协议的,RPC 框架支持 HTTP,便于兼容。
  • 方便调试:通过 HTTP 提供调试界面,可以实时监控RPC服务的状态和调用统计,方便开发和运维。
  • 安全性:HTTP 可以和 TLS/SSL 结合,提供安全的通信方式。

如何支持 HTTP 协议

这一块非常关键,决定你是否真正理解服务端这块的核心逻辑。(为了便于理解,我尽可以使用冗余的语言来描述)

  1. 首先要明确需求,我们支持 HTTP 协议,并不是要用 HTTP 替换掉之前的 TCP socket 的连接方式,而是要兼容二者。
  2. 其次要明确原理,HTTP 协议是基于 TCP 协议的,HTTP 是应用层协议,TCP 是传输层协议。HTTP 是在 TCP 上的一层封装。

所以,需要兼容二者,我既需要完成 HTTP 请求和响应,又需要拿到底层的 TCP socket 连接。 该怎么做呢?

对于 HTTP 请求和响应,

  • 客户端:客户端通过 net.DialTimeout 建立 TCP 连接,得到 net.Conn ,然后通过 net.Conn 向 RPC 服务器,发送 HTTP CONNECT 请求,并接收 HTTP 响应,确认连接成功。
  • 服务端:对于 HTTP 请求的监听,我们很熟悉,在 如何接管 HTTP 请求 中介绍过,我们的 Server 只需要实现 ServeHTTP(ResponseWriter, *Request) 方法就可以实现 Handler 接口,最后用 http.Handle() 方法将 Server 注册到对应的路由上,Server 就可以接管部分 HTTP 请求了(细节后面再讨论)。

对于 TCP 连接,

  • 客户端:无需额外做什么,使用 net.DialTimeout 建立的已经就是 TCP 连接。

  • 服务端:

    • 在之前我们是通过 net.Listen() 监听端口、 net.Listener.Accept() 阻塞等待连接完成的,最关键的是最后每个连接都返回一个 net.Conn 用来传输数据,这是建立的 TCP 连接。

    • 现在是监听 TCP 端口,并启动 HTTP 服务器,服务端只能响应 HTTP 请求,无法获得 TCP 连接。

    • 既然 HTTP 是基于 TCP 封装的,那 HTTP 请求能否拿到底层 TCP 的 net.Conn 呢?如果可以拿到,我们就可以根据请求类型,来选择是响应 HTTP 请求,还是建立 TCP 连接。

    • 答案是当然可以拿到。

      1
      func (w *response) Hijack() (rwc net.Conn, buf *bufio.ReadWriter, err error)

最后梳理一下,基于通信流程梳理需要做哪些事情:

  1. 服务端监听端口,并启动 HTTP 服务器。
  2. 客户端 Dial 建立 TCP 连接,获得 net.conn,并向 RPC 服务器,发送 HTTP CONNECT 方法请求
  3. 服务端响应 HTTP CONNECT 方法请求,并通过 Hijack 劫持 HTTP 底层的 TCP 连接。
  4. 客户端使用建立的连接,发送 RPC 报文(先发送 option,再发送 RPC 报文)
  5. 服务端处理 RPC 请求,并同时可以响应 HTTP 请求。

服务端支持 HTTP 协议

实现 ServeHTTP 函数接管 /_geeprc_ 路由的 HTTP 请求处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// server.go

const (
connected = "200 Connected to Gee RPC"
defaultRPCPath = "/_geeprc_"
)

// Server 实现 ServeHTTP,并使用 http.Handle 注册到对应路由中。
func (server *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "CONNECT" { // "/_geeprc_" 理由地址只响应 CONNECT 请求
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = io.WriteString(w, "405 must CONNECT\n")
return
}

// 从 HTTP 请求中,劫持底层的 TCP 连接
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", r.RemoteAddr, ": ", err.Error())
return
}
_, _ = io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn) // 处理 TCP 连接的 RPC 报文
}

// HandleHTTP 将 Server 注册到 defaultRPCPath 路由上
func (server *Server) HandleHTTP() {
http.Handle(defaultRPCPath, server) // 使用 Server.ServeHTTP 方法处理 defaultRPCPath 路由
}

func HandleHTTP() {
DefaultServer.HandleHTTP()
}

用户如何启动 HTTP 服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// main.go

func startServer(addr chan string) {

var foo Foo // 实例化 Foo 类型的对象
if err := geerpc.Register(&foo); err != nil { // 注册 Foo 类型的对象,注册的是 Foo 类型的对象,不是 Foo 类型的方法
log.Fatal("register error:", err)
}

l, err := net.Listen("tcp", ":0")
if err != nil {
log.Fatal("network error:", err)
}
log.Println("start rpc server on", l.Addr())
geerpc.HandleHTTP() // 注册静态路由
addr <- l.Addr().String()
_ = http.Serve(l, nil) // 启动 HTTP 服务,不使用 accpet
}

客户端支持 HTTP 协议

1
2
3
4
5
6
7
8
9
// client.go

func dialTimeout(f NewClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
//...
conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
//...
client, err = f(conn, opt)
//...
}

在 dialTimeout 过程中,先会获得了一个 TCP 连接 net.conn,原本的逻辑调会用 NewClient,利用该连接,生成一个 Client 使用。

但是现在服务端想同时响应 HTTP 请求和TCP 连接,服务端只启动 HTTP 服务,不进行 Accpet,所以需要利用该 TCP 连接,向服务端发送一条 HTTP 请求,让服务端有机会从 HTTP 请求中,拿到底层的 TCP 连接。

什么时候发送呢?在 NewClient 之前。发送请求并接收响应后,再正常调用 NewClient 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// client.go

// NewHTTPClient 做所的事,就是在 NewClient 之前,向 conn 发送一条 HTTP 请求 “CONNECT /_geeprc_ HTTP/1.0”,并获得响应
// 服务端 收到这条 HTTP CONNECT 请求后,会做出回复,并拿到 HTTP 底层的 conn 连接。所以服务器可以响应 HTTP 请求,又可以通过 conn 这个 tcp socket 通信
func NewHTTPClient(conn net.Conn, opt *Option) (*Client, error) {
_, _ = io.WriteString(conn, fmt.Sprintf("CONNECT %s HTTP/1.0\n\n", defaultRPCPath))

response, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"}) // 读取服务端响应
if err == nil && response.Status == connected {
return NewClient(conn, opt)
}
if err == nil {
err = errors.New("unexpected HTTP response: " + response.Status)
}
return nil, err
}

为了用户更方便的调用,为 NewHTTPClient 封装两个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// client.go

func DialHTTP(network, address string, opts ...*Option) (*Client, error) {
return dialTimeout(NewHTTPClient, network, address, opts...)
}

func XDial(rpcAddr string, opts ...*Option) (*Client, error) {
parts := strings.Split(rpcAddr, "@")
if len(parts) != 2 {
return nil, fmt.Errorf("rpc client err: wrong format '%s', expect protocol@addr", rpcAddr)
}
protocol, addr := parts[0], parts[1]
switch protocol {
case "http":
return DialHTTP("tcp", addr, opts...)
default:
// tcp, unix or other transport protocol
return Dial(protocol, addr, opts...)
}
}

用户如何使用客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// main.go

func call(addrCh chan string) {
client, _ := geerpc.DialHTTP("tcp", <-addrCh)
defer func() { _ = client.Close() }()

time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
args := &Args{Num1: i, Num2: i * i}
var reply int
if err := client.Call(context.Background(), "Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Printf("%d + %d = %d", args.Num1, args.Num2, reply)
}(i)
}
wg.Wait()
}

一些问题

  • 客户端只想建立 TCP 连接,还需发送 HTTP 请求吗?

    • 需要的。为什么?客户端通过 Dial 就可以拿到 TCP 连接,但是这个 TCP 连接如果没有服务器 Accept,是没有办法和服务器进行通信的(这是因为什么下面会讲)。所以是否需要发送 HTTP 请求,这取决于服务端使用什么样的监听方式。
    • http.Serve(l, nil) 如果服务端启动 HTTP 服务器,就需要使用 DialHTTP 方法去拨号,在这个过程中,客户端会自动帮用户发送 HTTP CONNECT 请求,服务端会通过 HTTP 劫持到底层 TCP 连接。
    • geerpc.Accept(l) 如果服务端启动 TCP socket 监听,就可以直接 Dial 拨号,建立 TCP 连接。
  • 为什么通过 Dial 可以拿到 TCP 连接,服务器不 Accept 二者无法通信?

    • TCP 连接的本质:
      • 内核层连接 Dial 成功 = TCP 三次握手完成
      • 连接在内核层面进入 ESTABLISHED 状态
      • 但应用层无法直接使用该连接
    • 应用层通信必备条件:
      • 服务端必须调用 Accept
    • Accept 的作用:
      • 将内核连接转交应用层
      • 创建应用层 socket
      • 分配通信资源(缓冲区等)
  • 为什么 http.ReadResponse 可以读出数据?

    1
    response, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})

    bufio.NewReader(conn) 创建缓冲读取器包装原始连接,读到服务端给客户端 HTTP CONNECT 请求的回复。

一个实例

上面这一些操作仅服务端仅实现了响应 “/geeprc” 路由的 CONNECT 请求。

下面实现一个用于客户端 debug 用的路由 “/debug/geerpc”。

(1)创建 debugHTTP,结构体中嵌入 Server。

为什么要嵌入:这样 debugHTTP 就可以拿到 Server 实例里的方法名和方法的调用次数。

1
2
3
4
5
// debug.go

type debugHTTP struct {
*Server
}

(2)实现 debugHTTP 的 ServeHTTP,来处理 HTTP 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// debug.go

const debugText = `<html>
<body>
<title>GeeRPC Services</title>
{{range .}}
<hr>
Service {{.Name}}
<hr>
<table>
<th align=center>Method</th><th align=center>Calls</th>
{{range $name, $mtype := .Method}}
<tr>
<td align=left font=fixed>{{$name}}({{$mtype.ArgType}}, {{$mtype.ReplyType}}) error</td>
<td align=center>{{$mtype.NumCalls}}</td>
</tr>
{{end}}
</table>
{{end}}
</body>
</html>`

var debug = template.Must(template.New("RPC debug").Parse(debugText))

type debugService struct {
Name string
Method map[string]*methodType
}

// Runs at /debug/geerpc
func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Build a sorted version of the data.
var services []debugService
server.serviceMap.Range(func(namei, svci interface{}) bool {
svc := svci.(*service)
services = append(services, debugService{
Name: namei.(string),
Method: svc.method,
})
return true
})
err := debug.Execute(w, services)
if err != nil {
_, _ = fmt.Fprintln(w, "rpc: error executing template:", err.Error())
}
}

(3)注册静态路由

1
2
3
4
5
6
7
8
9
10
11
12
13
// server.go

const (
connected = "200 Connected to Gee RPC"
defaultRPCPath = "/_geeprc_"
defaultDebugPath = "/debug/geerpc" // debug 的路由地址
)

func (server *Server) HandleHTTP() {
http.Handle(defaultRPCPath, server)
http.Handle(defaultDebugPath, debugHTTP{server}) // 注册 debug 路由
log.Println("rpc server debug path:", defaultDebugPath)
}

(4)在 main.go 执行后,可以访问目标端口的“/debug/geerpc”,可以看到各个方法被远程调用了多少次。

1
2
3
4
Service Foo
-------------------------------
Method Calls
Sum(main.Args, *int) error 5

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 代码结构
version_5_http
├── client.go # 改动:增加 DialHTTP 方法(发送 HTTP CONNECT 请求)
├── client_test.go # 新增:增加测试 XDial 的测试
├── codec
│   ├── codec.go
│   └── gob.go
├── debug.go # 新增:增加 debugHTTP,并接管部分请求
├── go.mod
├── main
│   └── main.go # 改动:服务端使用 HTTP 服务器,客户端使用 DialHTTP 拨号
├── server.go # 改动:ServeHTTP 接管 HTTP 请求,使用 HTTP 请求劫持 TCP 连接
├── service.go
└── service_test.go

从零实现系列|RPC
https://www.aimtao.net/7days-rpc/
Posted on
2024-10-18
Licensed under