从零实现系列|分布式缓存

本文逐步实现了一个分布式缓存系统,涵盖LRU缓存淘汰策略、单机并发控制、HTTP服务接口及一致性哈希算法。通过封装缓存值、互斥锁机制和虚拟节点映射,解决了并发安全、节点负载均衡等核心问题,构建了支持分布式部署的缓存框架。

通过回答关键问题的方式,记录阅读 geeCache 代码过程中的思考,并做出补充和改进,重点在“怎么做”的基础上,补充“为什么这么做”等逻辑。

0.序言

为什么需要 cache

  • 优化性能:将耗时操作的结果储存起来,下次需要请求,直接使用,避免重复耗时操作。
  • 降低延迟:静态资源缓存(CDN/浏览器缓存),提高访问速度。
  • 转移负载:将高频访问的数据,从数据库迁移到内存中,降低后端压力。
  • 高并发支撑:通过内存级的响应能力支持大规模的并发请求,避免数据库被击穿。

cache 要解决哪些问题

  • 怎么存储?维护一个键值对缓存,实现 O(1) 的存取速度。
  • 内存不够?实现一种缓存的淘汰策略。
  • 并发冲突?对缓存的修改,实现并发保护。
  • 单机性能差?实现可以横向扩展的分布式缓存。
    • 分布式缓存存在多个节点,节点间如何通信?需要支持 HTTP、protobuf。
    • 如何用 O(1) 的速度找到 cache 对应的节点?使用一致性哈希来选择节点,实现负载均衡。

1.缓存淘汰策略

介绍三种常见的缓存淘汰算法

缓存储存在内存中,内存是有限的,对于没用的数据就需要移出。三种常见的缓存淘汰算法。

FIFO(First In First Out)

  • 新进先出,按照时间顺序,优先淘汰最早添加的数据。
  • 实现:维护一个先进先出队列,新数据插入队尾,淘汰时移除队首数据。
  • 缺点:无法感知访问频率,即使高频访问的数据,若进入缓存较早仍会被淘汰,可能导致缓存命中率下降。

LFU(Least Frequently Used)

  • 最少使用,按照访问频率,优先淘汰访问次数最少的数据。
  • 实现:维护一个按照访问次数排序的最小堆/双层链表。每次访问,访问次数+1,重新排序。淘汰时,直接淘汰队首访问次数最少的即可。
  • 优点:缓存命中率高。
  • 缺点:
    • 维护成本高:每次访问都需要更新计数,并重新排序。
    • 受历史数据影响大:早期高频但后期失效的数据(比如过时的热点新闻)难以被淘汰。(解决:定期衰减历史计数。)

LRU(Least Recently Used)

  • 基于时间局部性原理,认为最近被访问过的,更有可能被再次访问。那么最近没使用过的数据,就是要淘汰的数据。
  • 实现:维护一个队列,某个数据被访问,就将数据移至队尾,这样队首则是最近最少被访问的数据。
  • 优点:FIFO 考虑时间因素,LFU 考虑访问频率,LRU 既考虑时间,也考虑频次(多次访问,意味着多次被移到队尾)。

如何设计 cache

cache 数据结构设计思路

  • 使用哈希表和双向链表,来实现 LRU 淘汰策略的缓存。

    • 哈希表:通过 key 可以 O(1) 的复杂度获取 value
    • 双向链表:维护访问顺序,最近访问的节点移动到链表头部,队尾节点是“最近最少使用”的数据,即将被淘汰的数据。
  • value 储存在 entry 的结构体中。每个 entry 结构体都是双向链表上的一个节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// geecache/lru/lru.go

type Cache struct {
maxBytes int64 // 最大的内存
nBytes int64 // 已经被用作缓存的内存
ll *list.List // 双向链表
cache map[string]*list.Element // 用 map 查找链表的节点
OnEvicted func(key string, value Value) // 删除缓存节点的回调函数
}

// 链表节点的数据类型
type entry struct {
key string // 保存 key,删除链表缓存时,便于同步删除 map 中的 k/v 记录
value Value // 真正的缓存数据,允许 Value 是任何类型
}

type Value interface {
Len() int // Len() 表示该数据的内存大小, 添加数据前,应该手动实现该数据类型的 Len 函数
}

// 初始化 cache
func New(maxBytes int64, onEvicted func(key string, value Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
OnEvicted: onEvicted,
}
}

为什么 value 不直接放在链表里,要放在 entry 结构体里?

  • 淘汰缓存时,可以直接从 entry 结构体中拿到 key,用来删除哈希表中的映射。

为什么要新建一个 Value 接口类型,并包含 Len 方法?

  • 缓存的数据可以什么格式都有,用接口变量储存最好,即 interface{}
  • 因为整个 cache 是有大小的,所有有需要查询所有缓存数据的大小。所以在 interface{} 中增加一个 Len方法,只有实现了这个方法的变量,才能作为接口类型 Value 的变量储存在 cache 中。
  • 下面举个例子,如歌 entry 的 value 变量需要储存 String 变量,String 需要主动实现 Len 方法,也就是实现 Value 接口。
1
2
3
4
5
type String string

func (s String) Len() int {
return len(s)
}

如何实现 cache 增删改查

增加

  • 检查是否存在,存在即更新,不存在则增加缓存
  • 维护缓存大小,超过最大值需要淘汰缓存。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// geecache/lru/lru.go

// Add 增加缓存数据
func (c *Cache) Add(key string, value Value) {
if _, ok := c.cache[key]; ok {
c.Update(key, value)
} else {
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.nBytes += int64(len(key)) + int64(value.Len())
for c.maxBytes != 0 && c.maxBytes < c.nBytes {
c.RemoveOldest()
}
}
}

删除

  • 链表删除尾部节点,map 删除 key,更新 cache 总体大小,调用回调函数(回调函数是初始化时传入的)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// geecache/lru/lru.go

// RemoveOldest 删除链表尾部的缓存数据
// 用户并不需要调用,Add/Update时,可用空间不够,才会删除
func (c *Cache) RemoveOldest() {
ele := c.ll.Back()
if ele != nil {
c.ll.Remove(ele)
kv := ele.Value.(*entry) // 先获取链表节点的 Value 字段,类型是 any,并转化为 *entry 类型
delete(c.cache, kv.key)
c.nBytes -= int64(len(kv.key)) + int64(kv.value.Len())
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
}

修改

  • 查询是否存在,将节点移至首部(表示频率),更新 cache 总体大小,更新节点的缓存值。
  • 注意:更新缓存还需要看缓存是否超过最大容量,如果超过需要淘汰缓存,下面代码暂时不写这个逻辑了。
1
2
3
4
5
6
7
8
9
10
11
12
13
// geecache/lru/lru.go

// Update 更新缓存数据
func (c *Cache) Update(key string, value Value) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
c.nBytes += int64(value.Len()) - int64(kv.value.Len())
kv.value = value
} else {
c.Add(key, value)
}
}

查询

  • 查询是否存在,将节点移至首部(表示频率),返回节点的缓存值。
1
2
3
4
5
6
7
8
9
10
11
// geecache/lru/lru.go

// Get 获取缓存数据
func (c *Cache) Get(key string) (value Value, ok bool) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
return kv.value, true
}
return
}

ele.Value.(*entry) 的写法?

  • ele 是 *list.Element 类型的变量,表示链表的一个节点。Element 变量中有 Value 变量,Value 变量类型是 any。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// container/list/list.go

package list

// Element is an element of a linked list.
type Element struct {
// Next and previous pointers in the doubly-linked list of elements.
// To simplify the implementation, internally a list l is implemented
// as a ring, such that &l.root is both the next element of the last
// list element (l.Back()) and the previous element of the first list
// element (l.Front()).
next, prev *Element

// The list to which this element belongs.
list *List

// The value stored with this element.
Value any
}
  • Element.Value 变量中存储的,实际上是我们的 entry 类型的结构体变量的指针。
  • 这里写法是断言,将 any 类型的 Element.Value 变量还原为 *entry 类型变量。

完整代码

1
2
3
4
5
version_1_LRU[geecache]
├── go.mod
└── lru
├── lru.go
└── lru_test.go

2.单机并发缓存

如何设计并发安全的 cache?

主要做两件事:加锁 + 保证数据不可变性

对互斥操作进行加锁

  • lru.Cache 的基础上,封装一个 cache 结构体,将 lru.Cache 的操作都加互斥锁。(注意不能是读写锁,因为读数据,也要移动链表节点)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// geecache/cache.go

// 为什么要封装 cache?
// 加锁,保证对 lru.Cache 的并发安全
type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64
}

// add 并发安全地添加缓存
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil) // 懒加载,延迟初始化 lru.Cache
}
c.lru.Add(key, value)
}

// get 并发安全地获取缓存
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}

return
}

保证数据不可变性(只读)

  • 上面代码中,无论是 add 还是 get 操作的数据类型都是 ByteView,也就是说,缓存的类型被规定成了 ByteView,这是为什么呢?
  • 其实就是为了保证数据的不可变性, ByteView 的定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// geecache/byteview.go

package geecache

type ByteView struct {
b []byte // slice
}

func (byteView ByteView) Len() int { // ByteView 实现 lru.Value 接口
return len(byteView.b)
}

// 为什么 ByteSlice 要返回拷贝?
// 防止缓存值被外部修改,这里直接返回拷贝
func (byteView ByteView) ByteSlice() []byte {
return cloneBytes(byteView.b) // 为什么不直接使用 make,要封装 cloneBytes 函数
}

// 为什么需要 cloneBytes ?
// 因为 []byte 是切片,传递时,不会深拷贝,传递的是视图,底层数据会被外界修改
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}

func (byteView ByteView) String() string {
return string(byteView.b)
}

为什么要抽象 ByteView 数据类型来表示缓存值?

  1. 在 lru 中定义的 entry.value 是 Value 接口类型,所有传入的数据的类型均需要实现这个接口,也就是实现 Len 函数,比较麻烦。[]byte 可以支持任何数据类型的存储,并为 []byte 抽象为 ByteView 类型,并实现 lru.Value 接口。
  2. 保证 ByteView 是只读类型。如何保证?
    • ByteView 的变量只有一个 bb 是小写,外部无法直接读取。
    • 只能通过 ByteSliceString 方法获取 b 的数据
      • ByteSlice 返回 slice 时,会拷贝一个副本再返回
      • String 将 b 的数据强制转换为 String。(外界也无法直接修改 b 的值)

为什么 get 操作也要加锁?

想象两个场景:

  1. cache 在 add 函数中,初始化 lru.Cache 的过程是懒加载,lru.Cache 初始化过程中,还为初始化完,如果这时候访问 Get,会导致空指针的情况。
  2. lru.cache 在写数据时,需要移动节点到首部,如果这时候去访问,会同时移动链表节点到首部,会造成链表断裂,节点丢失。

也就是说,需要保证 lru.cache 初始化的原子性,以及 lru.cache 内部操作的原子性。

为什么要延迟初始化(Lazy Initialization)?

1
2
3
4
5
6
7
8
9
10
11
12
// geecache/cache.go

// add 并发安全地添加缓存
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil) // 懒加载,延迟初始化 lru.Cache
}
c.lru.Add(key, value)
}
  • 节省内存:只有使用时才初始化,没使用不初始化。
  • 减少初始化 cache 时的开销,加块整个程序的冷启动速度。

为什么要设计 Group?

在使用 cache 对 lru.Cache 进行封装后,又使用 Group 对 cache 进行了封装。这是为什么呢?

其实站在使用者的角度,只有一个并发安全的 cache,会存在很多问题没法解决:

  • 不同的数据因为大小、有效期等不同,需要放到不同的 cache 实例中,如何标识不同的 cache 实例呢?需要一个名字/ID。
  • 当前机器没有找到的数据,需要访问其他分布式节点或者回源(可能是多种数据源)。这个过程如何统一呢?

为此,我们使用分层设计的思想,再封装一层,cache 负责并发存储,Group 负责业务逻辑。

有哪些业务逻辑?

  • 为 cache 实例标识一个命名空间。
  • 为 cache 实例提供回源方法的接口。
  • 注:还可以在 group 中,针对 cache 实例增加各种缓存策略。

如何设计 Group?

1
2
3
4
5
6
7
8
9
10
11
12
13
// geecache/geecache.go

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string // 一个 Group 是一个命名空间,并有唯一的 name。比如可以创建多个 Group,储存不同类别的信息。
getter Getter // 当缓存未命中时,可以调用 Getter.Get 这个回调函数获取值,并储存在缓存中。
mainCache cache // 并发安全的缓存
}

var (
groupMu sync.RWMutex // 全局读写锁
groups = make(map[string]*Group) // 全局缓存组注册表
)

通过 groups 进行全局管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// geecache/geecache.go

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}

groupMu.Lock()
defer groupMu.Unlock()

g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g
return g
}

func GetGroup(name string) *Group {
groupMu.RLock() // 为什么是读锁?保证写操作的原子性
g := groups[name]
groupMu.RUnlock()
return g
}

统一管理缓存回源逻辑

首先是回源接口,

  • Group 只要获取不到数据,都通过 group.getter.Get 来回源。但是数据源数据有很多,可能是文件,可能是数据库。所以需要抽象一个接口。
  • 只要实现了 Get 方法,都可以作为 Getter 接口类型变量,供 group 回源时调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// geecache/geecache.go

// Getter 用于从各类的外部数据源获取数据
type Getter interface {
Get(key string) ([]byte, error)
}

// GetterFunc 为了便于使用者传入匿名函数到 Getter 中
// 使用时,只需将匿名函数转化为 GetterFunc 类型,即可传入 Getter
type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

有了回源接口,需要实现 group.Get 的逻辑,参考这个流程图。

graph LR
    A[接收 key] --> B[是否被缓存?]
    B -->|是| C[返回缓存值-步骤1]
    B -->|否| D[是否应从远程节点获取?]
    D -->|是| E[与远程节点交互]
    E --> F[返回缓存值-步骤2]
    D -->|否| G[调用回调函数]
    G --> H[获取值并添加到缓存]
    H --> I[返回缓存值-步骤3]
  • Get 完成步骤1
  • load + getLocally 完成步骤3
  • 步骤二在第5章节《分布式节点》中实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}

if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}

// 缓存没有命中,就从其他数据源中获取
return g.load(key)
}

// 当缓存不存在,Get -> load -> getLocally 获取数据
// 为什么不直接使用 getLocally,还要封装一个 load ?
// load 会先从 远程分布式节点获取,获取不到才会使用 getLocally。此处属于预留设计。
func (g *Group) load(key string) (ByteView, error) {
return g.getLocally(key)
}

// getLocally 调用用户的回调函数 g.getter.Get,获取数据,并使用 populateCache 添加数据
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}

// populateCache 将获取到的数据添加到缓存中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

完整代码

1
2
3
4
5
6
7
8
9
10
version_2_singal_node[geecache]
├── geecache
│ ├── byteview.go # 缓存值的抽象与封装
│ ├── cache.go # 并发控制
│ ├── geecache.go # 负责与用户交互,并拥有从外部数据源获取缓存并存储的功能
│ ├── geecache_test.go
│ └── lru
│ ├── lru.go
│ └── lru_test.go
└── go.mod

3.HTTP 服务器

这部分比较简单,主要是启动一个 http 服务,处理 URL,获取到 group name 和 key,返回缓存查询到的结果。

完整代码

1
2
3
4
5
6
7
8
9
10
11
version_3_http_server[geecache]
├── geecache
│ ├── byteview.go
│ ├── cache.go
│ ├── geecache.go
│ ├── geecache_test.go # 新增 http 测试
│   ├── http.go # 新增从 HTTP 服务器获取 cache
│ └── lru
│ ├── lru.go
│ └── lru_test.go
└── go.mod

4.一致性哈希

该访问哪个缓存节点?

  • 分布式缓存,也就是有多个机器,每个机器存储一部分数据。但当一个请求过来,该去哪个节点查询缓存呢?
  • 解决:通过哈希算法对 key 进行计算,并对节点个数进行取余。这样每个 key 都会固定映射到一个节点中。

缓存节点数量调整怎么办?

  • 缓存运行过程中,如果有节点增加/减少,hash(key) % N,这里的 N 会变为 N-1,大部分缓存都会失效,在原来的节点中找不到,会引发缓存雪崩。
  • 解决:一致性哈希可以解决这个问题。

一致性哈希是什么?

  • 一致性哈希是一个哈希环,通过哈希算法将 key 和节点映射到 0 到 2³²−1 的环形空间。(为什么是 2³²?因为大部分哈希计算出的值都是 32 位的,其实也可以用 64 位的,只不过 2³² 是40亿,足够应对哈希冲突了。)
  • 每个 key 归属于顺时针找到的第一个节点。
  • 增加节点时,只有新增节点区域的 key 需要迁移至新节点。减少节点也是同理。

add_peer

数据倾斜问题怎么处理?

  • 如上图左侧哈希环中,key2、key11、key27 都在节点 peer2 上,这就是数据倾斜,大部分数据储存在少部分几个节点中。
  • 为什么会数据倾斜?因为节点在哈希环上分布不均。所以我们使用更多的虚拟节点将环划分为更小的区间,每个真实节点对应多个虚拟节点。这样就不会因为真实节点分布不均,导致的大段连续区间被单一节点垄断的问题。

如何实现一致性哈希?

主要有三件事:构建哈希环、节点数量变化的处理、查找 key 所在节点。

(1)构建哈希环

哈希环就是map,

  • 支持自定义哈希函数,在初始化时,使用者传入,不传入则默认使用 crc32。
  • 设置虚拟节点,虚拟节点个数是真实节点的 replicas 倍。
  • 将所有虚拟节点有序排列,储存在数组 keys 里,这就是哈希环本体。
  • 虚拟节点与真实节点的映射保存在 hashMap 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Hash func(data []byte) uint32 // hash 函数类型

type Map struct { // 一致性哈希算法的主要数据结构
hash Hash // 设置自定义一种哈希算法函数
replicas int // 虚拟节点的倍数
keys []int // 哈希环,有序储存所有虚拟节点
hashMap map[int]string // 虚拟节点和真实节点的映射表,key 是虚拟节点的哈希值,value 是真实节点的名称
}

func New(replicas int, fn Hash) *Map {
m := &Map{
hash: fn,
replicas: replicas,
hashMap: make(map[int]string),
}
if m.hash == nil { // 默认使用 crc32.ChecksumIEEE 哈希算法
m.hash = crc32.ChecksumIEEE
}
return m
}

(2)节点数量变化的处理

暂时不考虑已有缓存迁移的问题,仅考虑节点数量变化后,能否找到对应的真实节点。

加入节点:

  • 加入一个真实节点,实际操作是,加入 replicas 个虚拟节点。
  • 先都加入到 keys 中,再排序
1
2
3
4
5
6
7
8
9
10
11
12
// Add 在哈希环 Map.keys 中加入真实节点
// 节点名和虚拟节点名经过 hash 计算后得到哈希值,将哈希值加入 Map.keys 中,并排序。
func (m *Map) Add(keys ...string) {
for _, key := range keys { // 对于每个真实节点,添加 m.replicas 个虚拟节点。
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key))) // 计算虚拟节点的 hash 值
m.keys = append(m.keys, hash) // 虚拟节点加入环
m.hashMap[hash] = key // 将虚拟节点和真实节点加入映射表
}
}
sort.Ints(m.keys) // 对虚拟节点排序
}

删除节点:

  • 删除真实节点与虚拟节点的映射
  • 重新构建哈希环(仅保留未被删除的哈希值)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Del 在哈希环 Map.keys 中减少真实节点对应的虚拟节点
func (m *Map) Del(keys ...string) {

// 删除真实节点与虚拟节点的映射
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
delete(m.hashMap, hash)
}
}

// 重新构建哈希环(仅保留未被删除的哈希值)
var newKeys []int
for hash := range m.hashMap {
newKeys = append(newKeys, hash)
}
sort.Ints(newKeys)
m.keys = newKeys
}

(3)查找 key 所在节点

  • 最初的定义是,每个 key 归属于顺时针找到的第一个节点。
  • 给定一个 hash,他归属的节点就是第一个大于等于 hash 的节点。
  • 通过二分查找来寻找第一个大于等于 hash 的节点,注意需手动传入判断函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Get 获取 key 所在节点名
// key 经过 hash 计算后得到哈希值,在哈希环 Map.keys 上查找最接近的节点。
// 例如:key 的哈希值是 10000,哈希环上找到最接近的两个节点是 8000、11000,应该存在 8000 这个节点上。
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}

hash := int(m.hash([]byte(key)))

// 二分查找虚拟节点
index := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash // 找到第一个大于等于 hash 的节点,没有则返回 len(m.keys)
})

return m.hashMap[m.keys[index%len(m.keys)]]
}
  • 同时来了解一下,sort.Search 函数,二分查找,找到第一个符合条件的索引。什么条件?就是传入的函数为 true。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func Search(n int, f func(int) bool) int {
i, j := 0, n
for i < j {

// 因为i、j是int, (i+j)/2 在 i+j > 2^31-1 时会溢出,先转化为无符号整型计算可以防止溢出。
h := int(uint(i+j) >> 1)
// i ≤ h < j
if !f(h) {
i = h + 1 // h 不满足条件,答案在 h 的右侧。[h + 1, j)
} else {
j = h // h 满足条件,答案在 h 的左侧。[i, h)
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return i
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
version_4_consistent_hashing[geecache]
├── geecache
│ ├── byteview.go
│ ├── cache.go
│   ├── consistenthash
│   │   └── consistent_hash.go # 新增一致性哈希
│ ├── geecache.go
│   ├── geecache_test.go
│   ├── http.go
│   └── lru
│   ├── lru.go
│   └── lru_test.go
└── go.mod

5.分布式节点

这一章节作者原文站在架构的角度自上而下描述的,看的时候未免有些疑惑,比如为什么上来要抽象两个接口。下面我会按照实用主义角度来回答,“基于什么需求,所以这么做”,以作为补充。

分布式缓存要做哪些工作?

整个分布式缓存,要做的工作就是实现下面这个图。

graph LR
    A[接收 key] --> B[是否被缓存?]
    B -->|是| C[返回缓存值-步骤1]
    B -->|否| D[是否应从远程节点获取?]
    D -->|是| E[与远程节点交互]
    E --> F[返回缓存值-步骤2]
    D -->|否| G[调用回调函数]
    G --> H[获取值并添加到缓存]
    H --> I[返回缓存值-步骤3]
  • 在第二章节,我们完成了单个节点的功能:查找本地缓存(步骤1);找不到时进行回源储存(步骤3)。
  • 在第三章节,我们解决了单个节点如何为其他节点提供服务的问题。
  • 在第四章节,我们解决了多个节点的情况下,查找缓存该查哪个节点的问题。
  • 有了以上铺垫,我们该完成上图中的步骤二了:与远程节点交互,查找远程节点的数据。
graph LR
    A[接收 key] --> B[是否被缓存?]
    B -->|是| C[返回缓存值-步骤1]
    B -->|否| E[使用一致性哈希选择节点,是否从远程节点获取?]
    E -->|是| G[HTTP 客户端访问远程节点]
    G -->|成功| I[返回缓存值-步骤2]
    G -->|失败| K[调用回调函数]
    K --> L[获取值并添加到缓存]
    L --> M[返回缓存值-步骤3]
    E -->|否| K

补充一些步骤二的细节,如上图。

站在使用者的角度,当我拿到 Group 之后,要完成步骤二,我需要哪些方法供我调用?

  1. 方法一:通过 Key 查找到属于哪个远程节点。
  2. 方法二:访问远程节点,获取缓存。
  3. 方法三:远程节点也没有,则调用本地的回调函数。

所以 Group 声明了两个接口,PeerPicker 为 Group 提供方法一,PeerGetter 为 Group 提供方法二,方法三在原有 group.load 逻辑中改写。

1
2
3
4
5
6
7
8
9
10
11
// geecache/geecache.go

// PeerPicker 协助 Group 通过 key 选择远程节点
type PeerPicker interface {
PickPeer(key string) (peer PeerGetter, ok bool) // 根据 key 选择节点 PeerGetter
}

// PeerGetter 获取到的远程节点,协助 Group 从远程节点获取缓存值
type PeerGetter interface {
Get(group string, key string) ([]byte, error) // 获取缓存值
}

Group 只需要拿着口变量调用接口的方法,就可以完成整体逻辑。

至于这两个接口是谁来实现,用什么协议实现,Group 并不关心,我就规定了这几个方法,谁实现了我用谁完成。所以可能是 HTTPPool 用 HTTP 协议实现的,也可能是 HTTPPool 用 protobuf 实现的。

就比如你是一个 leader,你有一项工作,你分为part 1、part2,你只需要说出你最终想要什么。你也不管谁来做,具体怎么做。比如张三李四说他们有能力做,那你要做的就是合并张三李四成果,向上汇报就行。

这样做有什么好处?领导轻松

  • 业务与实现分离,自己做自己的事情:
    • Group 调用各种方法,完成业务逻辑。
    • HTTPPool 实现 Group 规定的两个方法。
  • 依赖倒置,Group 不依赖 HTTPPool,HTTPPool 变成可插拔的了,方便自定义修改。(也就是上面例子里,把张三李四换成王五,让王五来干)

如何实现 Group 访问远程节点的逻辑?

  • 首先将 PeerPicker 接口变量,放在 Group 结构体中,并提供注入方法,便于 Group 后续调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// geecache/geecache.go

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string
getter Getter
mainCache cache

peers PeerPicker // 用于选择远程节点
}

// RegisterPeers 将实现了 PeerPicker 接口的变量注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
  • 在查找缓存时,仅在 load 函数里增加去远程节点查找的逻辑。
  • 去远程节点查找的逻辑更为简单。(就像领导直接安排张三李四干活一样)
    • 先调用 PeerPicker.PickPeer 找到远程节点,并拿到访问这个节点的客户端。
    • 再调用 PeerGetter.Get 给远程节点发送请求,获取缓存值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// geecache/geecache.go

// load: 缓存不存在时,先在远程节点中查找,未果再调用 getLocally 获取数据
func (g *Group) load(key string) (ByteView, error) {
if g.peers != nil {

// 新增逻辑
if peer, ok := g.peers.PickPeer(key); ok { // 找到 key 对应的远程节点。
value, err := g.getFromPeer(peer, key) // 在远程节点中查找
if err == nil {
return value, nil
}
log.Println("[GeeCache] failed to get from peer", err)
}

}
return g.getLocally(key)
}

// 从远程节点获取数据,传入的是 PeerGetter 接口类型,只要实现了 Get 方法,就可以传入
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{bytes}, nil
}

ok,至此,Group 的工作就做完了,只管调用 PeerPicker.PickPeerPeerGetter.Get 的方法,这两件事谁来干不知道,具体怎么干不知道,十分简洁优雅。

谁来完成具体的工作呢?我们把这个重任交给第三章中出现的 HTTPPool,让 HTTPPool 实现 PeerPicker.PickPeer 接口,用户就可以调用 RegisterPeers,把 HTTPPool 注入 Group 了。

为什么是 HTTPPool 来实现?

  • 统一网络通信层:第三章中 HTTPPool 主要是作为服务端,提供获取本机缓存的 HTTP 服务;那同样他也可以作为客户端,可以发送 HTTP 请求到其他节点。

HTTPPool 如何实现 PeerPicker 接口?

实现 PickPeer 方法就实现了这个接口,也就是实现通过 key 查找远程节点的功能。

  • 要查找远程节点,则需要将一致性哈希环集成到内部。
  • 找到节点后,要可以请求访问,则需要保存各节点 IP/Port。

如何将一致性哈希加入 HTTPPool?

  • 增加 sync.Mutex 锁。为什么要加锁?
  • 增加一致哈希 peers,保存所有节点。
  • 增加 map,将节点和节点的 ip/port 映射起来,知道访问那个 IP 去查询。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// geecache/http.go

// HTTPPool :包含服务端和客户端,服务端提供获取本机数据的 HTTP 服务,客户端提供访问其他节点的方法
type HTTPPool struct {
self string // 当前节点的 IP/端口
basePath string

// 增加能力:设置和获取远程节点的能力
mu sync.Mutex
peers *consistenthash.Map // 一致性哈希
httpGetters map[string]*httpGetter // 记录每个远程节点的 httpGetter,httpGetter 包含了 baseURL
}

// httpGetter 客户端:保存节点的 IP/Port
type httpGetter struct {
baseURL string
}

HTTPPool 能做的,也就是对 consistenthash.Map 能力进行封装,为自己所用:

  • HTTPPool.Set 初始化一致性哈希 peers,把所有节点都加到哈希环里,并为每个节点初始化 HTTP 客户端 httpGetter(其实就是保存了 IP/Port 而已)。
  • HTTPPool.PickPeer 通过 key,在一致性哈希环上,查找 key 所属的节点,返回这个节点的 IP/Port,我们将 IP/Port 保存在客户端 httpGetter,所以直接返回 httpGetter 实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// geecache/http.go

// Set 设置远程节点的能力:实例化一个一致性哈希,并向哈希环中添加节点。
func (h *HTTPPool) Set(peers ...string) {
h.mu.Lock()
defer h.mu.Unlock()

h.peers = consistenthash.New(defaultReplicas, nil) // 初始化哈希环
h.peers.Add(peers...) // 将节点加到哈希环上

// 保存 key 和对应的 httpGetter 到 map 字段 httpGetters 中
h.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
h.httpGetters[peer] = &httpGetter{
baseURL: peer + h.basePath,
}
}
}

// PickPeer 获取远程节点的能力:包装了一致性哈希获取真实节点的方法 consistenthash.Map.Get
func (h *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
h.mu.Lock()
defer h.mu.Unlock()

// 从哈希环上获取缓存值属于那个节点,如果不是空且不是自身,则返回对应的节点。
if peer := h.peers.Get(key); peer != "" && peer != h.self {
h.Log("Pick peer %s", peer)
return h.httpGetters[peer], true
}
return nil, false
}

为什么要返回客户端 httpGetter 实例,而不是直接的 IP/Port,让 Group 拿到 IP/Port 自己去发 HTTP 请求呢?

和上面,Group 为什么设计 PeerPicker 和 PeerGetter 接口一样。因为我们要做到 HTTPPool 和 Group 解耦,实现与业务分离。

  • HTTPPool 负责管理所有节点信息,根据节点信息提供 HTTP 服务、访问 HTTP 服务。
  • Group 只管调用 PeerPicker.PickPeerPeerGetter.Get 的方法,去完成业务逻辑即可。

所以,我们还需为 httpGetter 写一个发送请求的方法,让 httpGetter 实现 PeerGetter 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// geecache/http.go

// Get 步骤:
// 1.将 baseURL、group、key 拼接为远程节点缓存值的访问地址 URL
// 2.访问 URL 获取缓存值返回
func (g *httpGetter) Get(group string, key string) ([]byte, error) {
u := fmt.Sprintf("%v%v/%v", g.baseURL, url.QueryEscape(group), url.QueryEscape(key))
/* url.QueryEscape 是 URL 转义函数,
比如 http://123.com/123?image=http://images.com/cat.png
需要转义为 http://123.com/123?image=http%3A%2F%2Fimages.com%2Fcat.png
*/

res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned %v", res.Status)
}

bytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body %v", err)
}

return bytes, nil
}

如何验证功能?

这个 main 函数看着很绕,其实逻辑是这样:

  • 8001 - 8003 端口模拟三个的主机节点。
  • 端口 9999 相当于 8001 节点机器供用户交互的网关,每个节点都有与用户交互的网关,这里演示只启动 8001 节点的 9999 端口用于交互。
  • main 做的就是初始化 Group;初始化 HTTPPool;将 HTTPPool 注册到 Group 中;启动一个 api 服务监听 9999 端口。
graph LR
    A[查询请求] --> B[API-端口 9999  +  分布式节点一-端口 8001]
    B --> D[分布式节点二-端口 8002]
    B -->|经过查询 key 在节点三| E[分布式节点三-端口 8003]
    E --> F[返回缓存值]

具体实现不细描述,跑起来就明白了。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
.
├── geecache
│   ├── byteview.go
│   ├── cache.go
│   ├── consistenthash
│   │   ├── consistent_hash.go
│   │   └── consistent_hash_test.go
│   ├── geecache.go # 增加与远程节点交互的业务逻辑
│   ├── geecache_test.go
│   ├── http.go # 增加与远程节点交互的实现逻辑
│   └── lru
│   ├── lru.go
│   └── lru_test.go
├── go.mod
└── main.go # 增加与远程节点交互的 demo

6.防止缓存击穿

  • 缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。
  • 缓存击穿:一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时DB请求量大、压力骤增。
  • 缓存穿透:查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB,如果瞬间流量过大,穿透到 DB,导致宕机。

延伸阅读:golang 中 singleflight 包的实现:singleflight.go

如何防止缓存击穿?

这一章节可写的不多,主要是一个思想:

  • 对于短时间内的大量并发的同种请求,只响应第一个。
  • 其他的请求等待第一个请求完毕,直接使用第一个请求的结果。

值得学习的地方:

  • singleflight 的实现,使用代理模式,load 中查询远程节点的逻辑,全部交给了 singleflight.Do 这个方法进行封装,有点类似中间件的工作模式。
  • 仅需少量代码就可以完成对于每个请求的控制,简单优雅。

什么时候应该加锁?

  • 在读写 g.m[key] 前均需加锁。但是互斥锁效率太低,下面改成读写锁。

  • 并对于懒汉单例模式,使用双检锁来优化性能。

    • 第一次检查(无锁)
      减少锁竞争。大多数情况下实例已存在,无需加锁直接进入后续业务。
    • 第二次检查(加锁)
      防止多个协程同时通过第一次检查后,在锁内重复创建实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {

if g.m == nil { // 第一次不加锁,因为 g.m 的概率小
g.mu.Lock() // 如果 g.m == nil,说明是第一次,需要加锁,避免其他并发也进来创建 g.m
if g.m == nil {
g.m = make(map[string]*call) // 懒加载
}
g.mu.Unlock()
}

g.mu.RLock()
if c, ok := g.m[key]; ok {
g.mu.RUnlock() // 读 g.m 后解锁
c.wg.Wait()
return c.val, c.err
}
g.mu.RUnlock() // 读 g.m 后解锁

c := new(call)
c.wg.Add(1) // 记录同步的事件数量,其他并发请求阻塞等待当前请求结束
g.mu.Lock() // 写 g.m 前加锁
g.m[key] = c
g.mu.Unlock() // 写 g.m 后解锁

c.val, c.err = fn() // 调用 fn,发起请求
c.wg.Done() // 结束请求

g.mu.Lock() // 写 g.m 前加锁
delete(g.m, key) // 更新 g.m
g.mu.Unlock() // 写 g.m 后解锁

return c.val, c.err // 返回结果
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
.
├── geecache
│   ├── byteview.go
│   ├── cache.go
│   ├── consistenthash
│   │   ├── consistent_hash.go
│   │   └── consistent_hash_test.go
│   ├── geecache.go # 增加 singleflight
│   ├── geecache_test.go
│   ├── http.go
│   └── lru
│   │ ├── lru.go
│   │ └── lru_test.go
│   └── singleflight
│   └── singleflight.go # 新增对并发请求的控制
├── go.mod
└── main.go

7.使用 Protobuf 编码

使用 protobuf 编码需要做哪些更改?

(1)原来逻辑

客户端:

  1. 拿到 group、key 构造 url
  2. 使用 http.Get 发送请求

服务端:

  1. 处理请求,将结果 view 返回

客户端:

  1. 接收服务端结果,直接使用

(2)使用 protobuf 编码后的逻辑

客户端:

  1. 拿到 group、key 构造 url
  2. 使用 http.Get 发送请求

服务端:

  1. 处理请求,将结果 view 使用 protobuf 编码(变更)

客户端:

  1. 接收服务端结果,使用 protobuf 解码。(变更)

如何实现

  1. 创建 proto 文件,定义 Request 和 Response。
  2. 更改 PeerGetter.Get 参数,改为 Request 和 Response。
  3. 在处理请求和接收数据时,使用 protobuf 进行编解码。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
.
├── geecache
│   ├── byteview.go
│   ├── cache.go
│   ├── consistenthash
│   │   ├── consistent_hash.go
│   │   └── consistent_hash_test.go
│   ├── geecache.go # getFromPeer 修改传参方式。
│   ├── geecachepb # protobuf 编解码
│   │   ├── geecachepb.pb.go
│   │   └── geecachepb.proto
│   ├── geecache_test.go
│   ├── geers.go # 新增文件存放 PeerGetter、PeerPicker 接口
│   ├── http.go # 新增在处理请求和接收数据时,使用 protobuf 进行编解码。
│   └── lru
│   │ ├── lru.go
│   │ └── lru_test.go
│   └── singleflight
│   └── singleflight.go
├── go.mod
└── main.go

从零实现系列|分布式缓存
https://www.aimtao.net/7days-cache/
Posted on
2023-05-13
Licensed under