从零实现系列|分布式缓存

本文逐步实现了一个分布式缓存系统,涵盖LRU缓存淘汰策略、单机并发控制、HTTP服务接口及一致性哈希算法。通过封装缓存值、互斥锁机制和虚拟节点映射,解决了并发安全、节点负载均衡等核心问题,构建了支持分布式部署的缓存框架。

通过回答关键问题的方式,记录阅读 geeCache 代码过程中的思考,并做出补充和改进,重点在“怎么做”的基础上,补充“为什么这么做”等逻辑。

0.序言

为什么需要 cache

  • 优化性能:将耗时操作的结果储存起来,下次需要请求,直接使用,避免重复耗时操作。

  • 降低延迟:静态资源缓存(CDN/浏览器缓存),提高访问速度。

  • 转移负载:将高频访问的数据,从数据库迁移到内存中,降低后端压力。

  • 高并发支撑:通过内存级的响应能力支持大规模的并发请求,避免数据库被击穿。

cache 要解决哪些问题

  • 怎么存储?维护一个键值对缓存,实现 O(1) 的存取速度。

  • 内存不够?实现一种缓存的淘汰策略。

  • 并发冲突?对缓存的修改,实现并发保护。

  • 单机性能差?实现可以横向扩展的分布式缓存。

    • 分布式缓存存在多个节点,节点间如何通信?需要支持 HTTP、protobuf。
    • 如何用 O(1) 的速度找到 cache 对应的节点?使用一致性哈希来选择节点,实现负载均衡。

1.缓存淘汰策略

介绍三种常见的缓存淘汰算法

缓存储存在内存中,内存是有限的,对于没用的数据就需要移出。三种常见的缓存淘汰算法。

FIFO(First In First Out)

  • 新进先出,按照时间顺序,优先淘汰最早添加的数据。

  • 实现:维护一个先进先出队列,新数据插入队尾,淘汰时移除队首数据。

  • 缺点:无法感知访问频率,即使高频访问的数据,若进入缓存较早仍会被淘汰,可能导致缓存命中率下降。

LFU(Least Frequently Used)

  • 最少使用,按照访问频率,优先淘汰访问次数最少的数据。

  • 实现:维护一个按照访问次数排序的最小堆/双层链表。每次访问,访问次数+1,重新排序。淘汰时,直接淘汰队首访问次数最少的即可。

  • 优点:缓存命中率高。

  • 缺点:

    • 维护成本高:每次访问都需要更新计数,并重新排序。
    • 受历史数据影响大:早期高频但后期失效的数据(比如过时的热点新闻)难以被淘汰。(解决:定期衰减历史计数。)

LRU(Least Recently Used)

  • 基于时间局部性原理,认为最近被访问过的,更有可能被再次访问。那么最近没使用过的数据,就是要淘汰的数据。

  • 实现:维护一个队列,某个数据被访问,就将数据移至队尾,这样队首则是最近最少被访问的数据。

  • 优点:FIFO 考虑时间因素,LFU 考虑访问频率,LRU 既考虑时间,也考虑频次(多次访问,意味着多次被移到队尾)。

如何设计 cache

cache 数据结构设计思路

  • 使用哈希表和双向链表,来实现 LRU 淘汰策略的缓存。

    • 哈希表:通过 key 可以 O(1) 的复杂度获取 value
    • 双向链表:维护访问顺序,最近访问的节点移动到链表头部,队尾节点是“最近最少使用”的数据,即将被淘汰的数据。
  • value 储存在 entry 的结构体中。每个 entry 结构体都是双向链表上的一个节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// geecache/lru/lru.go

type Cache struct {
maxBytes int64 // 最大的内存
nBytes int64 // 已经被用作缓存的内存
ll *list.List // 双向链表
cache map[string]*list.Element // 用 map 查找链表的节点
OnEvicted func(key string, value Value) // 删除缓存节点的回调函数
}

// 链表节点的数据类型
type entry struct {
key string // 保存 key,删除链表缓存时,便于同步删除 map 中的 k/v 记录
value Value // 真正的缓存数据,允许 Value 是任何类型
}

type Value interface {
Len() int // Len() 表示该数据的内存大小, 添加数据前,应该手动实现该数据类型的 Len 函数
}

// 初始化 cache
func New(maxBytes int64, onEvicted func(key string, value Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
OnEvicted: onEvicted,
}
}

为什么 value 不直接放在链表里,要放在 entry 结构体里

  • 淘汰缓存时,可以直接从 entry 结构体中拿到 key,用来删除哈希表中的映射。

为什么要新建一个 Value 接口类型,并包含 Len 方法

  • 缓存的数据可以什么格式都有,用接口变量储存最好,即 interface{}

  • 因为整个 cache 是有大小的,所有有需要查询所有缓存数据的大小。所以在 interface{} 中增加一个 Len方法,只有实现了这个方法的变量,才能作为接口类型 Value 的变量储存在 cache 中。

  • 下面举个例子,如歌 entry 的 value 变量需要储存 String 变量,String 需要主动实现 Len 方法,也就是实现 Value 接口。

1
2
3
4
5
type String string

func (s String) Len() int {
return len(s)
}

如何实现 cache 增删改查

增加

  • 检查是否存在,存在即更新,不存在则增加缓存

  • 维护缓存大小,超过最大值需要淘汰缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// geecache/lru/lru.go

// Add 增加缓存数据
func (c *Cache) Add(key string, value Value) {
if _, ok := c.cache[key]; ok {
c.Update(key, value)
} else {
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.nBytes += int64(len(key)) + int64(value.Len())
for c.maxBytes != 0 && c.maxBytes < c.nBytes {
c.RemoveOldest()
}
}
}

删除

  • 链表删除尾部节点,map 删除 key,更新 cache 总体大小,调用回调函数(回调函数是初始化时传入的)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// geecache/lru/lru.go

// RemoveOldest 删除链表尾部的缓存数据
// 用户并不需要调用,Add/Update时,可用空间不够,才会删除
func (c *Cache) RemoveOldest() {
ele := c.ll.Back()
if ele != nil {
c.ll.Remove(ele)
kv := ele.Value.(*entry) // 先获取链表节点的 Value 字段,类型是 any,并转化为 *entry 类型
delete(c.cache, kv.key)
c.nBytes -= int64(len(kv.key)) + int64(kv.value.Len())
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
}

修改

  • 查询是否存在,将节点移至首部(表示频率),更新 cache 总体大小,更新节点的缓存值。

  • 注意:更新缓存还需要看缓存是否超过最大容量,如果超过需要淘汰缓存,下面代码暂时不写这个逻辑了。

1
2
3
4
5
6
7
8
9
10
11
12
13
// geecache/lru/lru.go

// Update 更新缓存数据
func (c *Cache) Update(key string, value Value) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
c.nBytes += int64(value.Len()) - int64(kv.value.Len())
kv.value = value
} else {
c.Add(key, value)
}
}

查询

  • 查询是否存在,将节点移至首部(表示频率),返回节点的缓存值。

1
2
3
4
5
6
7
8
9
10
11
// geecache/lru/lru.go

// Get 获取缓存数据
func (c *Cache) Get(key string) (value Value, ok bool) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
return kv.value, true
}
return
}

ele.Value.(*entry) 的写法

  • ele 是 *list.Element 类型的变量,表示链表的一个节点。Element 变量中有 Value 变量,Value 变量类型是 any。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// container/list/list.go

package list

// Element is an element of a linked list.
type Element struct {
// Next and previous pointers in the doubly-linked list of elements.
// To simplify the implementation, internally a list l is implemented
// as a ring, such that &l.root is both the next element of the last
// list element (l.Back()) and the previous element of the first list
// element (l.Front()).
next, prev *Element

// The list to which this element belongs.
list *List

// The value stored with this element.
Value any
}
  • Element.Value 变量中存储的,实际上是我们的 entry 类型的结构体变量的指针。

  • 这里写法是断言,将 any 类型的 Element.Value 变量还原为 *entry 类型变量。

完整代码

1
2
3
4
5
version_1_LRU[geecache]
├── go.mod
└── lru
├── lru.go
└── lru_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package lru

import "container/list"

type Cache struct {
maxBytes int64 // 最大的内存
nBytes int64 // 已经被用作缓存的内存
ll *list.List // 双向链表
cache map[string]*list.Element // 用 map 查找链表的节点
OnEvicted func(key string, value Value) // 删除缓存节点的回调函数
}

// 链表节点的数据类型
type entry struct {
key string // 保存 key,删除链表缓存时,便于同步删除 map 中的 k/v 记录
value Value // 真正的缓存数据,允许 Value 是任何类型
}

type Value interface {
Len() int // Len() 表示该数据的内存大小, 添加数据前,应该手动实现该数据类型的 Len 函数
}

func New(maxBytes int64, onEvicted func(key string, value Value)) *Cache {
return &Cache{
maxBytes: maxBytes,
ll: list.New(),
cache: make(map[string]*list.Element),
OnEvicted: onEvicted,
}
}

// Add 增加缓存数据
func (c *Cache) Add(key string, value Value) {
if _, ok := c.cache[key]; ok {
c.Update(key, value)
} else {
ele := c.ll.PushFront(&entry{key, value})
c.cache[key] = ele
c.nBytes += int64(len(key)) + int64(value.Len())
for c.maxBytes != 0 && c.maxBytes < c.nBytes {
c.RemoveOldest()
}
}
}

// RemoveOldest 删除链表尾部的缓存数据
// 用户并不需要调用,Add/Update时,可用空间不够,才会删除
func (c *Cache) RemoveOldest() {
ele := c.ll.Back()
if ele != nil {
c.ll.Remove(ele)
kv := ele.Value.(*entry) // 先获取链表节点的 Value 字段,类型是 any,并转化为 *entry 类型
delete(c.cache, kv.key)
c.nBytes -= int64(len(kv.key)) + int64(kv.value.Len())
if c.OnEvicted != nil {
c.OnEvicted(kv.key, kv.value)
}
}
}

// Update 更新缓存数据
func (c *Cache) Update(key string, value Value) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
c.nBytes += int64(value.Len()) - int64(kv.value.Len())
kv.value = value
} else {
c.Add(key, value)
}
}

// Get 获取缓存数据
func (c *Cache) Get(key string) (value Value, ok bool) {
if ele, ok := c.cache[key]; ok {
c.ll.MoveToFront(ele)
kv := ele.Value.(*entry)
return kv.value, true
}
return
}

// Len 获取缓存数据的条数
func (c *Cache) Len() int {
return c.ll.Len()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package lru

import (
"fmt"
"testing"
)

// 对于数据类型,需要进行封装,因为无法在原生类型 string 中添加 Len 方法
type String string

func (s String) Len() int {
return len(s)
}

// TestFunction 测试增删改查基本功能
func TestFunction(t *testing.T) {
lru := New(int64(10), nil)

lru.Add("k1", String("1234"))
fmt.Println(lru.Get("k1"))

lru.RemoveOldest()
fmt.Println(lru.Get("k1"))

lru.Update("k2", String("123"))
fmt.Println(lru.Get("k2"))
}

// TestAutoRemoveOldest 测试当使用内存超过 maxBytes,是否会触发 “无用” 节点的删除
func TestAutoRemoveOldest(t *testing.T) {
testData := []struct {
key string
value Value
}{
{"k1", String("1234567890")},
{"k2", String("234567890")},
{"k3", String("34567890")},
}

lru := New(int64(10), nil)

for _, test := range testData {
lru.Add(test.key, test.value)
}

for _, test := range testData {
fmt.Println(lru.Get(test.key))
}
}

// TestCallbackOnEvicted 测试删除缓存时回调函数是否能被调用
func TestCallbackOnEvicted(t *testing.T) {
keys := make([]string, 0)
lru := New(10, func(key string, value Value) {
keys = append(keys, key)
})
lru.Add("k1", String("k1"))
lru.Add("k2", String("k2"))
lru.Add("k3", String("k3"))
lru.Add("k4", String("k4"))

fmt.Println(keys)
}

2.单机并发缓存

如何设计并发安全的 cache

主要做两件事:加锁 + 保证数据不可变性

对互斥操作进行加锁

  • lru.Cache 的基础上,封装一个 cache 结构体,将 lru.Cache 的操作都加互斥锁。(注意不能是读写锁,因为读数据,也要移动链表节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// geecache/cache.go

// 为什么要封装 cache?
// 加锁,保证对 lru.Cache 的并发安全
type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64
}

// add 并发安全地添加缓存
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil) // 懒加载,延迟初始化 lru.Cache
}
c.lru.Add(key, value)
}

// get 并发安全地获取缓存
func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}

return
}

保证数据不可变性(只读)

  • 上面代码中,无论是 add 还是 get 操作的数据类型都是 ByteView,也就是说,缓存的类型被规定成了 ByteView,这是为什么呢?

  • 其实就是为了保证数据的不可变性, ByteView 的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// geecache/byteview.go

package geecache

type ByteView struct {
b []byte // slice
}

func (byteView ByteView) Len() int { // ByteView 实现 lru.Value 接口
return len(byteView.b)
}

// 为什么 ByteSlice 要返回拷贝?
// 防止缓存值被外部修改,这里直接返回拷贝
func (byteView ByteView) ByteSlice() []byte {
return cloneBytes(byteView.b) // 为什么不直接使用 make,要封装 cloneBytes 函数
}

// 为什么需要 cloneBytes ?
// 因为 []byte 是切片,传递时,不会深拷贝,传递的是视图,底层数据会被外界修改
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}

func (byteView ByteView) String() string {
return string(byteView.b)
}

为什么要抽象 ByteView 数据类型来表示缓存值?

  1. 在 lru 中定义的 entry.value 是 Value 接口类型,所有传入的数据的类型均需要实现这个接口,也就是实现 Len 函数,比较麻烦。[]byte 可以支持任何数据类型的存储,并为 []byte 抽象为 ByteView 类型,并实现 lru.Value 接口。

  2. 保证 ByteView 是只读类型。如何保证?

    • ByteView 的变量只有一个 bb 是小写,外部无法直接读取。
    • 只能通过 ByteSliceString 方法获取 b 的数据
      • ByteSlice 返回 slice 时,会拷贝一个副本再返回
      • String 将 b 的数据强制转换为 String。(外界也无法直接修改 b 的值)

为什么 get 操作也要加锁

想象两个场景:

  1. cache 在 add 函数中,初始化 lru.Cache 的过程是懒加载,lru.Cache 初始化过程中,还为初始化完,如果这时候访问 Get,会导致空指针的情况。

  2. lru.cache 在写数据时,需要移动节点到首部,如果这时候去访问,会同时移动链表节点到首部,会造成链表断裂,节点丢失。

也就是说,需要保证 lru.cache 初始化的原子性,以及 lru.cache 内部操作的原子性。

为什么要延迟初始化(Lazy Initialization)

1
2
3
4
5
6
7
8
9
10
11
12
// geecache/cache.go

// add 并发安全地添加缓存
func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil) // 懒加载,延迟初始化 lru.Cache
}
c.lru.Add(key, value)
}
  • 节省内存:只有使用时才初始化,没使用不初始化。

  • 减少初始化 cache 时的开销,加块整个程序的冷启动速度。

为什么要设计 Group

在使用 cache 对 lru.Cache 进行封装后,又使用 Group 对 cache 进行了封装。这是为什么呢?

其实站在使用者的角度,只有一个并发安全的 cache,会存在很多问题没法解决:

  • 不同的数据因为大小、有效期等不同,需要放到不同的 cache 实例中,如何标识不同的 cache 实例呢?需要一个名字/ID。

  • 当前机器没有找到的数据,需要访问其他分布式节点或者回源(可能是多种数据源)。这个过程如何统一呢?

为此,我们使用分层设计的思想,再封装一层,cache 负责并发存储,Group 负责业务逻辑。

有哪些业务逻辑?

  • 为 cache 实例标识一个命名空间。

  • 为 cache 实例提供回源方法的接口。

  • 注:还可以在 group 中,针对 cache 实例增加各种缓存策略。

如何设计 Group

1
2
3
4
5
6
7
8
9
10
11
12
13
// geecache/geecache.go

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string // 一个 Group 是一个命名空间,并有唯一的 name。比如可以创建多个 Group,储存不同类别的信息。
getter Getter // 当缓存未命中时,可以调用 Getter.Get 这个回调函数获取值,并储存在缓存中。
mainCache cache // 并发安全的缓存
}

var (
groupMu sync.RWMutex // 全局读写锁
groups = make(map[string]*Group) // 全局缓存组注册表
)

通过 groups 进行全局管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// geecache/geecache.go

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}

groupMu.Lock()
defer groupMu.Unlock()

g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g
return g
}

func GetGroup(name string) *Group {
groupMu.RLock() // 为什么是读锁?保证写操作的原子性
g := groups[name]
groupMu.RUnlock()
return g
}

统一管理缓存回源逻辑

首先是回源接口,

  • Group 只要获取不到数据,都通过 group.getter.Get 来回源。但是数据源数据有很多,可能是文件,可能是数据库。所以需要抽象一个接口。

  • 只要实现了 Get 方法,都可以作为 Getter 接口类型变量,供 group 回源时调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// geecache/geecache.go

// Getter 用于从各类的外部数据源获取数据
type Getter interface {
Get(key string) ([]byte, error)
}

// GetterFunc 为了便于使用者传入匿名函数到 Getter 中
// 使用时,只需将匿名函数转化为 GetterFunc 类型,即可传入 Getter
type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

有了回源接口,需要实现 group.Get 的逻辑,参考这个流程图。

graph LR
    A[接收 key] --> B[是否被缓存?]
    B -->|是| C[返回缓存值-步骤1]
    B -->|否| D[是否应从远程节点获取?]
    D -->|是| E[与远程节点交互]
    E --> F[返回缓存值-步骤2]
    D -->|否| G[调用回调函数]
    G --> H[获取值并添加到缓存]
    H --> I[返回缓存值-步骤3]
  • Get 完成步骤1

  • load + getLocally 完成步骤3

  • 步骤二在第5章节《分布式节点》中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}

if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}

// 缓存没有命中,就从其他数据源中获取
return g.load(key)
}

// 当缓存不存在,Get -> load -> getLocally 获取数据
// 为什么不直接使用 getLocally,还要封装一个 load ?
// load 会先从 远程分布式节点获取,获取不到才会使用 getLocally。此处属于预留设计。
func (g *Group) load(key string) (ByteView, error) {
return g.getLocally(key)
}

// getLocally 调用用户的回调函数 g.getter.Get,获取数据,并使用 populateCache 添加数据
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}

// populateCache 将获取到的数据添加到缓存中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

完整代码

1
2
3
4
5
6
7
8
9
10
version_2_singal_node[geecache]
├── geecache
│ ├── byteview.go # 缓存值的抽象与封装
│ ├── cache.go # 并发控制
│ ├── geecache.go # 负责与用户交互,并拥有从外部数据源获取缓存并存储的功能
│ ├── geecache_test.go
│ └── lru
│ ├── lru.go
│ └── lru_test.go
└── go.mod
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package geecache

// 为什么要抽象 ByteView 数据类型来表示缓存值?
/* 1. 在 lru 中定义的 entry.value 是 Value 接口类型,所有传入的数据的类型均需要实现这个接口,也就是实现 Len 函数,比较麻烦。
[]byte 可以支持任何数据类型的存储,并为 []byte 抽象为 ByteView 类型,并实现 lru.Value 接口。

2. 保证 ByteView 是只读类型
如何保证?
1. b 是小写,外部无法直接读取。
2. 只能通过 ByteSlice 和 String 方法获取 b 的数据
1.ByteSlice 返回 slice 时,会拷贝一个副本再返回
2.String 将 b 的数据强制转换为 String。(外界也无法直接修改 b 的值)
*/

type ByteView struct {
b []byte // slice
}

func (byteView ByteView) Len() int { // ByteView 实现 lru.Value 接口
return len(byteView.b)
}

// 为什么要返回拷贝?
// 防止缓存值被外部修改,这里直接返回拷贝

func (byteView ByteView) ByteSlice() []byte {
return cloneBytes(byteView.b) // 为什么不直接使用 make,要封装 cloneBytes 函数?方便 cloneBytes 复用
}

// 为什么需要 cloneBytes ?
// 因为 []byte 是切片,传递时,不会深拷贝,传递的是视图,底层数据会被外界修改
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}

func (byteView ByteView) String() string {
return string(byteView.b)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package geecache

import (
"sync"
"geecache/geecache/lru"
)

// 为什么要封装 cache?
// 加锁,保证对 lru.Cache 的并发安全

type cache struct {
mu sync.Mutex
lru *lru.Cache
cacheBytes int64
}

func (c *cache) add(key string, value ByteView) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
c.lru = lru.New(c.cacheBytes, nil)
}
c.lru.Add(key, value)
}

func (c *cache) get(key string) (value ByteView, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()

if c.lru == nil {
return
}
if v, ok := c.lru.Get(key); ok {
return v.(ByteView), ok
}

return
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package geecache

import (
"fmt"
"log"
"sync"
)

// Getter 用于从各类的外部数据源获取数据
type Getter interface {
Get(key string) ([]byte, error)
}

// GetterFunc 为了便于使用者传入匿名函数到 Getter 中
// 使用时,只需将匿名函数转化为 GetterFunc 类型,即可传入 Getter
type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string // 一个 Group 是一个命名空间,并有唯一的 name。比如可以创建多个 Group,储存不同类别的信息。
getter Getter // 当缓存未命中时,可以调用 Getter.Get 这个回调函数获取值,并储存在缓存中。
mainCache cache // 并发安全的缓存
}

var (
groupMu sync.RWMutex
groups = make(map[string]*Group)
)

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}

groupMu.Lock()
defer groupMu.Unlock()

g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g
return g
}

func GetGroup(name string) *Group {
groupMu.RLock() // 为什么是读锁?
g := groups[name]
groupMu.RUnlock()
return g
}

/*

接收 key --> 检查是否被缓存 -----> 返回缓存值 (1)
| 否 是
|-----> 是否应当从远程节点获取 -----> 与远程节点交互 --> 返回缓存值 (2)
| 否
|-----> 调用`回调函数`,获取值并添加到缓存 --> 返回缓存值 (3)
Get 完成流程(1)
load + getLocally 完成流程(3)
*/

func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}

if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}

// 缓存没有命中,就从其他数据源中获取
return g.load(key)
}

// 当缓存不存在,Get -> load -> getLocally 获取数据
// 为什么不直接使用 getLocally,还要封装一个 load ?
// load 会先从 远程分布式节点获取,获取不到才会使用 getLocally。此处属于预留设计。
func (g *Group) load(key string) (ByteView, error) {
return g.getLocally(key)
}

// getLocally 调用用户的回调函数 g.getter.Get,获取数据,并使用 populateCache 添加数据
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}

// populateCache 将获取到的数据添加到缓存中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package geecache

import (
"fmt"
"log"
"testing"
)

func TestGetter(t *testing.T) {
var g Getter = GetterFunc(func(key string) ([]byte, error) {
return []byte(key), nil
})

bytes, _ := g.Get("key")
fmt.Printf("%c", bytes)
}

// 外部数据源
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}

func TestGet(t *testing.T) {
loadCounts := make(map[string]int, len(db)) // 记录加入缓存的次数

gee := NewGroup("scores", 1024, GetterFunc(
func(key string) ([]byte, error) { // 缓存未命中时,从其他数据源获取数据的函数
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
if _, ok := loadCounts[key]; !ok {
loadCounts[key] = 0
}
loadCounts[key] += 1
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))

// 从缓存中查看各个 key,测试缓存从外部数据源获取数据的能力
for k, v := range db {
view, err := gee.Get(k)
if err != nil {
panic(err)
}
fmt.Println(k, v, view.String())
}

// 对于数据源中不存在的数据的处理
view, err := gee.Get("unknown")
if err != nil {
fmt.Println(err)
}
fmt.Println(view.String())
}

3.HTTP 服务器

这部分比较简单,主要是启动一个 http 服务,处理 URL,获取到 group name 和 key,返回缓存查询到的结果。

完整代码

1
2
3
4
5
6
7
8
9
10
11
version_3_http_server[geecache]
├── geecache
│ ├── byteview.go
│ ├── cache.go
│ ├── geecache.go
│ ├── geecache_test.go # 新增 http 测试
│   ├── http.go # 新增从 HTTP 服务器获取 cache
│ └── lru
│ ├── lru.go
│ └── lru_test.go
└── go.mod
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package geecache

import (
"fmt"
"log"
"net/http"
"strings"
)

const BASEPATH = "/_geecache/" // 请求路径应该是 "/<basepath>/<groupname>/<key>"

type HTTPPool struct {
self string
basePath string
}

func NewHTTPPool(self string) *HTTPPool { // 为什么要设置这两个字段
return &HTTPPool{
self: self, // 本机的IP/端口
basePath: BASEPATH, // 请求前缀,便于过滤请求
}
}

func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.Path, h.basePath) {
panic("Error path: " + r.URL.Path)
}

h.Log("%s %s", r.Method, r.URL.Path)

parts := strings.SplitN(r.URL.Path[len(BASEPATH):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

groupName := parts[0]
key := parts[1]
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.Write(view.ByteSlice())
}

func (h *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[Server %s] %s", h.self, fmt.Sprintf(format, v...))
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package geecache

import (
"fmt"
"log"
"net/http"
"testing"
)

// 外部数据源
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}

func TestHTTP(t *testing.T) {
NewGroup("scores", 1024, GetterFunc(
func(key string) ([]byte, error) {
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))

addr := "localhost:9999"
peers := NewHTTPPool(addr)
log.Println("cache is running at", addr)
log.Fatal(http.ListenAndServe(addr, peers))
}

/*
curl http://localhost:9999/_geecache/scores/Tom
curl http://localhost:9999/_geecache/scores/xxx

log:
2023/10/16 17:40:24 [Server localhost:9999] GET /_geecache/scores/Tom
2023/10/16 17:40:24 [SlowDB] search key Tom
2023/10/16 17:40:51 [Server localhost:9999] GET /_geecache/scores/567
2023/10/16 17:40:51 [SlowDB] search key 567
*/

4.一致性哈希

该访问哪个缓存节点

  • 分布式缓存,也就是有多个机器,每个机器存储一部分数据。但当一个请求过来,该去哪个节点查询缓存呢?

  • 解决:通过哈希算法对 key 进行计算,并对节点个数进行取余。这样每个 key 都会固定映射到一个节点中。

缓存节点数量调整怎么办

  • 缓存运行过程中,如果有节点增加/减少,hash(key) % N,这里的 N 会变为 N-1,大部分缓存都会失效,在原来的节点中找不到,会引发缓存雪崩。

  • 解决:一致性哈希可以解决这个问题。

一致性哈希是什么

  • 一致性哈希是一个哈希环,通过哈希算法将 key 和节点映射到 0 到 2³²−1 的环形空间。(为什么是 2³²?因为大部分哈希计算出的值都是 32 位的,其实也可以用 64 位的,只不过 2³² 是40亿,足够应对哈希冲突了。)

  • 每个 key 归属于顺时针找到的第一个节点。

  • 增加节点时,只有新增节点区域的 key 需要迁移至新节点。减少节点也是同理。

add_peer

数据倾斜问题怎么处理

  • 如上图左侧哈希环中,key2、key11、key27 都在节点 peer2 上,这就是数据倾斜,大部分数据储存在少部分几个节点中。

  • 为什么会数据倾斜?因为节点在哈希环上分布不均。所以我们使用更多的虚拟节点将环划分为更小的区间,每个真实节点对应多个虚拟节点。这样就不会因为真实节点分布不均,导致的大段连续区间被单一节点垄断的问题。

如何实现一致性哈希

主要有三件事:构建哈希环、节点数量变化的处理、查找 key 所在节点。

(1)构建哈希环

哈希环就是map,

  • 支持自定义哈希函数,在初始化时,使用者传入,不传入则默认使用 crc32。

  • 设置虚拟节点,虚拟节点个数是真实节点的 replicas 倍。

  • 将所有虚拟节点有序排列,储存在数组 keys 里,这就是哈希环本体。

  • 虚拟节点与真实节点的映射保存在 hashMap 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type Hash func(data []byte) uint32 // hash 函数类型

type Map struct { // 一致性哈希算法的主要数据结构
hash Hash // 设置自定义一种哈希算法函数
replicas int // 虚拟节点的倍数
keys []int // 哈希环,有序储存所有虚拟节点
hashMap map[int]string // 虚拟节点和真实节点的映射表,key 是虚拟节点的哈希值,value 是真实节点的名称
}

func New(replicas int, fn Hash) *Map {
m := &Map{
hash: fn,
replicas: replicas,
hashMap: make(map[int]string),
}
if m.hash == nil { // 默认使用 crc32.ChecksumIEEE 哈希算法
m.hash = crc32.ChecksumIEEE
}
return m
}

(2)节点数量变化的处理

暂时不考虑已有缓存迁移的问题,仅考虑节点数量变化后,能否找到对应的真实节点。

加入节点:

  • 加入一个真实节点,实际操作是,加入 replicas 个虚拟节点。

  • 先都加入到 keys 中,再排序

1
2
3
4
5
6
7
8
9
10
11
12
// Add 在哈希环 Map.keys 中加入真实节点
// 节点名和虚拟节点名经过 hash 计算后得到哈希值,将哈希值加入 Map.keys 中,并排序。
func (m *Map) Add(keys ...string) {
for _, key := range keys { // 对于每个真实节点,添加 m.replicas 个虚拟节点。
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key))) // 计算虚拟节点的 hash 值
m.keys = append(m.keys, hash) // 虚拟节点加入环
m.hashMap[hash] = key // 将虚拟节点和真实节点加入映射表
}
}
sort.Ints(m.keys) // 对虚拟节点排序
}

删除节点:

  • 删除真实节点与虚拟节点的映射

  • 重新构建哈希环(仅保留未被删除的哈希值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Del 在哈希环 Map.keys 中减少真实节点对应的虚拟节点
func (m *Map) Del(keys ...string) {

// 删除真实节点与虚拟节点的映射
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
delete(m.hashMap, hash)
}
}

// 重新构建哈希环(仅保留未被删除的哈希值)
var newKeys []int
for hash := range m.hashMap {
newKeys = append(newKeys, hash)
}
sort.Ints(newKeys)
m.keys = newKeys
}

(3)查找 key 所在节点

  • 最初的定义是,每个 key 归属于顺时针找到的第一个节点。

  • 给定一个 hash,他归属的节点就是第一个大于等于 hash 的节点。

  • 通过二分查找来寻找第一个大于等于 hash 的节点,注意需手动传入判断函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Get 获取 key 所在节点名
// key 经过 hash 计算后得到哈希值,在哈希环 Map.keys 上查找最接近的节点。
// 例如:key 的哈希值是 10000,哈希环上找到最接近的两个节点是 8000、11000,应该存在 8000 这个节点上。
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}

hash := int(m.hash([]byte(key)))

// 二分查找虚拟节点
index := sort.Search(len(m.keys), func(i int) bool {
return m.keys[i] >= hash // 找到第一个大于等于 hash 的节点,没有则返回 len(m.keys)
})

return m.hashMap[m.keys[index%len(m.keys)]]
}
  • 同时来了解一下,sort.Search 函数,二分查找,找到第一个符合条件的索引。什么条件?就是传入的函数为 true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func Search(n int, f func(int) bool) int {
i, j := 0, n
for i < j {

// 因为i、j是int, (i+j)/2 在 i+j > 2^31-1 时会溢出,先转化为无符号整型计算可以防止溢出。
h := int(uint(i+j) >> 1)
// i ≤ h < j
if !f(h) {
i = h + 1 // h 不满足条件,答案在 h 的右侧。[h + 1, j)
} else {
j = h // h 满足条件,答案在 h 的左侧。[i, h)
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return i
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
version_4_consistent_hashing[geecache]
├── geecache
│ ├── byteview.go
│ ├── cache.go
│   ├── consistenthash
│   │   └── consistent_hash.go # 新增一致性哈希
│ ├── geecache.go
│   ├── geecache_test.go
│   ├── http.go
│   └── lru
│   ├── lru.go
│   └── lru_test.go
└── go.mod
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package consistenthash

import (
"hash/crc32"
"sort"
"strconv"
)

/**
一致性哈希解决什么问题?
每个数据,都会准确的分配到一个节点上。访问时,根据该数据的hash值,确定该访问哪个节点。
数据倾斜问题:服务器节点过少时,会导致数据无法均匀分配在各节点上。使用虚拟节点解决。
删除节点或增加节点时,只需要调整该节点的数据。
*/

type Hash func(data []byte) uint32 // hash 函数类型

type Map struct { // 一致性哈希算法的主要数据结构
hash Hash // 设置自定义一种哈希算法函数
replicas int // 虚拟节点的倍数
keys []int // 哈希环,有序储存所有虚拟节点
hashMap map[int]string // 虚拟节点和真实节点的映射表,key 是虚拟节点的哈希值,value 是真实节点的名称
}

func New(replicas int, fn Hash) *Map {
m := &Map{
hash: fn,
replicas: replicas,
hashMap: make(map[int]string),
}
if m.hash == nil { // 默认使用 crc32.ChecksumIEEE 哈希算法
m.hash = crc32.ChecksumIEEE
}
return m
}

// Add 在哈希环 Map.keys 中加入真实节点对应的虚拟节点
// 节点名和虚拟节点名经过 hash 计算后得到哈希值,将哈希值加入 Map.keys 中,并排序。
func (m *Map) Add(keys ...string) {
for _, key := range keys { // 对于每个真实节点,添加 m.replicas 个虚拟节点。
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key))) // 计算虚拟节点的 hash 值
m.keys = append(m.keys, hash) // 虚拟节点加入环
m.hashMap[hash] = key // 将虚拟节点和真实节点加入映射表
}
}
sort.Ints(m.keys) // 对虚拟节点排序
}

// Del 在哈希环 Map.keys 中减少真实节点对应的虚拟节点
func (m *Map) Del(keys ...string) {
for _, key := range keys {
for i := 0; i < m.replicas; i++ {
hash := int(m.hash([]byte(strconv.Itoa(i) + key)))
delete(m.hashMap, hash)
}
}

var newKeys []int
for hash := range m.hashMap {
newKeys = append(newKeys, hash)
}
sort.Ints(newKeys)
m.keys = newKeys
}

// Get 获取 key 所在节点名
// key 经过 hash 计算后得到哈希值,在哈希环 Map.keys 上查找最接近的节点。
// 例如:key 的哈希值是 10000,哈希环上找到最接近的两个节点是 8000、11000,应该存在 8000 这个节点上。
func (m *Map) Get(key string) string {
if len(m.keys) == 0 {
return ""
}

hash := int(m.hash([]byte(key)))

// 二分查找虚拟节点
index := sort.Search(len(m.keys), func(i int) bool { // 这个函数的意义
return m.keys[i] >= hash
})

return m.hashMap[m.keys[index%len(m.keys)]]
}

5.分布式节点

这一章节作者原文站在架构的角度自上而下描述的,看的时候未免有些疑惑,比如为什么上来要抽象两个接口。下面我会按照实用主义角度来回答,“基于什么需求,所以这么做”,以作为补充。

分布式缓存要做哪些工作

整个分布式缓存,要做的工作就是实现下面这个图。

graph LR
    A[接收 key] --> B[是否被缓存?]
    B -->|是| C[返回缓存值-步骤1]
    B -->|否| D[是否应从远程节点获取?]
    D -->|是| E[与远程节点交互]
    E --> F[返回缓存值-步骤2]
    D -->|否| G[调用回调函数]
    G --> H[获取值并添加到缓存]
    H --> I[返回缓存值-步骤3]
  • 在第二章节,我们完成了单个节点的功能:查找本地缓存(步骤1);找不到时进行回源储存(步骤3)。

  • 在第三章节,我们解决了单个节点如何为其他节点提供服务的问题。

  • 在第四章节,我们解决了多个节点的情况下,查找缓存该查哪个节点的问题。

  • 有了以上铺垫,我们该完成上图中的步骤二了:与远程节点交互,查找远程节点的数据。

graph LR
    A[接收 key] --> B[是否被缓存?]
    B -->|是| C[返回缓存值-步骤1]
    B -->|否| E[使用一致性哈希选择节点,是否从远程节点获取?]
    E -->|是| G[HTTP 客户端访问远程节点]
    G -->|成功| I[返回缓存值-步骤2]
    G -->|失败| K[调用回调函数]
    K --> L[获取值并添加到缓存]
    L --> M[返回缓存值-步骤3]
    E -->|否| K

补充一些步骤二的细节,如上图。

站在使用者的角度,当我拿到 Group 之后,要完成步骤二,我需要哪些方法供我调用?

  1. 方法一:通过 Key 查找到属于哪个远程节点。

  2. 方法二:访问远程节点,获取缓存。

  3. 方法三:远程节点也没有,则调用本地的回调函数。

所以 Group 声明了两个接口,PeerPicker 为 Group 提供方法一,PeerGetter 为 Group 提供方法二,方法三在原有 group.load 逻辑中改写。

1
2
3
4
5
6
7
8
9
10
11
// geecache/geecache.go

// PeerPicker 协助 Group 通过 key 选择远程节点
type PeerPicker interface {
PickPeer(key string) (peer PeerGetter, ok bool) // 根据 key 选择节点 PeerGetter
}

// PeerGetter 获取到的远程节点,协助 Group 从远程节点获取缓存值
type PeerGetter interface {
Get(group string, key string) ([]byte, error) // 获取缓存值
}

Group 只需要拿着口变量调用接口的方法,就可以完成整体逻辑。

至于这两个接口是谁来实现,用什么协议实现,Group 并不关心,我就规定了这几个方法,谁实现了我用谁完成。所以可能是 HTTPPool 用 HTTP 协议实现的,也可能是 HTTPPool 用 protobuf 实现的。

就比如你是一个 leader,你有一项工作,你分为part 1、part2,你只需要说出你最终想要什么。你也不管谁来做,具体怎么做。比如张三李四说他们有能力做,那你要做的就是合并张三李四成果,向上汇报就行。

这样做有什么好处?领导轻松

  • 业务与实现分离,自己做自己的事情:

    • Group 调用各种方法,完成业务逻辑。
    • HTTPPool 实现 Group 规定的两个方法。
  • 依赖倒置,Group 不依赖 HTTPPool,HTTPPool 变成可插拔的了,方便自定义修改。(也就是上面例子里,把张三李四换成王五,让王五来干)

如何实现 Group 访问远程节点的逻辑

  • 首先将 PeerPicker 接口变量,放在 Group 结构体中,并提供注入方法,便于 Group 后续调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// geecache/geecache.go

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string
getter Getter
mainCache cache

peers PeerPicker // 用于选择远程节点
}

// RegisterPeers 将实现了 PeerPicker 接口的变量注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
  • 在查找缓存时,仅在 load 函数里增加去远程节点查找的逻辑。

  • 去远程节点查找的逻辑更为简单。(就像领导直接安排张三李四干活一样)

    • 先调用 PeerPicker.PickPeer 找到远程节点,并拿到访问这个节点的客户端。
    • 再调用 PeerGetter.Get 给远程节点发送请求,获取缓存值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// geecache/geecache.go

// load: 缓存不存在时,先在远程节点中查找,未果再调用 getLocally 获取数据
func (g *Group) load(key string) (ByteView, error) {
if g.peers != nil {

// 新增逻辑
if peer, ok := g.peers.PickPeer(key); ok { // 找到 key 对应的远程节点。
value, err := g.getFromPeer(peer, key) // 在远程节点中查找
if err == nil {
return value, nil
}
log.Println("[GeeCache] failed to get from peer", err)
}

}
return g.getLocally(key)
}

// 从远程节点获取数据,传入的是 PeerGetter 接口类型,只要实现了 Get 方法,就可以传入
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{bytes}, nil
}

ok,至此,Group 的工作就做完了,只管调用 PeerPicker.PickPeerPeerGetter.Get 的方法,这两件事谁来干不知道,具体怎么干不知道,十分简洁优雅。

谁来完成具体的工作呢?我们把这个重任交给第三章中出现的 HTTPPool,让 HTTPPool 实现 PeerPicker.PickPeer 接口,用户就可以调用 RegisterPeers,把 HTTPPool 注入 Group 了。

为什么是 HTTPPool 来实现?

  • 统一网络通信层:第三章中 HTTPPool 主要是作为服务端,提供获取本机缓存的 HTTP 服务;那同样他也可以作为客户端,可以发送 HTTP 请求到其他节点。

HTTPPool 如何实现 PeerPicker 接口

实现 PickPeer 方法就实现了这个接口,也就是实现通过 key 查找远程节点的功能。

  • 要查找远程节点,则需要将一致性哈希环集成到内部。

  • 找到节点后,要可以请求访问,则需要保存各节点 IP/Port。

如何将一致性哈希加入 HTTPPool

  • 增加 sync.Mutex 锁。为什么要加锁?

  • 增加一致哈希 peers,保存所有节点。

  • 增加 map,将节点和节点的 ip/port 映射起来,知道访问那个 IP 去查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// geecache/http.go

// HTTPPool :包含服务端和客户端,服务端提供获取本机数据的 HTTP 服务,客户端提供访问其他节点的方法
type HTTPPool struct {
self string // 当前节点的 IP/端口
basePath string

// 增加能力:设置和获取远程节点的能力
mu sync.Mutex
peers *consistenthash.Map // 一致性哈希
httpGetters map[string]*httpGetter // 记录每个远程节点的 httpGetter,httpGetter 包含了 baseURL
}

// httpGetter 客户端:保存节点的 IP/Port
type httpGetter struct {
baseURL string
}

HTTPPool 能做的,也就是对 consistenthash.Map 能力进行封装,为自己所用:

  • HTTPPool.Set 初始化一致性哈希 peers,把所有节点都加到哈希环里,并为每个节点初始化 HTTP 客户端 httpGetter(其实就是保存了 IP/Port 而已)。

  • HTTPPool.PickPeer 通过 key,在一致性哈希环上,查找 key 所属的节点,返回这个节点的 IP/Port,我们将 IP/Port 保存在客户端 httpGetter,所以直接返回 httpGetter 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// geecache/http.go

// Set 设置远程节点的能力:实例化一个一致性哈希,并向哈希环中添加节点。
func (h *HTTPPool) Set(peers ...string) {
h.mu.Lock()
defer h.mu.Unlock()

h.peers = consistenthash.New(defaultReplicas, nil) // 初始化哈希环
h.peers.Add(peers...) // 将节点加到哈希环上

// 保存 key 和对应的 httpGetter 到 map 字段 httpGetters 中
h.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
h.httpGetters[peer] = &httpGetter{
baseURL: peer + h.basePath,
}
}
}

// PickPeer 获取远程节点的能力:包装了一致性哈希获取真实节点的方法 consistenthash.Map.Get
func (h *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
h.mu.Lock()
defer h.mu.Unlock()

// 从哈希环上获取缓存值属于那个节点,如果不是空且不是自身,则返回对应的节点。
if peer := h.peers.Get(key); peer != "" && peer != h.self {
h.Log("Pick peer %s", peer)
return h.httpGetters[peer], true
}
return nil, false
}

为什么要返回客户端 httpGetter 实例,而不是直接的 IP/Port,让 Group 拿到 IP/Port 自己去发 HTTP 请求呢?

和上面,Group 为什么设计 PeerPicker 和 PeerGetter 接口一样。因为我们要做到 HTTPPool 和 Group 解耦,实现与业务分离。

  • HTTPPool 负责管理所有节点信息,根据节点信息提供 HTTP 服务、访问 HTTP 服务。

  • Group 只管调用 PeerPicker.PickPeerPeerGetter.Get 的方法,去完成业务逻辑即可。

所以,我们还需为 httpGetter 写一个发送请求的方法,让 httpGetter 实现 PeerGetter 接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// geecache/http.go

// Get 步骤:
// 1.将 baseURL、group、key 拼接为远程节点缓存值的访问地址 URL
// 2.访问 URL 获取缓存值返回
func (g *httpGetter) Get(group string, key string) ([]byte, error) {
u := fmt.Sprintf("%v%v/%v", g.baseURL, url.QueryEscape(group), url.QueryEscape(key))
/* url.QueryEscape 是 URL 转义函数,
比如 http://123.com/123?image=http://images.com/cat.png
需要转义为 http://123.com/123?image=http%3A%2F%2Fimages.com%2Fcat.png
*/

res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned %v", res.Status)
}

bytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body %v", err)
}

return bytes, nil
}

如何验证功能

这个 main 函数看着很绕,其实逻辑是这样:

  • 8001 - 8003 端口模拟三个的主机节点。

  • 端口 9999 相当于 8001 节点机器供用户交互的网关,每个节点都有与用户交互的网关,这里演示只启动 8001 节点的 9999 端口用于交互。

  • main 做的就是初始化 Group;初始化 HTTPPool;将 HTTPPool 注册到 Group 中;启动一个 api 服务监听 9999 端口。

graph LR
    A[查询请求] --> B[API-端口 9999  +  分布式节点一-端口 8001]
    B --> D[分布式节点二-端口 8002]
    B -->|经过查询 key 在节点三| E[分布式节点三-端口 8003]
    E --> F[返回缓存值]

具体实现不细描述,跑起来就明白了。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
.
├── geecache
│   ├── byteview.go
│   ├── cache.go
│   ├── consistenthash
│   │   ├── consistent_hash.go
│   │   └── consistent_hash_test.go
│   ├── geecache.go # 增加与远程节点交互的业务逻辑
│   ├── geecache_test.go
│   ├── http.go # 增加与远程节点交互的实现逻辑
│   └── lru
│   ├── lru.go
│   └── lru_test.go
├── go.mod
└── main.go # 增加与远程节点交互的 demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package geecache

import (
"fmt"
"log"
"sync"
)

// Getter 用于从各类的外部数据源获取数据
type Getter interface {
Get(key string) ([]byte, error)
}

// GetterFunc 为了便于使用者传入匿名函数到 Getter 中
type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string
getter Getter
mainCache cache

peers PeerPicker // 用于选择远程节点
}

var (
groupMu sync.RWMutex
groups = make(map[string]*Group)
)

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}

groupMu.Lock()
defer groupMu.Unlock()

g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
}
groups[name] = g
return g
}

func GetGroup(name string) *Group {
groupMu.RLock()
g := groups[name]
groupMu.RUnlock()
return g
}

/*

(Get)接收 key --> 检查是否被缓存 -----> 返回缓存值 (1)
| 否 是
|-----> (load)是否应当从远程节点获取 -----> (getFromPeer)与远程节点交互 --> 返回缓存值 (2)
| 否
|-----> (getLocally)调用`回调函数`,获取值,(populateCache)并添加到缓存 --> 返回缓存值 (3)
Get 完成流程(1)
load + getLocally 完成流程(3)
*/

func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}

if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}

// 缓存没有命中,就从其他数据源中获取
return g.load(key)
}

// load: 缓存不存在时,先在远程节点中查找,未果再调用 getLocally 获取数据
func (g *Group) load(key string) (ByteView, error) {
if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok { // 找到 key 对应的远程节点。
value, err := g.getFromPeer(peer, key) // 在远程节点中查找
if err == nil {
return value, nil
}
log.Println("[GeeCache] failed to get from peer", err)
}
}
return g.getLocally(key)
}

// 从远程节点获取数据,传入的是 PeerGetter 接口类型,只要实现了 Get 方法,就可以传入
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{bytes}, nil
}

// getLocally 调用用户的回调函数 g.getter.Get,获取数据,并使用 populateCache 添加数据
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}

// populateCache 将获取到的数据添加到缓存中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

// RegisterPeers 将实现了 PeerPicker 接口的变量注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}

/*
使用一致性哈希选择节点 是 是
|-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
| 否 ↓ 否
|----------------------------> 回退到本地节点处理。
*/

// PeerGetter 获取到的远程节点,协助 Group 从远程节点获取缓存值
type PeerGetter interface {
Get(group string, key string) ([]byte, error) // 获取缓存值
}

// PeerPicker 协助 Group 通过 key 选择远程节点
type PeerPicker interface {
PickPeer(key string) (peer PeerGetter, ok bool) // 根据 key 选择节点 PeerGetter
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package geecache

import (
"fmt"
"geecache/geecache/consistenthash"
"io"
"log"
"net/http"
"net/url"
"strings"
"sync"
)

const (
defaultBasePath = "/_geecache/" // 请求路径应该是 "/<basepath>/<groupname>/<key>"
defaultReplicas = 3
)

// HTTPPool :包含服务端和客户端,服务端提供获取本机数据的 HTTP 服务,客户端提供访问其他节点的方法
type HTTPPool struct {
self string // 当前节点的 IP/端口
basePath string

// 增加能力:设置和获取远程节点的能力
mu sync.Mutex
peers *consistenthash.Map // 一致性哈希
httpGetters map[string]*httpGetter // 记录每个远程节点的 httpGetter,httpGetter 包含了 baseURL
}

func NewHTTPPool(self string) *HTTPPool { // 为什么要设置这两个字段
return &HTTPPool{
self: self, // 本机的IP/端口
basePath: defaultBasePath, // 请求前缀,便于过滤请求
}
}

func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.Path, h.basePath) {
panic("Error path: " + r.URL.Path)
}

h.Log("%s %s", r.Method, r.URL.Path)

parts := strings.SplitN(r.URL.Path[len(defaultBasePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

groupName := parts[0]
key := parts[1]
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.Write(view.ByteSlice())
}

func (h *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[Server %s] %s", h.self, fmt.Sprintf(format, v...))
}

// Set 设置远程节点的能力:实例化一个一致性哈希,并向哈希环中添加节点。
func (h *HTTPPool) Set(peers ...string) {
h.mu.Lock()
defer h.mu.Unlock()

h.peers = consistenthash.New(defaultReplicas, nil) // 初始化哈希环
h.peers.Add(peers...) // 将节点加到哈希环上

// 保存 key 和对应的 httpGetter 到 map 字段 httpGetters 中
h.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
h.httpGetters[peer] = &httpGetter{
baseURL: peer + h.basePath,
}
}
}

// PickPeer 获取远程节点的能力:包装了一致性哈希获取真实节点的方法 consistenthash.Map.Get
func (h *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
h.mu.Lock()
defer h.mu.Unlock()

// 从哈希环上获取缓存值属于那个节点,如果不是空且不是自身,则返回对应的节点。
if peer := h.peers.Get(key); peer != "" && peer != h.self {
h.Log("Pick peer %s", peer)
return h.httpGetters[peer], true
}
return nil, false
}

// httpGetter 客户端:获取数据,实现 PeerGetter 接口
type httpGetter struct {
baseURL string
}

// Get 步骤:
// 1.将 baseURL、group、key 拼接为远程节点缓存值的访问地址 URL
// 2.访问 URL 获取缓存值返回
func (g *httpGetter) Get(group string, key string) ([]byte, error) {
u := fmt.Sprintf("%v%v/%v", g.baseURL, url.QueryEscape(group), url.QueryEscape(key))
/* url.QueryEscape 是 URL 转义函数,
比如 http://123.com/123?image=http://images.com/cat.png
需要转义为 http://123.com/123?image=http%3A%2F%2Fimages.com%2Fcat.png
*/

res, err := http.Get(u)
if err != nil {
return nil, err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return nil, fmt.Errorf("server returned %v", res.Status)
}

bytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("reading response body %v", err)
}

return bytes, nil
}

var _ PeerGetter = (*httpGetter)(nil) // 保证 httpGetter 实现了 PeerGetter。没实现在编译时报错。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main

import (
"flag"
"fmt"
"geecache/geecache"
"log"
"net/http"
)

// 模拟回源的数据库
var db = map[string]string{
"Tom": "630",
"Jack": "589",
"Sam": "567",
}

func createGroup() *geecache.Group {
return geecache.NewGroup("scores", 2<<10,
geecache.GetterFunc(func(key string) ([]byte, error) { // 回源函数
log.Println("[SlowDB] search key", key)
if v, ok := db[key]; ok {
return []byte(v), nil
}
return nil, fmt.Errorf("%s not exist", key)
}))
}

// startCacheServer 每个节点启动一个 HTTP 服务器,用于接收请求,返回缓存值。
func startCacheServer(addr string, addrs []string, gee *geecache.Group) {
peers := geecache.NewHTTPPool(addr) // 初始化一个 HTTPPool 实例
peers.Set(addrs...) // 将所有节点的地址加入到 HTTPPool 中
gee.RegisterPeers(peers)
log.Println("geecache is running at", addr)
log.Fatal(http.ListenAndServe(addr[7:], peers)) // 监听端口,让 NewHTTPPool 的 ServeHTTP 方法处理请求
}

// startAPIServer 启动一个 API 服务,用于测试。接收请求,根据 key 查找对应节点,访问节点,返回结果。
func startAPIServer(apiAddr string, gee *geecache.Group) {
http.Handle("/api", http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
key := r.URL.Query().Get("key")
view, err := gee.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Write(view.ByteSlice())

}))
log.Println("fontend server is running at", apiAddr)
log.Fatal(http.ListenAndServe(apiAddr[7:], nil))

}

func main() {
var port int
var api bool
flag.IntVar(&port, "port", 8001, "Geecache server port")
flag.BoolVar(&api, "api", false, "Start a api server?")
flag.Parse()

apiAddr := "http://localhost:9999"

// 所有分布式节点的地址,包括本机
addrMap := map[int]string{
8001: "http://localhost:8001",
8002: "http://localhost:8002",
8003: "http://localhost:8003",
}
var addrs []string
for _, v := range addrMap {
addrs = append(addrs, v)
}

gee := createGroup()

// 启动一个 API 服务,用于测试。接收请求,根据 key 查找对应节点,访问节点,返回结果。
if api {
go startAPIServer(apiAddr, gee)
}

startCacheServer(addrMap[port], []string(addrs), gee)
}

6.防止缓存击穿

  • 缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。

  • 缓存击穿:一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时DB请求量大、压力骤增。

  • 缓存穿透:查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB,如果瞬间流量过大,穿透到 DB,导致宕机。

延伸阅读:golang 中 singleflight 包的实现:singleflight.go

如何防止缓存击穿

这一章节可写的不多,主要是一个思想:

  • 对于短时间内的大量并发的同种请求,只响应第一个。

  • 其他的请求等待第一个请求完毕,直接使用第一个请求的结果。

值得学习的地方:

  • singleflight 的实现,使用代理模式,load 中查询远程节点的逻辑,全部交给了 singleflight.Do 这个方法进行封装,有点类似中间件的工作模式。

  • 仅需少量代码就可以完成对于每个请求的控制,简单优雅。

什么时候应该加锁

  • 在读写 g.m[key] 前均需加锁。但是互斥锁效率太低,下面改成读写锁。

  • 并对于懒汉单例模式,使用双检锁来优化性能。

    • 第一次检查(无锁)
      减少锁竞争。大多数情况下实例已存在,无需加锁直接进入后续业务。
    • 第二次检查(加锁)
      防止多个协程同时通过第一次检查后,在锁内重复创建实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {

if g.m == nil { // 第一次不加锁,因为 g.m 的概率小
g.mu.Lock() // 如果 g.m == nil,说明是第一次,需要加锁,避免其他并发也进来创建 g.m
if g.m == nil {
g.m = make(map[string]*call) // 懒加载
}
g.mu.Unlock()
}

g.mu.RLock()
if c, ok := g.m[key]; ok {
g.mu.RUnlock() // 读 g.m 后解锁
c.wg.Wait()
return c.val, c.err
}
g.mu.RUnlock() // 读 g.m 后解锁

c := new(call)
c.wg.Add(1) // 记录同步的事件数量,其他并发请求阻塞等待当前请求结束
g.mu.Lock() // 写 g.m 前加锁
g.m[key] = c
g.mu.Unlock() // 写 g.m 后解锁

c.val, c.err = fn() // 调用 fn,发起请求
c.wg.Done() // 结束请求

g.mu.Lock() // 写 g.m 前加锁
delete(g.m, key) // 更新 g.m
g.mu.Unlock() // 写 g.m 后解锁

return c.val, c.err // 返回结果
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
.
├── geecache
│   ├── byteview.go
│   ├── cache.go
│   ├── consistenthash
│   │   ├── consistent_hash.go
│   │   └── consistent_hash_test.go
│   ├── geecache.go # 增加 singleflight
│   ├── geecache_test.go
│   ├── http.go
│   └── lru
│   │ ├── lru.go
│   │ └── lru_test.go
│   └── singleflight
│   └── singleflight.go # 新增对并发请求的控制
├── go.mod
└── main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package singleflight

import "sync"

type call struct {
wg sync.WaitGroup
val interface{}
err error
}

type Group struct {
mu sync.RWMutex
m map[string]*call
}

func (g *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {

if g.m == nil { // 第一次不加锁,因为 g.m 的概率小
g.mu.Lock() // 如果 g.m == nil,说明是第一次,需要加锁,避免其他并发也进来创建 g.m
if g.m == nil {
g.m = make(map[string]*call) // 懒加载
}
g.mu.Unlock()
}

g.mu.RLock()
if c, ok := g.m[key]; ok {
g.mu.RUnlock() // 读 g.m 后解锁
c.wg.Wait()
return c.val, c.err
}
g.mu.RUnlock() // 读 g.m 后解锁

c := new(call)
c.wg.Add(1) // 记录同步的事件数量,其他并发请求阻塞等待当前请求结束
g.mu.Lock() // 写 g.m 前加锁
g.m[key] = c
g.mu.Unlock() // 写 g.m 后解锁

c.val, c.err = fn() // 调用 fn,发起请求
c.wg.Done() // 结束请求

g.mu.Lock() // 写 g.m 前加锁
delete(g.m, key) // 更新 g.m
g.mu.Unlock() // 写 g.m 后解锁

return c.val, c.err // 返回结果
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package geecache

import (
"fmt"
"geecache/geecache/singleflight"
"log"
"sync"
)

// Getter 用于从各类的外部数据源获取数据
type Getter interface {
Get(key string) ([]byte, error)
}

// GetterFunc 为了便于使用者传入匿名函数到 Getter 中
type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker

loader *singleflight.Group
}

var (
groupMu sync.RWMutex
groups = make(map[string]*Group)
)

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}

groupMu.Lock()
defer groupMu.Unlock()

g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
loader: &singleflight.Group{},
}
groups[name] = g
return g
}

func GetGroup(name string) *Group {
groupMu.RLock()
g := groups[name]
groupMu.RUnlock()
return g
}

/*

(Get)接收 key --> 检查是否被缓存 -----> 返回缓存值 (1)
| 否 是
|-----> (load)是否应当从远程节点获取 -----> (getFromPeer)与远程节点交互 --> 返回缓存值 (2)
| 否
|-----> (getLocally)调用`回调函数`,获取值,(populateCache)并添加到缓存 --> 返回缓存值 (3)
Get 完成流程(1)
load + getLocally 完成流程(3)
*/

func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}

if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}

// 缓存没有命中,就从其他数据源中获取
return g.load(key)
}

// load: 缓存不存在时,先在远程节点中查找,未果再调用 getLocally 获取数据
func (g *Group) load(key string) (ByteView, error) {
val, err := g.loader.Do(key, func() (interface{}, error) {

if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok { // 找到 key 对应的远程节点。
value, err := g.getFromPeer(peer, key) // 在远程节点中查找
if err == nil {
return value, nil
}
log.Println("[GeeCache] failed to get from peer", err)
}
}
return g.getLocally(key)
})

if err == nil {
return val.(ByteView), nil
}
return ByteView{}, err
}

// 从远程节点获取数据,传入的是 PeerGetter 接口类型,只要实现了 Get 方法,就可以传入
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
bytes, err := peer.Get(g.name, key)
if err != nil {
return ByteView{}, err
}
return ByteView{bytes}, nil
}

// getLocally 调用用户的回调函数 g.getter.Get,获取数据,并使用 populateCache 添加数据
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}

// populateCache 将获取到的数据添加到缓存中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

// RegisterPeers 将实现了 PeerPicker 接口的变量注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}

/*
使用一致性哈希选择节点 是 是
|-----> 是否是远程节点 -----> HTTP 客户端访问远程节点 --> 成功?-----> 服务端返回返回值
| 否 ↓ 否
|----------------------------> 回退到本地节点处理。
*/

// PeerGetter 获取到的远程节点,协助 Group 从远程节点获取缓存值
type PeerGetter interface {
Get(group string, key string) ([]byte, error) // 获取缓存值
}

// PeerPicker 协助 Group 通过 key 选择远程节点
type PeerPicker interface {
PickPeer(key string) (peer PeerGetter, ok bool) // 根据 key 选择节点 PeerGetter
}

7.使用 Protobuf 编码

使用 protobuf 编码需要做哪些更改

(1)原来逻辑

客户端:

  1. 拿到 group、key 构造 url

  2. 使用 http.Get 发送请求

服务端:

  1. 处理请求,将结果 view 返回

客户端:

  1. 接收服务端结果,直接使用

(2)使用 protobuf 编码后的逻辑

客户端:

  1. 拿到 group、key 构造 url

  2. 使用 http.Get 发送请求

服务端:

  1. 处理请求,将结果 view 使用 protobuf 编码(变更)

客户端:

  1. 接收服务端结果,使用 protobuf 解码。(变更)

如何实现

  1. 创建 proto 文件,定义 Request 和 Response。

  2. 更改 PeerGetter.Get 参数,改为 Request 和 Response。

  3. 在处理请求和接收数据时,使用 protobuf 进行编解码。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
.
├── geecache
│   ├── byteview.go
│   ├── cache.go
│   ├── consistenthash
│   │   ├── consistent_hash.go
│   │   └── consistent_hash_test.go
│   ├── geecache.go # getFromPeer 修改传参方式。
│   ├── geecachepb # protobuf 编解码
│   │   ├── geecachepb.pb.go
│   │   └── geecachepb.proto
│   ├── geecache_test.go
│   ├── geers.go # 新增文件存放 PeerGetter、PeerPicker 接口
│   ├── http.go # 新增在处理请求和接收数据时,使用 protobuf 进行编解码。
│   └── lru
│   │ ├── lru.go
│   │ └── lru_test.go
│   └── singleflight
│   └── singleflight.go
├── go.mod
└── main.go
1
2
3
4
5
6
7
8
9
10
11
12
syntax = "proto3";

option go_package = ".;geecachepb";

message Request {
string group = 1;
string key = 2;
}

message Response {
bytes value = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package geecache

import (
"fmt"
"geecache/geecache/geecachepb"
"geecache/geecache/singleflight"
"log"
"sync"
)

// Getter 用于从各类的外部数据源获取数据
type Getter interface {
Get(key string) ([]byte, error)
}

// GetterFunc 为了便于使用者传入匿名函数到 Getter 中
type GetterFunc func(key string) ([]byte, error)

func (f GetterFunc) Get(key string) ([]byte, error) {
return f(key)
}

// Group 负责与用户交互(获取缓存值),并拥有从外部数据源获取值并存储在缓存中的功能
type Group struct {
name string
getter Getter
mainCache cache
peers PeerPicker

loader *singleflight.Group
}

var (
groupMu sync.RWMutex
groups = make(map[string]*Group)
)

func NewGroup(name string, cacheBytes int64, getter Getter) *Group {
if getter == nil {
panic("nil Getter")
}

groupMu.Lock()
defer groupMu.Unlock()

g := &Group{
name: name,
getter: getter,
mainCache: cache{cacheBytes: cacheBytes},
loader: &singleflight.Group{},
}
groups[name] = g
return g
}

func GetGroup(name string) *Group {
groupMu.RLock()
g := groups[name]
groupMu.RUnlock()
return g
}

/*

(Get)接收 key --> 检查是否被缓存 -----> 返回缓存值 (1)
| 否 是
|-----> (load)是否应当从远程节点获取 -----> (getFromPeer)与远程节点交互 --> 返回缓存值 (2)
| 否
|-----> (getLocally)调用`回调函数`,获取值,(populateCache)并添加到缓存 --> 返回缓存值 (3)
Get 完成流程(1)
load + getLocally 完成流程(3)
*/

func (g *Group) Get(key string) (ByteView, error) {
if key == "" {
return ByteView{}, fmt.Errorf("key is required")
}

if v, ok := g.mainCache.get(key); ok {
log.Println("[GeeCache] hit")
return v, nil
}

// 缓存没有命中,就从其他数据源中获取
return g.load(key)
}

// load: 缓存不存在时,先在远程节点中查找,未果再调用 getLocally 获取数据
func (g *Group) load(key string) (ByteView, error) {
val, err := g.loader.Do(key, func() (interface{}, error) {

if g.peers != nil {
if peer, ok := g.peers.PickPeer(key); ok { // 找到 key 对应的远程节点。
value, err := g.getFromPeer(peer, key) // 在远程节点中查找
if err == nil {
return value, nil
}
log.Println("[GeeCache] failed to get from peer", err)
}
}
return g.getLocally(key)
})

if err == nil {
return val.(ByteView), nil
}
return ByteView{}, err
}

// 从远程节点获取数据,传入的是 PeerGetter 接口类型,只要实现了 Get 方法,就可以传入
func (g *Group) getFromPeer(peer PeerGetter, key string) (ByteView, error) {
req := &geecachepb.Request{
Group: g.name,
Key: key,
}
res := &geecachepb.Response{}

err := peer.Get(req, res)
if err != nil {
return ByteView{}, err
}
return ByteView{res.Value}, nil
}

// getLocally 调用用户的回调函数 g.getter.Get,获取数据,并使用 populateCache 添加数据
func (g *Group) getLocally(key string) (ByteView, error) {
bytes, err := g.getter.Get(key)
if err != nil {
return ByteView{}, err
}
value := ByteView{cloneBytes(bytes)}
g.populateCache(key, value)
return value, nil
}

// populateCache 将获取到的数据添加到缓存中
func (g *Group) populateCache(key string, value ByteView) {
g.mainCache.add(key, value)
}

// RegisterPeers 将实现了 PeerPicker 接口的变量注入到 Group 中
func (g *Group) RegisterPeers(peers PeerPicker) {
if g.peers != nil {
panic("RegisterPeerPicker called more than once")
}
g.peers = peers
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package geecache

import (
"fmt"
"geecache/geecache/consistenthash"
"geecache/geecache/geecachepb"
"io"
"log"
"net/http"
"net/url"
"strings"
"sync"

"google.golang.org/protobuf/proto"
)

const (
defaultBasePath = "/_geecache/" // 请求路径应该是 "/<basepath>/<groupname>/<key>"
defaultReplicas = 3
)

// HTTPPool :包含服务端和客户端,服务端提供获取本机数据的 HTTP 服务,客户端提供访问其他节点的方法
type HTTPPool struct {
self string // 当前节点的 IP/端口
basePath string

// 增加能力:设置和获取远程节点的能力
mu sync.Mutex
peers *consistenthash.Map // 一致性哈希
httpGetters map[string]*httpGetter // 记录每个远程节点的 httpGetter,httpGetter 包含了 baseURL
}

func NewHTTPPool(self string) *HTTPPool { // 为什么要设置这两个字段
return &HTTPPool{
self: self, // 本机的IP/端口
basePath: defaultBasePath, // 请求前缀,便于过滤请求
}
}

func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !strings.HasPrefix(r.URL.Path, h.basePath) {
panic("Error path: " + r.URL.Path)
}

h.Log("%s %s", r.Method, r.URL.Path)

parts := strings.SplitN(r.URL.Path[len(defaultBasePath):], "/", 2)
if len(parts) != 2 {
http.Error(w, "bad request", http.StatusBadRequest)
return
}

groupName := parts[0]
key := parts[1]
group := GetGroup(groupName)
if group == nil {
http.Error(w, "no such group: "+groupName, http.StatusNotFound)
return
}
view, err := group.Get(key)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

res := &geecachepb.Response{
Value: view.ByteSlice(),
}
body, err := proto.Marshal(res)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.Write(body)
}

func (h *HTTPPool) Log(format string, v ...interface{}) {
log.Printf("[Server %s] %s", h.self, fmt.Sprintf(format, v...))
}

// Set 设置远程节点的能力:实例化一个一致性哈希,并向哈希环中添加节点。
func (h *HTTPPool) Set(peers ...string) {
h.mu.Lock()
defer h.mu.Unlock()

h.peers = consistenthash.New(defaultReplicas, nil) // 初始化哈希环
h.peers.Add(peers...) // 将节点加到哈希环上

// 保存 key 和对应的 httpGetter 到 map 字段 httpGetters 中
h.httpGetters = make(map[string]*httpGetter, len(peers))
for _, peer := range peers {
h.httpGetters[peer] = &httpGetter{
baseURL: peer + h.basePath,
}
}
}

// PickPeer 获取远程节点的能力:包装了一致性哈希获取真实节点的方法 consistenthash.Map.Get
func (h *HTTPPool) PickPeer(key string) (PeerGetter, bool) {
h.mu.Lock()
defer h.mu.Unlock()

// 从哈希环上获取缓存值属于那个节点,如果不是空且不是自身,则返回对应的节点。
if peer := h.peers.Get(key); peer != "" && peer != h.self {
h.Log("Pick peer %s", peer)
return h.httpGetters[peer], true
}
return nil, false
}

// httpGetter 客户端:获取数据,实现 PeerGetter 接口
type httpGetter struct {
baseURL string
}

// Get 步骤:
// 1.将 baseURL、group、key 拼接为远程节点缓存值的访问地址 URL
// 2.访问 URL 获取缓存值返回
func (g *httpGetter) Get(in *geecachepb.Request, out *geecachepb.Response) error {
group := in.Group
key := in.Key

u := fmt.Sprintf("%v%v/%v", g.baseURL, url.QueryEscape(group), url.QueryEscape(key))
/* url.QueryEscape 是 URL 转义函数,
比如 http://123.com/123?image=http://images.com/cat.png
需要转义为 http://123.com/123?image=http%3A%2F%2Fimages.com%2Fcat.png
*/

res, err := http.Get(u)
if err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return fmt.Errorf("server returned %v", res.Status)
}

bytes, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("reading response body %v", err)
}

err = proto.Unmarshal(bytes, out)
if err != nil {
return err
}

return nil
}

var _ PeerGetter = (*httpGetter)(nil) // 保证 httpGetter 实现了 PeerGetter。没实现在编译时报错。

从零实现系列|分布式缓存
https://www.aimtao.net/7days-cache/
Posted on
2023-05-13
Licensed under