本文逐步实现了一个分布式缓存系统,涵盖LRU缓存淘汰策略、单机并发控制、HTTP服务接口及一致性哈希算法。通过封装缓存值、互斥锁机制和虚拟节点映射,解决了并发安全、节点负载均衡等核心问题,构建了支持分布式部署的缓存框架。
通过回答关键问题的方式,记录阅读 geeCache 代码过程中的思考,并做出补充和改进,重点在“怎么做”的基础上,补充“为什么这么做”等逻辑。
0.序言 为什么需要 cache 优化性能:将耗时操作的结果储存起来,下次需要请求,直接使用,避免重复耗时操作。 降低延迟:静态资源缓存(CDN/浏览器缓存),提高访问速度。 转移负载:将高频访问的数据,从数据库迁移到内存中,降低后端压力。 高并发支撑:通过内存级的响应能力支持大规模的并发请求,避免数据库被击穿。 cache 要解决哪些问题 怎么存储?维护一个键值对缓存,实现 O(1) 的存取速度。 内存不够?实现一种缓存的淘汰策略。 并发冲突?对缓存的修改,实现并发保护。 单机性能差?实现可以横向扩展的分布式缓存。分布式缓存存在多个节点,节点间如何通信?需要支持 HTTP、protobuf。 如何用 O(1) 的速度找到 cache 对应的节点?使用一致性哈希来选择节点,实现负载均衡。 1.缓存淘汰策略 介绍三种常见的缓存淘汰算法 缓存储存在内存中,内存是有限的,对于没用的数据就需要移出。三种常见的缓存淘汰算法。
FIFO(First In First Out)
新进先出,按照时间顺序,优先淘汰最早添加的数据。 实现:维护一个先进先出队列,新数据插入队尾,淘汰时移除队首数据。 缺点:无法感知访问频率,即使高频访问的数据,若进入缓存较早仍会被淘汰,可能导致缓存命中率下降。 LFU(Least Frequently Used)
最少使用,按照访问频率,优先淘汰访问次数最少的数据。 实现:维护一个按照访问次数排序的最小堆/双层链表。每次访问,访问次数+1,重新排序。淘汰时,直接淘汰队首访问次数最少的即可。 优点:缓存命中率高。 缺点:维护成本高:每次访问都需要更新计数,并重新排序。 受历史数据影响大:早期高频但后期失效的数据(比如过时的热点新闻)难以被淘汰。(解决:定期衰减历史计数。) LRU(Least Recently Used)
基于时间局部性原理,认为最近被访问过的,更有可能被再次访问。那么最近没使用过的数据,就是要淘汰的数据。 实现:维护一个队列,某个数据被访问,就将数据移至队尾,这样队首则是最近最少被访问的数据。 优点:FIFO 考虑时间因素,LFU 考虑访问频率,LRU 既考虑时间,也考虑频次(多次访问,意味着多次被移到队尾)。 如何设计 cache cache 数据结构设计思路
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 type Cache struct { maxBytes int64 nBytes int64 ll *list.List cache map [string ]*list.Element OnEvicted func (key string , value Value) }type entry struct { key string value Value }type Value interface { Len() int }func New (maxBytes int64 , onEvicted func (key string , value Value) ) *Cache { return &Cache{ maxBytes: maxBytes, ll: list.New(), cache: make (map [string ]*list.Element), OnEvicted: onEvicted, } }
为什么 value 不直接放在链表里,要放在 entry 结构体里? 淘汰缓存时,可以直接从 entry 结构体中拿到 key,用来删除哈希表中的映射。 为什么要新建一个 Value 接口类型,并包含 Len
方法? 缓存的数据可以什么格式都有,用接口变量储存最好,即 interface{}
。 因为整个 cache 是有大小的,所有有需要查询所有缓存数据的大小。所以在 interface{}
中增加一个 Len
方法,只有实现了这个方法的变量,才能作为接口类型 Value 的变量储存在 cache 中。 下面举个例子,如歌 entry 的 value 变量需要储存 String 变量,String 需要主动实现 Len 方法,也就是实现 Value 接口。 1 2 3 4 5 type String string func (s String) Len() int { return len (s) }
如何实现 cache 增删改查 增加
检查是否存在,存在即更新,不存在则增加缓存 维护缓存大小,超过最大值需要淘汰缓存。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (c *Cache) Add(key string , value Value) { if _, ok := c.cache[key]; ok { c.Update(key, value) } else { ele := c.ll.PushFront(&entry{key, value}) c.cache[key] = ele c.nBytes += int64 (len (key)) + int64 (value.Len()) for c.maxBytes != 0 && c.maxBytes < c.nBytes { c.RemoveOldest() } } }
删除
链表删除尾部节点,map 删除 key,更新 cache 总体大小,调用回调函数(回调函数是初始化时传入的)。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (c *Cache) RemoveOldest() { ele := c.ll.Back() if ele != nil { c.ll.Remove(ele) kv := ele.Value.(*entry) delete (c.cache, kv.key) c.nBytes -= int64 (len (kv.key)) + int64 (kv.value.Len()) if c.OnEvicted != nil { c.OnEvicted(kv.key, kv.value) } } }
修改
查询是否存在,将节点移至首部(表示频率),更新 cache 总体大小,更新节点的缓存值。 注意:更新缓存还需要看缓存是否超过最大容量,如果超过需要淘汰缓存,下面代码暂时不写这个逻辑了。 1 2 3 4 5 6 7 8 9 10 11 12 13 func (c *Cache) Update(key string , value Value) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) c.nBytes += int64 (value.Len()) - int64 (kv.value.Len()) kv.value = value } else { c.Add(key, value) } }
查询
查询是否存在,将节点移至首部(表示频率),返回节点的缓存值。 1 2 3 4 5 6 7 8 9 10 11 func (c *Cache) Get(key string ) (value Value, ok bool ) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) return kv.value, true } return }
ele.Value.(*entry)
的写法?ele 是 *list.Element 类型的变量,表示链表的一个节点。Element 变量中有 Value 变量,Value 变量类型是 any。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package listtype Element struct { next, prev *Element list *List Value any }
Element.Value 变量中存储的,实际上是我们的 entry 类型的结构体变量的指针。 这里写法是断言,将 any 类型的 Element.Value 变量还原为 *entry 类型变量。 完整代码 1 2 3 4 5 version_1_LRU[geecache] ├── go.mod └── lru ├── lru.go └── lru_test.go
lru/lru.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package lruimport "container/list" type Cache struct { maxBytes int64 nBytes int64 ll *list.List cache map [string ]*list.Element OnEvicted func (key string , value Value) }type entry struct { key string value Value }type Value interface { Len() int }func New (maxBytes int64 , onEvicted func (key string , value Value) ) *Cache { return &Cache{ maxBytes: maxBytes, ll: list.New(), cache: make (map [string ]*list.Element), OnEvicted: onEvicted, } }func (c *Cache) Add(key string , value Value) { if _, ok := c.cache[key]; ok { c.Update(key, value) } else { ele := c.ll.PushFront(&entry{key, value}) c.cache[key] = ele c.nBytes += int64 (len (key)) + int64 (value.Len()) for c.maxBytes != 0 && c.maxBytes < c.nBytes { c.RemoveOldest() } } }func (c *Cache) RemoveOldest() { ele := c.ll.Back() if ele != nil { c.ll.Remove(ele) kv := ele.Value.(*entry) delete (c.cache, kv.key) c.nBytes -= int64 (len (kv.key)) + int64 (kv.value.Len()) if c.OnEvicted != nil { c.OnEvicted(kv.key, kv.value) } } }func (c *Cache) Update(key string , value Value) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) c.nBytes += int64 (value.Len()) - int64 (kv.value.Len()) kv.value = value } else { c.Add(key, value) } }func (c *Cache) Get(key string ) (value Value, ok bool ) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) return kv.value, true } return }func (c *Cache) Len() int { return c.ll.Len() }
lru/lru_test.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package lruimport ( "fmt" "testing" )type String string func (s String) Len() int { return len (s) }func TestFunction (t *testing.T) { lru := New(int64 (10 ), nil ) lru.Add("k1" , String("1234" )) fmt.Println(lru.Get("k1" )) lru.RemoveOldest() fmt.Println(lru.Get("k1" )) lru.Update("k2" , String("123" )) fmt.Println(lru.Get("k2" )) }func TestAutoRemoveOldest (t *testing.T) { testData := []struct { key string value Value }{ {"k1" , String("1234567890" )}, {"k2" , String("234567890" )}, {"k3" , String("34567890" )}, } lru := New(int64 (10 ), nil ) for _, test := range testData { lru.Add(test.key, test.value) } for _, test := range testData { fmt.Println(lru.Get(test.key)) } }func TestCallbackOnEvicted (t *testing.T) { keys := make ([]string , 0 ) lru := New(10 , func (key string , value Value) { keys = append (keys, key) }) lru.Add("k1" , String("k1" )) lru.Add("k2" , String("k2" )) lru.Add("k3" , String("k3" )) lru.Add("k4" , String("k4" )) fmt.Println(keys) }
2.单机并发缓存 如何设计并发安全的 cache? 主要做两件事:加锁 + 保证数据不可变性
对互斥操作进行加锁
在 lru.Cache
的基础上,封装一个 cache 结构体,将 lru.Cache
的操作都加互斥锁。(注意不能是读写锁,因为读数据,也要移动链表节点) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 type cache struct { mu sync.Mutex lru *lru.Cache cacheBytes int64 }func (c *cache) add(key string , value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = lru.New(c.cacheBytes, nil ) } c.lru.Add(key, value) }func (c *cache) get(key string ) (value ByteView, ok bool ) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { return } if v, ok := c.lru.Get(key); ok { return v.(ByteView), ok } return }
保证数据不可变性 (只读)
上面代码中,无论是 add 还是 get 操作的数据类型都是 ByteView
,也就是说,缓存的类型被规定成了 ByteView
,这是为什么呢? 其实就是为了保证数据的不可变性, ByteView
的定义如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package geecachetype ByteView struct { b []byte }func (byteView ByteView) Len() int { return len (byteView.b) }func (byteView ByteView) ByteSlice() []byte { return cloneBytes(byteView.b) }func cloneBytes (b []byte ) []byte { c := make ([]byte , len (b)) copy (c, b) return c }func (byteView ByteView) String() string { return string (byteView.b) }
为什么要抽象 ByteView
数据类型来表示缓存值?
在 lru 中定义的 entry.value
是 Value 接口类型,所有传入的数据的类型均需要实现这个接口,也就是实现 Len
函数,比较麻烦。[]byte
可以支持任何数据类型的存储,并为 []byte
抽象为 ByteView
类型,并实现 lru.Value
接口。 保证 ByteView
是只读类型。如何保证?ByteView
的变量只有一个 b
,b
是小写,外部无法直接读取。只能通过 ByteSlice
和 String
方法获取 b
的数据ByteSlice
返回 slice 时,会拷贝一个副本再返回String 将 b 的数据强制转换为 String。(外界也无法直接修改 b 的值) 为什么 get 操作也要加锁? 想象两个场景:
cache 在 add 函数中,初始化 lru.Cache
的过程是懒加载,lru.Cache
初始化过程中,还为初始化完,如果这时候访问 Get,会导致空指针的情况。 lru.cache
在写数据时,需要移动节点到首部,如果这时候去访问,会同时移动链表节点到首部,会造成链表断裂,节点丢失。也就是说,需要保证 lru.cache
初始化的原子性,以及 lru.cache
内部操作的原子性。
为什么要延迟初始化(Lazy Initialization)? 1 2 3 4 5 6 7 8 9 10 11 12 func (c *cache) add(key string , value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = lru.New(c.cacheBytes, nil ) } c.lru.Add(key, value) }
节省内存:只有使用时才初始化,没使用不初始化。 减少初始化 cache 时的开销,加块整个程序的冷启动速度。 为什么要设计 Group? 在使用 cache 对 lru.Cache
进行封装后,又使用 Group 对 cache 进行了封装。这是为什么呢?
其实站在使用者的角度,只有一个并发安全的 cache,会存在很多问题没法解决:
不同的数据因为大小、有效期等不同,需要放到不同的 cache 实例中,如何标识不同的 cache 实例呢?需要一个名字/ID。 当前机器没有找到的数据,需要访问其他分布式节点或者回源(可能是多种数据源)。这个过程如何统一呢? 为此,我们使用分层设计的思想,再封装一层,cache
负责并发存储,Group
负责业务逻辑。
有哪些业务逻辑?
为 cache 实例标识一个命名空间。 为 cache 实例提供回源方法的接口。 注:还可以在 group 中,针对 cache 实例增加各种缓存策略。 如何设计 Group? 1 2 3 4 5 6 7 8 9 10 11 12 13 type Group struct { name string getter Getter mainCache cache }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )
通过 groups 进行全局管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }
统一管理缓存回源逻辑
首先是回源接口,
Group 只要获取不到数据,都通过 group.getter.Get
来回源。但是数据源数据有很多,可能是文件,可能是数据库。所以需要抽象一个接口。 只要实现了 Get 方法,都可以作为 Getter 接口类型变量,供 group 回源时调用。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }
有了回源接口,需要实现 group.Get 的逻辑,参考这个流程图。
graph LR
A[接收 key] --> B[是否被缓存?]
B -->|是| C[返回缓存值-步骤1]
B -->|否| D[是否应从远程节点获取?]
D -->|是| E[与远程节点交互]
E --> F[返回缓存值-步骤2]
D -->|否| G[调用回调函数]
G --> H[获取值并添加到缓存]
H --> I[返回缓存值-步骤3]
Get 完成步骤1 load + getLocally 完成步骤3 步骤二在第5章节《分布式节点》中实现。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { return g.getLocally(key) }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }
完整代码 1 2 3 4 5 6 7 8 9 10 version_2_singal_node[geecache] ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── geecache.go │ ├── geecache_test.go │ └── lru │ ├── lru.go │ └── lru_test.go └── go.mod
byteview.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package geecachetype ByteView struct { b []byte }func (byteView ByteView) Len() int { return len (byteView.b) }func (byteView ByteView) ByteSlice() []byte { return cloneBytes(byteView.b) }func cloneBytes (b []byte ) []byte { c := make ([]byte , len (b)) copy (c, b) return c }func (byteView ByteView) String() string { return string (byteView.b) }
cache.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package geecacheimport ( "sync" "geecache/geecache/lru" )type cache struct { mu sync.Mutex lru *lru.Cache cacheBytes int64 }func (c *cache) add(key string , value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = lru.New(c.cacheBytes, nil ) } c.lru.Add(key, value) }func (c *cache) get(key string ) (value ByteView, ok bool ) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { return } if v, ok := c.lru.Get(key); ok { return v.(ByteView), ok } return }
geecache.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 package geecacheimport ( "fmt" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { return g.getLocally(key) }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }
geecache_test.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package geecacheimport ( "fmt" "log" "testing" )func TestGetter (t *testing.T) { var g Getter = GetterFunc(func (key string ) ([]byte , error ) { return []byte (key), nil }) bytes, _ := g.Get("key" ) fmt.Printf("%c" , bytes) }var db = map [string ]string { "Tom" : "630" , "Jack" : "589" , "Sam" : "567" , }func TestGet (t *testing.T) { loadCounts := make (map [string ]int , len (db)) gee := NewGroup("scores" , 1024 , GetterFunc( func (key string ) ([]byte , error ) { log.Println("[SlowDB] search key" , key) if v, ok := db[key]; ok { if _, ok := loadCounts[key]; !ok { loadCounts[key] = 0 } loadCounts[key] += 1 return []byte (v), nil } return nil , fmt.Errorf("%s not exist" , key) })) for k, v := range db { view, err := gee.Get(k) if err != nil { panic (err) } fmt.Println(k, v, view.String()) } view, err := gee.Get("unknown" ) if err != nil { fmt.Println(err) } fmt.Println(view.String()) }
3.HTTP 服务器 这部分比较简单,主要是启动一个 http 服务,处理 URL,获取到 group name 和 key,返回缓存查询到的结果。
完整代码 1 2 3 4 5 6 7 8 9 10 11 version_3_http_server[geecache] ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ ├── lru.go │ └── lru_test.go └── go.mod
http.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package geecacheimport ( "fmt" "log" "net/http" "strings" )const BASEPATH = "/_geecache/" type HTTPPool struct { self string basePath string }func NewHTTPPool (self string ) *HTTPPool { return &HTTPPool{ self: self, basePath: BASEPATH, } }func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, h.basePath) { panic ("Error path: " + r.URL.Path) } h.Log("%s %s" , r.Method, r.URL.Path) parts := strings.SplitN(r.URL.Path[len (BASEPATH):], "/" , 2 ) if len (parts) != 2 { http.Error(w, "bad request" , http.StatusBadRequest) return } groupName := parts[0 ] key := parts[1 ] group := GetGroup(groupName) if group == nil { http.Error(w, "no such group: " +groupName, http.StatusNotFound) return } view, err := group.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/json" ) w.Write(view.ByteSlice()) }func (h *HTTPPool) Log(format string , v ...interface {}) { log.Printf("[Server %s] %s" , h.self, fmt.Sprintf(format, v...)) }
geecache_test.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package geecacheimport ( "fmt" "log" "net/http" "testing" )var db = map [string ]string { "Tom" : "630" , "Jack" : "589" , "Sam" : "567" , }func TestHTTP (t *testing.T) { NewGroup("scores" , 1024 , GetterFunc( func (key string ) ([]byte , error ) { log.Println("[SlowDB] search key" , key) if v, ok := db[key]; ok { return []byte (v), nil } return nil , fmt.Errorf("%s not exist" , key) })) addr := "localhost:9999" peers := NewHTTPPool(addr) log.Println("cache is running at" , addr) log.Fatal(http.ListenAndServe(addr, peers)) }
4.一致性哈希 该访问哪个缓存节点? 分布式缓存,也就是有多个机器,每个机器存储一部分数据。但当一个请求过来,该去哪个节点查询缓存呢? 解决:通过哈希算法对 key 进行计算,并对节点个数进行取余。这样每个 key 都会固定映射到一个节点中。 缓存节点数量调整怎么办? 缓存运行过程中,如果有节点增加/减少,hash(key) % N
,这里的 N
会变为 N-1
,大部分缓存都会失效,在原来的节点中找不到,会引发缓存雪崩。 解决:一致性哈希可以解决这个问题。 一致性哈希是什么? 一致性哈希是一个哈希环,通过哈希算法将 key 和节点映射到 0 到 2³²−1 的环形空间。(为什么是 2³²?因为大部分哈希计算出的值都是 32 位的,其实也可以用 64 位的,只不过 2³² 是40亿,足够应对哈希冲突了。) 每个 key 归属于顺时针找到的第一个节点。 增加节点时 ,只有新增节点区域的 key 需要迁移至新节点。减少节点也是同理。
数据倾斜问题怎么处理? 如上图左侧哈希环中,key2、key11、key27 都在节点 peer2 上,这就是数据倾斜,大部分数据储存在少部分几个节点中。 为什么会数据倾斜?因为节点在哈希环上分布不均。所以我们使用更多的虚拟节点将环划分为更小的区间,每个真实节点对应多个虚拟节点。这样就不会因为真实节点分布不均,导致的大段连续区间被单一节点垄断的问题。 如何实现一致性哈希? 主要有三件事:构建哈希环、节点数量变化的处理、查找 key 所在节点。
(1)构建哈希环
哈希环就是map,
支持自定义哈希函数,在初始化时,使用者传入,不传入则默认使用 crc32。 设置虚拟节点,虚拟节点个数是真实节点的 replicas
倍。 将所有虚拟节点有序排列,储存在数组 keys
里,这就是哈希环本体。 虚拟节点与真实节点的映射保存在 hashMap
中。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type Hash func (data []byte ) uint32 type Map struct { hash Hash replicas int keys []int hashMap map [int ]string }func New (replicas int , fn Hash) *Map { m := &Map{ hash: fn, replicas: replicas, hashMap: make (map [int ]string ), } if m.hash == nil { m.hash = crc32.ChecksumIEEE } return m }
(2)节点数量变化的处理
暂时不考虑已有缓存迁移的问题,仅考虑节点数量变化后,能否找到对应的真实节点。
加入节点:
加入一个真实节点,实际操作是,加入 replicas
个虚拟节点。 先都加入到 keys
中,再排序 1 2 3 4 5 6 7 8 9 10 11 12 func (m *Map) Add(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) m.keys = append (m.keys, hash) m.hashMap[hash] = key } } sort.Ints(m.keys) }
删除节点:
删除真实节点与虚拟节点的映射 重新构建哈希环(仅保留未被删除的哈希值) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (m *Map) Del(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) delete (m.hashMap, hash) } } var newKeys []int for hash := range m.hashMap { newKeys = append (newKeys, hash) } sort.Ints(newKeys) m.keys = newKeys }
(3)查找 key 所在节点
最初的定义是,每个 key 归属于顺时针找到的第一个节点。 给定一个 hash,他归属的节点就是第一个大于等于 hash 的节点。 通过二分查找来寻找第一个大于等于 hash 的节点,注意需手动传入判断函数。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (m *Map) Get(key string ) string { if len (m.keys) == 0 { return "" } hash := int (m.hash([]byte (key))) index := sort.Search(len (m.keys), func (i int ) bool { return m.keys[i] >= hash }) return m.hashMap[m.keys[index%len (m.keys)]] }
同时来了解一下,sort.Search
函数,二分查找,找到第一个符合条件的索引。什么条件?就是传入的函数为 true。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func Search (n int , f func (int ) bool ) int { i, j := 0 , n for i < j { h := int (uint (i+j) >> 1 ) if !f(h) { i = h + 1 } else { j = h } } return i }
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 version_4_consistent_hashing[geecache] ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ └── consistent_hash.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ ├── lru.go │ └── lru_test.go └── go.mod
consistenthash/consistent_hash.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package consistenthashimport ( "hash/crc32" "sort" "strconv" )type Hash func (data []byte ) uint32 type Map struct { hash Hash replicas int keys []int hashMap map [int ]string }func New (replicas int , fn Hash) *Map { m := &Map{ hash: fn, replicas: replicas, hashMap: make (map [int ]string ), } if m.hash == nil { m.hash = crc32.ChecksumIEEE } return m }func (m *Map) Add(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) m.keys = append (m.keys, hash) m.hashMap[hash] = key } } sort.Ints(m.keys) }func (m *Map) Del(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) delete (m.hashMap, hash) } } var newKeys []int for hash := range m.hashMap { newKeys = append (newKeys, hash) } sort.Ints(newKeys) m.keys = newKeys }func (m *Map) Get(key string ) string { if len (m.keys) == 0 { return "" } hash := int (m.hash([]byte (key))) index := sort.Search(len (m.keys), func (i int ) bool { return m.keys[i] >= hash }) return m.hashMap[m.keys[index%len (m.keys)]] }
5.分布式节点 这一章节作者原文站在架构的角度自上而下描述的,看的时候未免有些疑惑,比如为什么上来要抽象两个接口。下面我会按照实用主义角度来回答,“基于什么需求,所以这么做”,以作为补充。
分布式缓存要做哪些工作? 整个分布式缓存,要做的工作就是实现下面这个图。
graph LR
A[接收 key] --> B[是否被缓存?]
B -->|是| C[返回缓存值-步骤1]
B -->|否| D[是否应从远程节点获取?]
D -->|是| E[与远程节点交互]
E --> F[返回缓存值-步骤2]
D -->|否| G[调用回调函数]
G --> H[获取值并添加到缓存]
H --> I[返回缓存值-步骤3]
在第二章节,我们完成了单个节点的功能:查找本地缓存(步骤1);找不到时进行回源储存(步骤3)。 在第三章节,我们解决了单个节点如何为其他节点提供服务的问题。 在第四章节,我们解决了多个节点的情况下,查找缓存该查哪个节点的问题。 有了以上铺垫,我们该完成上图中的步骤二了:与远程节点交互,查找远程节点的数据。 graph LR
A[接收 key] --> B[是否被缓存?]
B -->|是| C[返回缓存值-步骤1]
B -->|否| E[使用一致性哈希选择节点,是否从远程节点获取?]
E -->|是| G[HTTP 客户端访问远程节点]
G -->|成功| I[返回缓存值-步骤2]
G -->|失败| K[调用回调函数]
K --> L[获取值并添加到缓存]
L --> M[返回缓存值-步骤3]
E -->|否| K
补充一些步骤二的细节,如上图。
站在使用者的角度,当我拿到 Group 之后,要完成步骤二,我需要哪些方法供我调用?
方法一:通过 Key 查找到属于哪个远程节点。 方法二:访问远程节点,获取缓存。 方法三:远程节点也没有,则调用本地的回调函数。 所以 Group 声明了两个接口,PeerPicker 为 Group 提供方法一,PeerGetter 为 Group 提供方法二,方法三在原有 group.load
逻辑中改写。
1 2 3 4 5 6 7 8 9 10 11 type PeerPicker interface { PickPeer(key string ) (peer PeerGetter, ok bool ) }type PeerGetter interface { Get(group string , key string ) ([]byte , error ) }
Group 只需要拿着口变量调用接口的方法,就可以完成整体逻辑。
至于这两个接口是谁来实现,用什么协议实现,Group 并不关心,我就规定了这几个方法,谁实现了我用谁完成。所以可能是 HTTPPool 用 HTTP 协议实现的,也可能是 HTTPPool 用 protobuf 实现的。
就比如你是一个 leader,你有一项工作,你分为part 1、part2,你只需要说出你最终想要什么。你也不管谁来做,具体怎么做。比如张三李四说他们有能力做,那你要做的就是合并张三李四成果,向上汇报就行。
这样做有什么好处? 领导轻松 。
业务与实现分离,自己做自己的事情:Group 调用各种方法,完成业务逻辑。 HTTPPool 实现 Group 规定的两个方法。 依赖倒置,Group 不依赖 HTTPPool,HTTPPool 变成可插拔的了,方便自定义修改。(也就是上面例子里,把张三李四换成王五,让王五来干) 如何实现 Group 访问远程节点的逻辑? 首先将 PeerPicker 接口变量,放在 Group 结构体中,并提供注入方法,便于 Group 后续调用。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type Group struct { name string getter Getter mainCache cache peers PeerPicker }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }
在查找缓存时,仅在 load 函数里增加去远程节点查找的逻辑。 去远程节点查找的逻辑更为简单。(就像领导直接安排张三李四干活一样)先调用 PeerPicker.PickPeer
找到远程节点,并拿到访问这个节点的客户端。 再调用 PeerGetter.Get
给远程节点发送请求,获取缓存值。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (g *Group) load(key string ) (ByteView, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{bytes}, nil }
ok,至此,Group 的工作就做完了,只管调用 PeerPicker.PickPeer
和 PeerGetter.Get
的方法,这两件事谁来干不知道,具体怎么干不知道,十分简洁优雅。
谁来完成具体的工作呢?我们把这个重任交给第三章中出现的 HTTPPool,让 HTTPPool 实现 PeerPicker.PickPeer
接口,用户就可以调用 RegisterPeers,把 HTTPPool 注入 Group 了。
为什么是 HTTPPool 来实现?
统一网络通信层:第三章中 HTTPPool 主要是作为服务端,提供获取本机缓存的 HTTP 服务;那同样他也可以作为客户端,可以发送 HTTP 请求到其他节点。 HTTPPool 如何实现 PeerPicker 接口? 实现 PickPeer 方法就实现了这个接口,也就是实现通过 key 查找远程节点的功能。
要查找远程节点,则需要将一致性哈希环集成到内部。 找到节点后,要可以请求访问,则需要保存各节点 IP/Port。 如何将一致性哈希加入 HTTPPool? 增加 sync.Mutex
锁。为什么要加锁? 增加一致哈希 peers,保存所有节点。 增加 map,将节点和节点的 ip/port 映射起来,知道访问那个 IP 去查询。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type HTTPPool struct { self string basePath string mu sync.Mutex peers *consistenthash.Map httpGetters map [string ]*httpGetter }type httpGetter struct { baseURL string }
HTTPPool 能做的,也就是对 consistenthash.Map
能力进行封装,为自己所用:
HTTPPool.Set
初始化一致性哈希 peers,把所有节点都加到哈希环里,并为每个节点初始化 HTTP 客户端 httpGetter(其实就是保存了 IP/Port 而已)。HTTPPool.PickPeer
通过 key,在一致性哈希环上,查找 key 所属的节点,返回这个节点的 IP/Port,我们将 IP/Port 保存在客户端 httpGetter,所以直接返回 httpGetter 实例。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (h *HTTPPool) Set(peers ...string ) { h.mu.Lock() defer h.mu.Unlock() h.peers = consistenthash.New(defaultReplicas, nil ) h.peers.Add(peers...) h.httpGetters = make (map [string ]*httpGetter, len (peers)) for _, peer := range peers { h.httpGetters[peer] = &httpGetter{ baseURL: peer + h.basePath, } } }func (h *HTTPPool) PickPeer(key string ) (PeerGetter, bool ) { h.mu.Lock() defer h.mu.Unlock() if peer := h.peers.Get(key); peer != "" && peer != h.self { h.Log("Pick peer %s" , peer) return h.httpGetters[peer], true } return nil , false }
为什么要返回客户端 httpGetter 实例,而不是直接的 IP/Port,让 Group 拿到 IP/Port 自己去发 HTTP 请求呢?
和上面,Group 为什么设计 PeerPicker 和 PeerGetter 接口一样。因为我们要做到 HTTPPool 和 Group 解耦,实现与业务分离。
HTTPPool 负责管理所有节点信息,根据节点信息提供 HTTP 服务、访问 HTTP 服务。 Group 只管调用 PeerPicker.PickPeer
和 PeerGetter.Get
的方法,去完成业务逻辑即可。 所以,我们还需为 httpGetter 写一个发送请求的方法,让 httpGetter 实现 PeerGetter 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (g *httpGetter) Get(group string , key string ) ([]byte , error ) { u := fmt.Sprintf("%v%v/%v" , g.baseURL, url.QueryEscape(group), url.QueryEscape(key)) res, err := http.Get(u) if err != nil { return nil , err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil , fmt.Errorf("server returned %v" , res.Status) } bytes, err := io.ReadAll(res.Body) if err != nil { return nil , fmt.Errorf("reading response body %v" , err) } return bytes, nil }
如何验证功能? 这个 main 函数看着很绕,其实逻辑是这样:
8001 - 8003 端口模拟三个的主机节点。 端口 9999 相当于 8001 节点机器供用户交互的网关,每个节点都有与用户交互的网关,这里演示只启动 8001 节点的 9999 端口用于交互。 main 做的就是初始化 Group;初始化 HTTPPool;将 HTTPPool 注册到 Group 中;启动一个 api 服务监听 9999 端口。 graph LR
A[查询请求] --> B[API-端口 9999 + 分布式节点一-端口 8001]
B --> D[分布式节点二-端口 8002]
B -->|经过查询 key 在节点三| E[分布式节点三-端口 8003]
E --> F[返回缓存值]
具体实现不细描述,跑起来就明白了。
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 . ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ ├── consistent_hash.go │ │ └── consistent_hash_test.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ ├── lru.go │ └── lru_test.go ├── go.mod └── main.go
geecache/geecache.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 package geecacheimport ( "fmt" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache peers PeerPicker }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{bytes}, nil }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }type PeerGetter interface { Get(group string , key string ) ([]byte , error ) }type PeerPicker interface { PickPeer(key string ) (peer PeerGetter, ok bool ) }
geecache/http.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 package geecacheimport ( "fmt" "geecache/geecache/consistenthash" "io" "log" "net/http" "net/url" "strings" "sync" )const ( defaultBasePath = "/_geecache/" defaultReplicas = 3 )type HTTPPool struct { self string basePath string mu sync.Mutex peers *consistenthash.Map httpGetters map [string ]*httpGetter }func NewHTTPPool (self string ) *HTTPPool { return &HTTPPool{ self: self, basePath: defaultBasePath, } }func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, h.basePath) { panic ("Error path: " + r.URL.Path) } h.Log("%s %s" , r.Method, r.URL.Path) parts := strings.SplitN(r.URL.Path[len (defaultBasePath):], "/" , 2 ) if len (parts) != 2 { http.Error(w, "bad request" , http.StatusBadRequest) return } groupName := parts[0 ] key := parts[1 ] group := GetGroup(groupName) if group == nil { http.Error(w, "no such group: " +groupName, http.StatusNotFound) return } view, err := group.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/json" ) w.Write(view.ByteSlice()) }func (h *HTTPPool) Log(format string , v ...interface {}) { log.Printf("[Server %s] %s" , h.self, fmt.Sprintf(format, v...)) }func (h *HTTPPool) Set(peers ...string ) { h.mu.Lock() defer h.mu.Unlock() h.peers = consistenthash.New(defaultReplicas, nil ) h.peers.Add(peers...) h.httpGetters = make (map [string ]*httpGetter, len (peers)) for _, peer := range peers { h.httpGetters[peer] = &httpGetter{ baseURL: peer + h.basePath, } } }func (h *HTTPPool) PickPeer(key string ) (PeerGetter, bool ) { h.mu.Lock() defer h.mu.Unlock() if peer := h.peers.Get(key); peer != "" && peer != h.self { h.Log("Pick peer %s" , peer) return h.httpGetters[peer], true } return nil , false }type httpGetter struct { baseURL string }func (g *httpGetter) Get(group string , key string ) ([]byte , error ) { u := fmt.Sprintf("%v%v/%v" , g.baseURL, url.QueryEscape(group), url.QueryEscape(key)) res, err := http.Get(u) if err != nil { return nil , err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil , fmt.Errorf("server returned %v" , res.Status) } bytes, err := io.ReadAll(res.Body) if err != nil { return nil , fmt.Errorf("reading response body %v" , err) } return bytes, nil }var _ PeerGetter = (*httpGetter)(nil )
main.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package mainimport ( "flag" "fmt" "geecache/geecache" "log" "net/http" )var db = map [string ]string { "Tom" : "630" , "Jack" : "589" , "Sam" : "567" , }func createGroup () *geecache.Group { return geecache.NewGroup("scores" , 2 <<10 , geecache.GetterFunc(func (key string ) ([]byte , error ) { log.Println("[SlowDB] search key" , key) if v, ok := db[key]; ok { return []byte (v), nil } return nil , fmt.Errorf("%s not exist" , key) })) }func startCacheServer (addr string , addrs []string , gee *geecache.Group) { peers := geecache.NewHTTPPool(addr) peers.Set(addrs...) gee.RegisterPeers(peers) log.Println("geecache is running at" , addr) log.Fatal(http.ListenAndServe(addr[7 :], peers)) }func startAPIServer (apiAddr string , gee *geecache.Group) { http.Handle("/api" , http.HandlerFunc( func (w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key" ) view, err := gee.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/octet-stream" ) w.Write(view.ByteSlice()) })) log.Println("fontend server is running at" , apiAddr) log.Fatal(http.ListenAndServe(apiAddr[7 :], nil )) }func main () { var port int var api bool flag.IntVar(&port, "port" , 8001 , "Geecache server port" ) flag.BoolVar(&api, "api" , false , "Start a api server?" ) flag.Parse() apiAddr := "http://localhost:9999" addrMap := map [int ]string { 8001 : "http://localhost:8001" , 8002 : "http://localhost:8002" , 8003 : "http://localhost:8003" , } var addrs []string for _, v := range addrMap { addrs = append (addrs, v) } gee := createGroup() if api { go startAPIServer(apiAddr, gee) } startCacheServer(addrMap[port], []string (addrs), gee) }
6.防止缓存击穿 缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。 缓存击穿 :一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时DB请求量大、压力骤增。缓存穿透 :查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB,如果瞬间流量过大,穿透到 DB,导致宕机。延伸阅读:golang 中 singleflight 包的实现:singleflight.go
如何防止缓存击穿? 这一章节可写的不多,主要是一个思想:
对于短时间内的大量并发的同种请求,只响应第一个。 其他的请求等待第一个请求完毕,直接使用第一个请求的结果。 值得学习的地方:
singleflight 的实现,使用代理模式,load 中查询远程节点的逻辑,全部交给了 singleflight.Do
这个方法进行封装,有点类似中间件的工作模式。 仅需少量代码就可以完成对于每个请求的控制,简单优雅。 什么时候应该加锁? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (g *Group) Do(key string , fn func () (interface {}, error )) (interface {}, error ) { if g.m == nil { g.mu.Lock() if g.m == nil { g.m = make (map [string ]*call) } g.mu.Unlock() } g.mu.RLock() if c, ok := g.m[key]; ok { g.mu.RUnlock() c.wg.Wait() return c.val, c.err } g.mu.RUnlock() c := new (call) c.wg.Add(1 ) g.mu.Lock() g.m[key] = c g.mu.Unlock() c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete (g.m, key) g.mu.Unlock() return c.val, c.err }
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 . ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ ├── consistent_hash.go │ │ └── consistent_hash_test.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ │ ├── lru.go │ │ └── lru_test.go │ └── singleflight │ └── singleflight.go ├── go.mod └── main.go
geecache/singleflight/singleflight.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package singleflightimport "sync" type call struct { wg sync.WaitGroup val interface {} err error }type Group struct { mu sync.RWMutex m map [string ]*call }func (g *Group) Do(key string , fn func () (interface {}, error )) (interface {}, error ) { if g.m == nil { g.mu.Lock() if g.m == nil { g.m = make (map [string ]*call) } g.mu.Unlock() } g.mu.RLock() if c, ok := g.m[key]; ok { g.mu.RUnlock() c.wg.Wait() return c.val, c.err } g.mu.RUnlock() c := new (call) c.wg.Add(1 ) g.mu.Lock() g.m[key] = c g.mu.Unlock() c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete (g.m, key) g.mu.Unlock() return c.val, c.err }
geecache/geecache.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 package geecacheimport ( "fmt" "geecache/geecache/singleflight" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache peers PeerPicker loader *singleflight.Group }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, loader: &singleflight.Group{}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { val, err := g.loader.Do(key, func () (interface {}, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }) if err == nil { return val.(ByteView), nil } return ByteView{}, err }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{bytes}, nil }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }type PeerGetter interface { Get(group string , key string ) ([]byte , error ) }type PeerPicker interface { PickPeer(key string ) (peer PeerGetter, ok bool ) }
7.使用 Protobuf 编码 使用 protobuf 编码需要做哪些更改? (1)原来逻辑
客户端:
拿到 group、key 构造 url 使用 http.Get
发送请求 服务端:
处理请求,将结果 view 返回 客户端:
接收服务端结果,直接使用 (2)使用 protobuf 编码后的逻辑
客户端:
拿到 group、key 构造 url 使用 http.Get
发送请求 服务端:
处理请求,将结果 view 使用 protobuf 编码(变更) 客户端:
接收服务端结果,使用 protobuf 解码。(变更) 如何实现 创建 proto 文件,定义 Request 和 Response。 更改 PeerGetter.Get
参数,改为 Request 和 Response。 在处理请求和接收数据时,使用 protobuf 进行编解码。 完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 . ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ ├── consistent_hash.go │ │ └── consistent_hash_test.go │ ├── geecache.go │ ├── geecachepb │ │ ├── geecachepb.pb.go │ │ └── geecachepb.proto │ ├── geecache_test.go │ ├── geers.go │ ├── http.go │ └── lru │ │ ├── lru.go │ │ └── lru_test.go │ └── singleflight │ └── singleflight.go ├── go.mod └── main.go
geecache/geecachepb/geecachepb.proto
1 2 3 4 5 6 7 8 9 10 11 12 syntax = "proto3" ;option go_package = ".;geecachepb" ;message Request { string group = 1 ; string key = 2 ; }message Response { bytes value = 1 ; }
geecache/geecache.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 package geecacheimport ( "fmt" "geecache/geecache/geecachepb" "geecache/geecache/singleflight" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache peers PeerPicker loader *singleflight.Group }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, loader: &singleflight.Group{}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { val, err := g.loader.Do(key, func () (interface {}, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }) if err == nil { return val.(ByteView), nil } return ByteView{}, err }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { req := &geecachepb.Request{ Group: g.name, Key: key, } res := &geecachepb.Response{} err := peer.Get(req, res) if err != nil { return ByteView{}, err } return ByteView{res.Value}, nil }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }
geecache/http.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 package geecacheimport ( "fmt" "geecache/geecache/consistenthash" "geecache/geecache/geecachepb" "io" "log" "net/http" "net/url" "strings" "sync" "google.golang.org/protobuf/proto" )const ( defaultBasePath = "/_geecache/" defaultReplicas = 3 )type HTTPPool struct { self string basePath string mu sync.Mutex peers *consistenthash.Map httpGetters map [string ]*httpGetter }func NewHTTPPool (self string ) *HTTPPool { return &HTTPPool{ self: self, basePath: defaultBasePath, } }func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, h.basePath) { panic ("Error path: " + r.URL.Path) } h.Log("%s %s" , r.Method, r.URL.Path) parts := strings.SplitN(r.URL.Path[len (defaultBasePath):], "/" , 2 ) if len (parts) != 2 { http.Error(w, "bad request" , http.StatusBadRequest) return } groupName := parts[0 ] key := parts[1 ] group := GetGroup(groupName) if group == nil { http.Error(w, "no such group: " +groupName, http.StatusNotFound) return } view, err := group.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } res := &geecachepb.Response{ Value: view.ByteSlice(), } body, err := proto.Marshal(res) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/json" ) w.Write(body) }func (h *HTTPPool) Log(format string , v ...interface {}) { log.Printf("[Server %s] %s" , h.self, fmt.Sprintf(format, v...)) }func (h *HTTPPool) Set(peers ...string ) { h.mu.Lock() defer h.mu.Unlock() h.peers = consistenthash.New(defaultReplicas, nil ) h.peers.Add(peers...) h.httpGetters = make (map [string ]*httpGetter, len (peers)) for _, peer := range peers { h.httpGetters[peer] = &httpGetter{ baseURL: peer + h.basePath, } } }func (h *HTTPPool) PickPeer(key string ) (PeerGetter, bool ) { h.mu.Lock() defer h.mu.Unlock() if peer := h.peers.Get(key); peer != "" && peer != h.self { h.Log("Pick peer %s" , peer) return h.httpGetters[peer], true } return nil , false }type httpGetter struct { baseURL string }func (g *httpGetter) Get(in *geecachepb.Request, out *geecachepb.Response) error { group := in.Group key := in.Key u := fmt.Sprintf("%v%v/%v" , g.baseURL, url.QueryEscape(group), url.QueryEscape(key)) res, err := http.Get(u) if err != nil { return err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return fmt.Errorf("server returned %v" , res.Status) } bytes, err := io.ReadAll(res.Body) if err != nil { return fmt.Errorf("reading response body %v" , err) } err = proto.Unmarshal(bytes, out) if err != nil { return err } return nil }var _ PeerGetter = (*httpGetter)(nil )