本文逐步实现了一个分布式缓存系统,涵盖LRU缓存淘汰策略、单机并发控制、HTTP服务接口及一致性哈希算法。通过封装缓存值、互斥锁机制和虚拟节点映射,解决了并发安全、节点负载均衡等核心问题,构建了支持分布式部署的缓存框架。
通过回答关键问题的方式,记录阅读 geeCache 代码过程中的思考,并做出补充和改进,重点在“怎么做”的基础上,补充“为什么这么做”等逻辑。
0.序言 为什么需要 cache 优化性能:将耗时操作的结果储存起来,下次需要请求,直接使用,避免重复耗时操作。
降低延迟:静态资源缓存(CDN/浏览器缓存),提高访问速度。
转移负载:将高频访问的数据,从数据库迁移到内存中,降低后端压力。
高并发支撑:通过内存级的响应能力支持大规模的并发请求,避免数据库被击穿。
cache 要解决哪些问题 1.缓存淘汰策略 介绍三种常见的缓存淘汰算法 缓存储存在内存中,内存是有限的,对于没用的数据就需要移出。三种常见的缓存淘汰算法。
FIFO(First In First Out)
LFU(Least Frequently Used)
LRU(Least Recently Used)
基于时间局部性原理,认为最近被访问过的,更有可能被再次访问。那么最近没使用过的数据,就是要淘汰的数据。
实现:维护一个队列,某个数据被访问,就将数据移至队尾,这样队首则是最近最少被访问的数据。
优点:FIFO 考虑时间因素,LFU 考虑访问频率,LRU 既考虑时间,也考虑频次(多次访问,意味着多次被移到队尾)。
如何设计 cache cache 数据结构设计思路
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 type Cache struct { maxBytes int64 nBytes int64 ll *list.List cache map [string ]*list.Element OnEvicted func (key string , value Value) }type entry struct { key string value Value }type Value interface { Len() int }func New (maxBytes int64 , onEvicted func (key string , value Value) ) *Cache { return &Cache{ maxBytes: maxBytes, ll: list.New(), cache: make (map [string ]*list.Element), OnEvicted: onEvicted, } }
为什么 value 不直接放在链表里,要放在 entry 结构体里 为什么要新建一个 Value 接口类型,并包含 Len 方法 缓存的数据可以什么格式都有,用接口变量储存最好,即 interface{}。
因为整个 cache 是有大小的,所有有需要查询所有缓存数据的大小。所以在 interface{} 中增加一个 Len方法,只有实现了这个方法的变量,才能作为接口类型 Value 的变量储存在 cache 中。
下面举个例子,如歌 entry 的 value 变量需要储存 String 变量,String 需要主动实现 Len 方法,也就是实现 Value 接口。
1 2 3 4 5 type String string func (s String) Len() int { return len (s) }
如何实现 cache 增删改查 增加
检查是否存在,存在即更新,不存在则增加缓存
维护缓存大小,超过最大值需要淘汰缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (c *Cache) Add(key string , value Value) { if _, ok := c.cache[key]; ok { c.Update(key, value) } else { ele := c.ll.PushFront(&entry{key, value}) c.cache[key] = ele c.nBytes += int64 (len (key)) + int64 (value.Len()) for c.maxBytes != 0 && c.maxBytes < c.nBytes { c.RemoveOldest() } } }
删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func (c *Cache) RemoveOldest() { ele := c.ll.Back() if ele != nil { c.ll.Remove(ele) kv := ele.Value.(*entry) delete (c.cache, kv.key) c.nBytes -= int64 (len (kv.key)) + int64 (kv.value.Len()) if c.OnEvicted != nil { c.OnEvicted(kv.key, kv.value) } } }
修改
1 2 3 4 5 6 7 8 9 10 11 12 13 func (c *Cache) Update(key string , value Value) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) c.nBytes += int64 (value.Len()) - int64 (kv.value.Len()) kv.value = value } else { c.Add(key, value) } }
查询
1 2 3 4 5 6 7 8 9 10 11 func (c *Cache) Get(key string ) (value Value, ok bool ) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) return kv.value, true } return }
ele.Value.(*entry) 的写法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package listtype Element struct { next, prev *Element list *List Value any }
完整代码 1 2 3 4 5 version_1_LRU[geecache] ├── go.mod └── lru ├── lru.go └── lru_test.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 package lruimport "container/list" type Cache struct { maxBytes int64 nBytes int64 ll *list.List cache map [string ]*list.Element OnEvicted func (key string , value Value) }type entry struct { key string value Value }type Value interface { Len() int }func New (maxBytes int64 , onEvicted func (key string , value Value) ) *Cache { return &Cache{ maxBytes: maxBytes, ll: list.New(), cache: make (map [string ]*list.Element), OnEvicted: onEvicted, } }func (c *Cache) Add(key string , value Value) { if _, ok := c.cache[key]; ok { c.Update(key, value) } else { ele := c.ll.PushFront(&entry{key, value}) c.cache[key] = ele c.nBytes += int64 (len (key)) + int64 (value.Len()) for c.maxBytes != 0 && c.maxBytes < c.nBytes { c.RemoveOldest() } } }func (c *Cache) RemoveOldest() { ele := c.ll.Back() if ele != nil { c.ll.Remove(ele) kv := ele.Value.(*entry) delete (c.cache, kv.key) c.nBytes -= int64 (len (kv.key)) + int64 (kv.value.Len()) if c.OnEvicted != nil { c.OnEvicted(kv.key, kv.value) } } }func (c *Cache) Update(key string , value Value) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) c.nBytes += int64 (value.Len()) - int64 (kv.value.Len()) kv.value = value } else { c.Add(key, value) } }func (c *Cache) Get(key string ) (value Value, ok bool ) { if ele, ok := c.cache[key]; ok { c.ll.MoveToFront(ele) kv := ele.Value.(*entry) return kv.value, true } return }func (c *Cache) Len() int { return c.ll.Len() }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 package lruimport ( "fmt" "testing" )type String string func (s String) Len() int { return len (s) }func TestFunction (t *testing.T) { lru := New(int64 (10 ), nil ) lru.Add("k1" , String("1234" )) fmt.Println(lru.Get("k1" )) lru.RemoveOldest() fmt.Println(lru.Get("k1" )) lru.Update("k2" , String("123" )) fmt.Println(lru.Get("k2" )) }func TestAutoRemoveOldest (t *testing.T) { testData := []struct { key string value Value }{ {"k1" , String("1234567890" )}, {"k2" , String("234567890" )}, {"k3" , String("34567890" )}, } lru := New(int64 (10 ), nil ) for _, test := range testData { lru.Add(test.key, test.value) } for _, test := range testData { fmt.Println(lru.Get(test.key)) } }func TestCallbackOnEvicted (t *testing.T) { keys := make ([]string , 0 ) lru := New(10 , func (key string , value Value) { keys = append (keys, key) }) lru.Add("k1" , String("k1" )) lru.Add("k2" , String("k2" )) lru.Add("k3" , String("k3" )) lru.Add("k4" , String("k4" )) fmt.Println(keys) }
2.单机并发缓存 如何设计并发安全的 cache 主要做两件事:加锁 + 保证数据不可变性
对互斥操作进行加锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 type cache struct { mu sync.Mutex lru *lru.Cache cacheBytes int64 }func (c *cache) add(key string , value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = lru.New(c.cacheBytes, nil ) } c.lru.Add(key, value) }func (c *cache) get(key string ) (value ByteView, ok bool ) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { return } if v, ok := c.lru.Get(key); ok { return v.(ByteView), ok } return }
保证数据不可变性 (只读)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package geecachetype ByteView struct { b []byte }func (byteView ByteView) Len() int { return len (byteView.b) }func (byteView ByteView) ByteSlice() []byte { return cloneBytes(byteView.b) }func cloneBytes (b []byte ) []byte { c := make ([]byte , len (b)) copy (c, b) return c }func (byteView ByteView) String() string { return string (byteView.b) }
为什么要抽象 ByteView 数据类型来表示缓存值?
在 lru 中定义的 entry.value 是 Value 接口类型,所有传入的数据的类型均需要实现这个接口,也就是实现 Len 函数,比较麻烦。[]byte 可以支持任何数据类型的存储,并为 []byte 抽象为 ByteView 类型,并实现 lru.Value 接口。
保证 ByteView 是只读类型。如何保证?
ByteView 的变量只有一个 b,b 是小写,外部无法直接读取。只能通过 ByteSlice 和 String 方法获取 b 的数据ByteSlice 返回 slice 时,会拷贝一个副本再返回String 将 b 的数据强制转换为 String。(外界也无法直接修改 b 的值) 为什么 get 操作也要加锁 想象两个场景:
cache 在 add 函数中,初始化 lru.Cache 的过程是懒加载,lru.Cache 初始化过程中,还为初始化完,如果这时候访问 Get,会导致空指针的情况。
lru.cache 在写数据时,需要移动节点到首部,如果这时候去访问,会同时移动链表节点到首部,会造成链表断裂,节点丢失。
也就是说,需要保证 lru.cache 初始化的原子性,以及 lru.cache 内部操作的原子性。
为什么要延迟初始化(Lazy Initialization) 1 2 3 4 5 6 7 8 9 10 11 12 func (c *cache) add(key string , value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = lru.New(c.cacheBytes, nil ) } c.lru.Add(key, value) }
为什么要设计 Group 在使用 cache 对 lru.Cache 进行封装后,又使用 Group 对 cache 进行了封装。这是为什么呢?
其实站在使用者的角度,只有一个并发安全的 cache,会存在很多问题没法解决:
为此,我们使用分层设计的思想,再封装一层,cache 负责并发存储,Group 负责业务逻辑。
有哪些业务逻辑?
如何设计 Group 1 2 3 4 5 6 7 8 9 10 11 12 13 type Group struct { name string getter Getter mainCache cache }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )
通过 groups 进行全局管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }
统一管理缓存回源逻辑
首先是回源接口,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }
有了回源接口,需要实现 group.Get 的逻辑,参考这个流程图。
graph LR
A[接收 key] --> B[是否被缓存?]
B -->|是| C[返回缓存值-步骤1]
B -->|否| D[是否应从远程节点获取?]
D -->|是| E[与远程节点交互]
E --> F[返回缓存值-步骤2]
D -->|否| G[调用回调函数]
G --> H[获取值并添加到缓存]
H --> I[返回缓存值-步骤3]
Get 完成步骤1
load + getLocally 完成步骤3
步骤二在第5章节《分布式节点》中实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { return g.getLocally(key) }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }
完整代码 1 2 3 4 5 6 7 8 9 10 version_2_singal_node[geecache] ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── geecache.go │ ├── geecache_test.go │ └── lru │ ├── lru.go │ └── lru_test.go └── go.mod
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package geecachetype ByteView struct { b []byte }func (byteView ByteView) Len() int { return len (byteView.b) }func (byteView ByteView) ByteSlice() []byte { return cloneBytes(byteView.b) }func cloneBytes (b []byte ) []byte { c := make ([]byte , len (b)) copy (c, b) return c }func (byteView ByteView) String() string { return string (byteView.b) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 package geecacheimport ( "sync" "geecache/geecache/lru" )type cache struct { mu sync.Mutex lru *lru.Cache cacheBytes int64 }func (c *cache) add(key string , value ByteView) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { c.lru = lru.New(c.cacheBytes, nil ) } c.lru.Add(key, value) }func (c *cache) get(key string ) (value ByteView, ok bool ) { c.mu.Lock() defer c.mu.Unlock() if c.lru == nil { return } if v, ok := c.lru.Get(key); ok { return v.(ByteView), ok } return }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 package geecacheimport ( "fmt" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { return g.getLocally(key) }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package geecacheimport ( "fmt" "log" "testing" )func TestGetter (t *testing.T) { var g Getter = GetterFunc(func (key string ) ([]byte , error ) { return []byte (key), nil }) bytes, _ := g.Get("key" ) fmt.Printf("%c" , bytes) }var db = map [string ]string { "Tom" : "630" , "Jack" : "589" , "Sam" : "567" , }func TestGet (t *testing.T) { loadCounts := make (map [string ]int , len (db)) gee := NewGroup("scores" , 1024 , GetterFunc( func (key string ) ([]byte , error ) { log.Println("[SlowDB] search key" , key) if v, ok := db[key]; ok { if _, ok := loadCounts[key]; !ok { loadCounts[key] = 0 } loadCounts[key] += 1 return []byte (v), nil } return nil , fmt.Errorf("%s not exist" , key) })) for k, v := range db { view, err := gee.Get(k) if err != nil { panic (err) } fmt.Println(k, v, view.String()) } view, err := gee.Get("unknown" ) if err != nil { fmt.Println(err) } fmt.Println(view.String()) }
3.HTTP 服务器 这部分比较简单,主要是启动一个 http 服务,处理 URL,获取到 group name 和 key,返回缓存查询到的结果。
完整代码 1 2 3 4 5 6 7 8 9 10 11 version_3_http_server[geecache] ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ ├── lru.go │ └── lru_test.go └── go.mod
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 package geecacheimport ( "fmt" "log" "net/http" "strings" )const BASEPATH = "/_geecache/" type HTTPPool struct { self string basePath string }func NewHTTPPool (self string ) *HTTPPool { return &HTTPPool{ self: self, basePath: BASEPATH, } }func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, h.basePath) { panic ("Error path: " + r.URL.Path) } h.Log("%s %s" , r.Method, r.URL.Path) parts := strings.SplitN(r.URL.Path[len (BASEPATH):], "/" , 2 ) if len (parts) != 2 { http.Error(w, "bad request" , http.StatusBadRequest) return } groupName := parts[0 ] key := parts[1 ] group := GetGroup(groupName) if group == nil { http.Error(w, "no such group: " +groupName, http.StatusNotFound) return } view, err := group.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/json" ) w.Write(view.ByteSlice()) }func (h *HTTPPool) Log(format string , v ...interface {}) { log.Printf("[Server %s] %s" , h.self, fmt.Sprintf(format, v...)) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package geecacheimport ( "fmt" "log" "net/http" "testing" )var db = map [string ]string { "Tom" : "630" , "Jack" : "589" , "Sam" : "567" , }func TestHTTP (t *testing.T) { NewGroup("scores" , 1024 , GetterFunc( func (key string ) ([]byte , error ) { log.Println("[SlowDB] search key" , key) if v, ok := db[key]; ok { return []byte (v), nil } return nil , fmt.Errorf("%s not exist" , key) })) addr := "localhost:9999" peers := NewHTTPPool(addr) log.Println("cache is running at" , addr) log.Fatal(http.ListenAndServe(addr, peers)) }
4.一致性哈希 该访问哪个缓存节点 缓存节点数量调整怎么办 一致性哈希是什么
数据倾斜问题怎么处理 如何实现一致性哈希 主要有三件事:构建哈希环、节点数量变化的处理、查找 key 所在节点。
(1)构建哈希环
哈希环就是map,
支持自定义哈希函数,在初始化时,使用者传入,不传入则默认使用 crc32。
设置虚拟节点,虚拟节点个数是真实节点的 replicas 倍。
将所有虚拟节点有序排列,储存在数组 keys 里,这就是哈希环本体。
虚拟节点与真实节点的映射保存在 hashMap 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type Hash func (data []byte ) uint32 type Map struct { hash Hash replicas int keys []int hashMap map [int ]string }func New (replicas int , fn Hash) *Map { m := &Map{ hash: fn, replicas: replicas, hashMap: make (map [int ]string ), } if m.hash == nil { m.hash = crc32.ChecksumIEEE } return m }
(2)节点数量变化的处理
暂时不考虑已有缓存迁移的问题,仅考虑节点数量变化后,能否找到对应的真实节点。
加入节点:
1 2 3 4 5 6 7 8 9 10 11 12 func (m *Map) Add(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) m.keys = append (m.keys, hash) m.hashMap[hash] = key } } sort.Ints(m.keys) }
删除节点:
删除真实节点与虚拟节点的映射
重新构建哈希环(仅保留未被删除的哈希值)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 func (m *Map) Del(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) delete (m.hashMap, hash) } } var newKeys []int for hash := range m.hashMap { newKeys = append (newKeys, hash) } sort.Ints(newKeys) m.keys = newKeys }
(3)查找 key 所在节点
最初的定义是,每个 key 归属于顺时针找到的第一个节点。
给定一个 hash,他归属的节点就是第一个大于等于 hash 的节点。
通过二分查找来寻找第一个大于等于 hash 的节点,注意需手动传入判断函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 func (m *Map) Get(key string ) string { if len (m.keys) == 0 { return "" } hash := int (m.hash([]byte (key))) index := sort.Search(len (m.keys), func (i int ) bool { return m.keys[i] >= hash }) return m.hashMap[m.keys[index%len (m.keys)]] }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func Search (n int , f func (int ) bool ) int { i, j := 0 , n for i < j { h := int (uint (i+j) >> 1 ) if !f(h) { i = h + 1 } else { j = h } } return i }
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 version_4_consistent_hashing[geecache] ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ └── consistent_hash.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ ├── lru.go │ └── lru_test.go └── go.mod
▶
consistenthash/consistent_hash.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 package consistenthashimport ( "hash/crc32" "sort" "strconv" )type Hash func (data []byte ) uint32 type Map struct { hash Hash replicas int keys []int hashMap map [int ]string }func New (replicas int , fn Hash) *Map { m := &Map{ hash: fn, replicas: replicas, hashMap: make (map [int ]string ), } if m.hash == nil { m.hash = crc32.ChecksumIEEE } return m }func (m *Map) Add(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) m.keys = append (m.keys, hash) m.hashMap[hash] = key } } sort.Ints(m.keys) }func (m *Map) Del(keys ...string ) { for _, key := range keys { for i := 0 ; i < m.replicas; i++ { hash := int (m.hash([]byte (strconv.Itoa(i) + key))) delete (m.hashMap, hash) } } var newKeys []int for hash := range m.hashMap { newKeys = append (newKeys, hash) } sort.Ints(newKeys) m.keys = newKeys }func (m *Map) Get(key string ) string { if len (m.keys) == 0 { return "" } hash := int (m.hash([]byte (key))) index := sort.Search(len (m.keys), func (i int ) bool { return m.keys[i] >= hash }) return m.hashMap[m.keys[index%len (m.keys)]] }
5.分布式节点 这一章节作者原文站在架构的角度自上而下描述的,看的时候未免有些疑惑,比如为什么上来要抽象两个接口。下面我会按照实用主义角度来回答,“基于什么需求,所以这么做”,以作为补充。
分布式缓存要做哪些工作 整个分布式缓存,要做的工作就是实现下面这个图。
graph LR
A[接收 key] --> B[是否被缓存?]
B -->|是| C[返回缓存值-步骤1]
B -->|否| D[是否应从远程节点获取?]
D -->|是| E[与远程节点交互]
E --> F[返回缓存值-步骤2]
D -->|否| G[调用回调函数]
G --> H[获取值并添加到缓存]
H --> I[返回缓存值-步骤3]
在第二章节,我们完成了单个节点的功能:查找本地缓存(步骤1);找不到时进行回源储存(步骤3)。
在第三章节,我们解决了单个节点如何为其他节点提供服务的问题。
在第四章节,我们解决了多个节点的情况下,查找缓存该查哪个节点的问题。
有了以上铺垫,我们该完成上图中的步骤二了:与远程节点交互,查找远程节点的数据。
graph LR
A[接收 key] --> B[是否被缓存?]
B -->|是| C[返回缓存值-步骤1]
B -->|否| E[使用一致性哈希选择节点,是否从远程节点获取?]
E -->|是| G[HTTP 客户端访问远程节点]
G -->|成功| I[返回缓存值-步骤2]
G -->|失败| K[调用回调函数]
K --> L[获取值并添加到缓存]
L --> M[返回缓存值-步骤3]
E -->|否| K
补充一些步骤二的细节,如上图。
站在使用者的角度,当我拿到 Group 之后,要完成步骤二,我需要哪些方法供我调用?
方法一:通过 Key 查找到属于哪个远程节点。
方法二:访问远程节点,获取缓存。
方法三:远程节点也没有,则调用本地的回调函数。
所以 Group 声明了两个接口,PeerPicker 为 Group 提供方法一,PeerGetter 为 Group 提供方法二,方法三在原有 group.load 逻辑中改写。
1 2 3 4 5 6 7 8 9 10 11 type PeerPicker interface { PickPeer(key string ) (peer PeerGetter, ok bool ) }type PeerGetter interface { Get(group string , key string ) ([]byte , error ) }
Group 只需要拿着口变量调用接口的方法,就可以完成整体逻辑。
至于这两个接口是谁来实现,用什么协议实现,Group 并不关心,我就规定了这几个方法,谁实现了我用谁完成。所以可能是 HTTPPool 用 HTTP 协议实现的,也可能是 HTTPPool 用 protobuf 实现的。
就比如你是一个 leader,你有一项工作,你分为part 1、part2,你只需要说出你最终想要什么。你也不管谁来做,具体怎么做。比如张三李四说他们有能力做,那你要做的就是合并张三李四成果,向上汇报就行。
这样做有什么好处? 领导轻松 。
如何实现 Group 访问远程节点的逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type Group struct { name string getter Getter mainCache cache peers PeerPicker }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 func (g *Group) load(key string ) (ByteView, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{bytes}, nil }
ok,至此,Group 的工作就做完了,只管调用 PeerPicker.PickPeer 和 PeerGetter.Get 的方法,这两件事谁来干不知道,具体怎么干不知道,十分简洁优雅。
谁来完成具体的工作呢?我们把这个重任交给第三章中出现的 HTTPPool,让 HTTPPool 实现 PeerPicker.PickPeer 接口,用户就可以调用 RegisterPeers,把 HTTPPool 注入 Group 了。
为什么是 HTTPPool 来实现?
HTTPPool 如何实现 PeerPicker 接口 实现 PickPeer 方法就实现了这个接口,也就是实现通过 key 查找远程节点的功能。
如何将一致性哈希加入 HTTPPool 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 type HTTPPool struct { self string basePath string mu sync.Mutex peers *consistenthash.Map httpGetters map [string ]*httpGetter }type httpGetter struct { baseURL string }
HTTPPool 能做的,也就是对 consistenthash.Map 能力进行封装,为自己所用:
HTTPPool.Set 初始化一致性哈希 peers,把所有节点都加到哈希环里,并为每个节点初始化 HTTP 客户端 httpGetter(其实就是保存了 IP/Port 而已)。
HTTPPool.PickPeer 通过 key,在一致性哈希环上,查找 key 所属的节点,返回这个节点的 IP/Port,我们将 IP/Port 保存在客户端 httpGetter,所以直接返回 httpGetter 实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func (h *HTTPPool) Set(peers ...string ) { h.mu.Lock() defer h.mu.Unlock() h.peers = consistenthash.New(defaultReplicas, nil ) h.peers.Add(peers...) h.httpGetters = make (map [string ]*httpGetter, len (peers)) for _, peer := range peers { h.httpGetters[peer] = &httpGetter{ baseURL: peer + h.basePath, } } }func (h *HTTPPool) PickPeer(key string ) (PeerGetter, bool ) { h.mu.Lock() defer h.mu.Unlock() if peer := h.peers.Get(key); peer != "" && peer != h.self { h.Log("Pick peer %s" , peer) return h.httpGetters[peer], true } return nil , false }
为什么要返回客户端 httpGetter 实例,而不是直接的 IP/Port,让 Group 拿到 IP/Port 自己去发 HTTP 请求呢?
和上面,Group 为什么设计 PeerPicker 和 PeerGetter 接口一样。因为我们要做到 HTTPPool 和 Group 解耦,实现与业务分离。
所以,我们还需为 httpGetter 写一个发送请求的方法,让 httpGetter 实现 PeerGetter 接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (g *httpGetter) Get(group string , key string ) ([]byte , error ) { u := fmt.Sprintf("%v%v/%v" , g.baseURL, url.QueryEscape(group), url.QueryEscape(key)) res, err := http.Get(u) if err != nil { return nil , err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil , fmt.Errorf("server returned %v" , res.Status) } bytes, err := io.ReadAll(res.Body) if err != nil { return nil , fmt.Errorf("reading response body %v" , err) } return bytes, nil }
如何验证功能 这个 main 函数看着很绕,其实逻辑是这样:
graph LR
A[查询请求] --> B[API-端口 9999 + 分布式节点一-端口 8001]
B --> D[分布式节点二-端口 8002]
B -->|经过查询 key 在节点三| E[分布式节点三-端口 8003]
E --> F[返回缓存值]
具体实现不细描述,跑起来就明白了。
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 . ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ ├── consistent_hash.go │ │ └── consistent_hash_test.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ ├── lru.go │ └── lru_test.go ├── go.mod └── main.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 package geecacheimport ( "fmt" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache peers PeerPicker }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{bytes}, nil }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }type PeerGetter interface { Get(group string , key string ) ([]byte , error ) }type PeerPicker interface { PickPeer(key string ) (peer PeerGetter, ok bool ) }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 package geecacheimport ( "fmt" "geecache/geecache/consistenthash" "io" "log" "net/http" "net/url" "strings" "sync" )const ( defaultBasePath = "/_geecache/" defaultReplicas = 3 )type HTTPPool struct { self string basePath string mu sync.Mutex peers *consistenthash.Map httpGetters map [string ]*httpGetter }func NewHTTPPool (self string ) *HTTPPool { return &HTTPPool{ self: self, basePath: defaultBasePath, } }func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, h.basePath) { panic ("Error path: " + r.URL.Path) } h.Log("%s %s" , r.Method, r.URL.Path) parts := strings.SplitN(r.URL.Path[len (defaultBasePath):], "/" , 2 ) if len (parts) != 2 { http.Error(w, "bad request" , http.StatusBadRequest) return } groupName := parts[0 ] key := parts[1 ] group := GetGroup(groupName) if group == nil { http.Error(w, "no such group: " +groupName, http.StatusNotFound) return } view, err := group.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/json" ) w.Write(view.ByteSlice()) }func (h *HTTPPool) Log(format string , v ...interface {}) { log.Printf("[Server %s] %s" , h.self, fmt.Sprintf(format, v...)) }func (h *HTTPPool) Set(peers ...string ) { h.mu.Lock() defer h.mu.Unlock() h.peers = consistenthash.New(defaultReplicas, nil ) h.peers.Add(peers...) h.httpGetters = make (map [string ]*httpGetter, len (peers)) for _, peer := range peers { h.httpGetters[peer] = &httpGetter{ baseURL: peer + h.basePath, } } }func (h *HTTPPool) PickPeer(key string ) (PeerGetter, bool ) { h.mu.Lock() defer h.mu.Unlock() if peer := h.peers.Get(key); peer != "" && peer != h.self { h.Log("Pick peer %s" , peer) return h.httpGetters[peer], true } return nil , false }type httpGetter struct { baseURL string }func (g *httpGetter) Get(group string , key string ) ([]byte , error ) { u := fmt.Sprintf("%v%v/%v" , g.baseURL, url.QueryEscape(group), url.QueryEscape(key)) res, err := http.Get(u) if err != nil { return nil , err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return nil , fmt.Errorf("server returned %v" , res.Status) } bytes, err := io.ReadAll(res.Body) if err != nil { return nil , fmt.Errorf("reading response body %v" , err) } return bytes, nil }var _ PeerGetter = (*httpGetter)(nil )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 package mainimport ( "flag" "fmt" "geecache/geecache" "log" "net/http" )var db = map [string ]string { "Tom" : "630" , "Jack" : "589" , "Sam" : "567" , }func createGroup () *geecache.Group { return geecache.NewGroup("scores" , 2 <<10 , geecache.GetterFunc(func (key string ) ([]byte , error ) { log.Println("[SlowDB] search key" , key) if v, ok := db[key]; ok { return []byte (v), nil } return nil , fmt.Errorf("%s not exist" , key) })) }func startCacheServer (addr string , addrs []string , gee *geecache.Group) { peers := geecache.NewHTTPPool(addr) peers.Set(addrs...) gee.RegisterPeers(peers) log.Println("geecache is running at" , addr) log.Fatal(http.ListenAndServe(addr[7 :], peers)) }func startAPIServer (apiAddr string , gee *geecache.Group) { http.Handle("/api" , http.HandlerFunc( func (w http.ResponseWriter, r *http.Request) { key := r.URL.Query().Get("key" ) view, err := gee.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/octet-stream" ) w.Write(view.ByteSlice()) })) log.Println("fontend server is running at" , apiAddr) log.Fatal(http.ListenAndServe(apiAddr[7 :], nil )) }func main () { var port int var api bool flag.IntVar(&port, "port" , 8001 , "Geecache server port" ) flag.BoolVar(&api, "api" , false , "Start a api server?" ) flag.Parse() apiAddr := "http://localhost:9999" addrMap := map [int ]string { 8001 : "http://localhost:8001" , 8002 : "http://localhost:8002" , 8003 : "http://localhost:8003" , } var addrs []string for _, v := range addrMap { addrs = append (addrs, v) } gee := createGroup() if api { go startAPIServer(apiAddr, gee) } startCacheServer(addrMap[port], []string (addrs), gee) }
6.防止缓存击穿 缓存雪崩:缓存在同一时刻全部失效,造成瞬时DB请求量大、压力骤增,引起雪崩。缓存雪崩通常因为缓存服务器宕机、缓存的 key 设置了相同的过期时间等引起。
缓存击穿 :一个存在的key,在缓存过期的一刻,同时有大量的请求,这些请求都会击穿到 DB ,造成瞬时DB请求量大、压力骤增。
缓存穿透 :查询一个不存在的数据,因为不存在则不会写到缓存中,所以每次都会去请求 DB,如果瞬间流量过大,穿透到 DB,导致宕机。
延伸阅读:golang 中 singleflight 包的实现:singleflight.go
如何防止缓存击穿 这一章节可写的不多,主要是一个思想:
值得学习的地方:
什么时候应该加锁 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 func (g *Group) Do(key string , fn func () (interface {}, error )) (interface {}, error ) { if g.m == nil { g.mu.Lock() if g.m == nil { g.m = make (map [string ]*call) } g.mu.Unlock() } g.mu.RLock() if c, ok := g.m[key]; ok { g.mu.RUnlock() c.wg.Wait() return c.val, c.err } g.mu.RUnlock() c := new (call) c.wg.Add(1 ) g.mu.Lock() g.m[key] = c g.mu.Unlock() c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete (g.m, key) g.mu.Unlock() return c.val, c.err }
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 . ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ ├── consistent_hash.go │ │ └── consistent_hash_test.go │ ├── geecache.go │ ├── geecache_test.go │ ├── http.go │ └── lru │ │ ├── lru.go │ │ └── lru_test.go │ └── singleflight │ └── singleflight.go ├── go.mod └── main.go
▶
geecache/singleflight/singleflight.go
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package singleflightimport "sync" type call struct { wg sync.WaitGroup val interface {} err error }type Group struct { mu sync.RWMutex m map [string ]*call }func (g *Group) Do(key string , fn func () (interface {}, error )) (interface {}, error ) { if g.m == nil { g.mu.Lock() if g.m == nil { g.m = make (map [string ]*call) } g.mu.Unlock() } g.mu.RLock() if c, ok := g.m[key]; ok { g.mu.RUnlock() c.wg.Wait() return c.val, c.err } g.mu.RUnlock() c := new (call) c.wg.Add(1 ) g.mu.Lock() g.m[key] = c g.mu.Unlock() c.val, c.err = fn() c.wg.Done() g.mu.Lock() delete (g.m, key) g.mu.Unlock() return c.val, c.err }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 package geecacheimport ( "fmt" "geecache/geecache/singleflight" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache peers PeerPicker loader *singleflight.Group }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, loader: &singleflight.Group{}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { val, err := g.loader.Do(key, func () (interface {}, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }) if err == nil { return val.(ByteView), nil } return ByteView{}, err }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { bytes, err := peer.Get(g.name, key) if err != nil { return ByteView{}, err } return ByteView{bytes}, nil }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }type PeerGetter interface { Get(group string , key string ) ([]byte , error ) }type PeerPicker interface { PickPeer(key string ) (peer PeerGetter, ok bool ) }
7.使用 Protobuf 编码 使用 protobuf 编码需要做哪些更改 (1)原来逻辑
客户端:
拿到 group、key 构造 url
使用 http.Get 发送请求
服务端:
处理请求,将结果 view 返回
客户端:
接收服务端结果,直接使用
(2)使用 protobuf 编码后的逻辑
客户端:
拿到 group、key 构造 url
使用 http.Get 发送请求
服务端:
处理请求,将结果 view 使用 protobuf 编码(变更)
客户端:
接收服务端结果,使用 protobuf 解码。(变更)
如何实现 创建 proto 文件,定义 Request 和 Response。
更改 PeerGetter.Get 参数,改为 Request 和 Response。
在处理请求和接收数据时,使用 protobuf 进行编解码。
完整代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 . ├── geecache │ ├── byteview.go │ ├── cache.go │ ├── consistenthash │ │ ├── consistent_hash.go │ │ └── consistent_hash_test.go │ ├── geecache.go │ ├── geecachepb │ │ ├── geecachepb.pb.go │ │ └── geecachepb.proto │ ├── geecache_test.go │ ├── geers.go │ ├── http.go │ └── lru │ │ ├── lru.go │ │ └── lru_test.go │ └── singleflight │ └── singleflight.go ├── go.mod └── main.go
▶
geecache/geecachepb/geecachepb.proto
1 2 3 4 5 6 7 8 9 10 11 12 syntax = "proto3" ;option go_package = ".;geecachepb" ;message Request { string group = 1 ; string key = 2 ; }message Response { bytes value = 1 ; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 package geecacheimport ( "fmt" "geecache/geecache/geecachepb" "geecache/geecache/singleflight" "log" "sync" )type Getter interface { Get(key string ) ([]byte , error ) }type GetterFunc func (key string ) ([]byte , error )func (f GetterFunc) Get(key string ) ([]byte , error ) { return f(key) }type Group struct { name string getter Getter mainCache cache peers PeerPicker loader *singleflight.Group }var ( groupMu sync.RWMutex groups = make (map [string ]*Group) )func NewGroup (name string , cacheBytes int64 , getter Getter) *Group { if getter == nil { panic ("nil Getter" ) } groupMu.Lock() defer groupMu.Unlock() g := &Group{ name: name, getter: getter, mainCache: cache{cacheBytes: cacheBytes}, loader: &singleflight.Group{}, } groups[name] = g return g }func GetGroup (name string ) *Group { groupMu.RLock() g := groups[name] groupMu.RUnlock() return g }func (g *Group) Get(key string ) (ByteView, error ) { if key == "" { return ByteView{}, fmt.Errorf("key is required" ) } if v, ok := g.mainCache.get(key); ok { log.Println("[GeeCache] hit" ) return v, nil } return g.load(key) }func (g *Group) load(key string ) (ByteView, error ) { val, err := g.loader.Do(key, func () (interface {}, error ) { if g.peers != nil { if peer, ok := g.peers.PickPeer(key); ok { value, err := g.getFromPeer(peer, key) if err == nil { return value, nil } log.Println("[GeeCache] failed to get from peer" , err) } } return g.getLocally(key) }) if err == nil { return val.(ByteView), nil } return ByteView{}, err }func (g *Group) getFromPeer(peer PeerGetter, key string ) (ByteView, error ) { req := &geecachepb.Request{ Group: g.name, Key: key, } res := &geecachepb.Response{} err := peer.Get(req, res) if err != nil { return ByteView{}, err } return ByteView{res.Value}, nil }func (g *Group) getLocally(key string ) (ByteView, error ) { bytes, err := g.getter.Get(key) if err != nil { return ByteView{}, err } value := ByteView{cloneBytes(bytes)} g.populateCache(key, value) return value, nil }func (g *Group) populateCache(key string , value ByteView) { g.mainCache.add(key, value) }func (g *Group) RegisterPeers(peers PeerPicker) { if g.peers != nil { panic ("RegisterPeerPicker called more than once" ) } g.peers = peers }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 package geecacheimport ( "fmt" "geecache/geecache/consistenthash" "geecache/geecache/geecachepb" "io" "log" "net/http" "net/url" "strings" "sync" "google.golang.org/protobuf/proto" )const ( defaultBasePath = "/_geecache/" defaultReplicas = 3 )type HTTPPool struct { self string basePath string mu sync.Mutex peers *consistenthash.Map httpGetters map [string ]*httpGetter }func NewHTTPPool (self string ) *HTTPPool { return &HTTPPool{ self: self, basePath: defaultBasePath, } }func (h *HTTPPool) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !strings.HasPrefix(r.URL.Path, h.basePath) { panic ("Error path: " + r.URL.Path) } h.Log("%s %s" , r.Method, r.URL.Path) parts := strings.SplitN(r.URL.Path[len (defaultBasePath):], "/" , 2 ) if len (parts) != 2 { http.Error(w, "bad request" , http.StatusBadRequest) return } groupName := parts[0 ] key := parts[1 ] group := GetGroup(groupName) if group == nil { http.Error(w, "no such group: " +groupName, http.StatusNotFound) return } view, err := group.Get(key) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } res := &geecachepb.Response{ Value: view.ByteSlice(), } body, err := proto.Marshal(res) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type" , "application/json" ) w.Write(body) }func (h *HTTPPool) Log(format string , v ...interface {}) { log.Printf("[Server %s] %s" , h.self, fmt.Sprintf(format, v...)) }func (h *HTTPPool) Set(peers ...string ) { h.mu.Lock() defer h.mu.Unlock() h.peers = consistenthash.New(defaultReplicas, nil ) h.peers.Add(peers...) h.httpGetters = make (map [string ]*httpGetter, len (peers)) for _, peer := range peers { h.httpGetters[peer] = &httpGetter{ baseURL: peer + h.basePath, } } }func (h *HTTPPool) PickPeer(key string ) (PeerGetter, bool ) { h.mu.Lock() defer h.mu.Unlock() if peer := h.peers.Get(key); peer != "" && peer != h.self { h.Log("Pick peer %s" , peer) return h.httpGetters[peer], true } return nil , false }type httpGetter struct { baseURL string }func (g *httpGetter) Get(in *geecachepb.Request, out *geecachepb.Response) error { group := in.Group key := in.Key u := fmt.Sprintf("%v%v/%v" , g.baseURL, url.QueryEscape(group), url.QueryEscape(key)) res, err := http.Get(u) if err != nil { return err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return fmt.Errorf("server returned %v" , res.Status) } bytes, err := io.ReadAll(res.Body) if err != nil { return fmt.Errorf("reading response body %v" , err) } err = proto.Unmarshal(bytes, out) if err != nil { return err } return nil }var _ PeerGetter = (*httpGetter)(nil )