compact 源码解析

当你通过 API 发起一个 compact 请求后:

  1. KV Server 收到 Compact 请求提交到 raft 模块处理

  2. 在 raft 模块中提交后,apply 模块就会通过 MVCC 模块的 Compact 接口执行 此压缩任务

Compact 接口首先会更新当前 server 已压缩的版本号,并将耗时昂贵的压缩任务保存到 FIFO 队列中异步执行:

  1. 压缩任务执行时,它首先会压缩 treeIndex 模块中的 keyIndex 索引

  2. 其次会遍历 boltdb 中的 key ,删除已废弃的 key

memoryStorage 接收并执行 compact

raft 模块对于 memoryStorage 定义:

注意 ents 这个变量,它是一个 Entry 结构体类型的变量(ents[i] has raft log position i+snapshot.Metadata.Index),其中字段含义如下:

  • Term

  • Index

  • Type

  • Data

// MemoryStorage implements the Storage interface backed by an
// in-memory array.

type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex

	hardState pb.HardState
	snapshot  pb.Snapshot
	
	// ents[i] has raft log position i+snapshot.Metadata.Index
	ents []pb.Entry

memoryStorage Compact 实现逻辑:

  1. 计算当前 * 中的索引位置 offset := ms.ents[0].Index

  2. 判断压缩的版本是否小于等于这个位置

    1. 是:报错并返回

    2. 否:继续

  3. 判断要压缩的版本,是否高于当前最大的版本

    1. 是:报错并返回

    2. 否:继续

  4. 计算本次要压缩版本的偏移量,也就是要压缩的版本在这个数组中的下标 i := compactIndex - offset

  5. 定义一个与原 ents 类型相同的空数组,长度为: 1,容量为:要保留的元素个数 + 1 ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)

  6. 将空数组下标为 0 的元素(即:第一个元素)的 Index 和 Term 赋值为原数组第 i 个元素的 Index 和 Term

  7. 将原 ents 第 i 个元素后的所有内容(不包括第 i 个元素),填充到该数组的后部分(即:除了第一个元素外)ents = append(ents, ms.ents[i+1:]...)

  8. 用新的 ents 整体替换旧的 ,这样即便 compact 操作过程耗时,也不会影响到其他操作ms.ents = ents

KVStore 获得和应用 compact entry 的方式

在 server 启动时,会通过 func (s *EtcdServer) run() 实时的从队列中获去需要执行的信息 case ap := <-s.r.apply()

从 KVStore 中删除已 compact 的 version,就是其中一个分类。

那什么时候往队列里放东西呢?

当 raftNode ready 的时候。这个状态,我们后续再详细分析。

func (s *EtcdServer) apply

compact 属于 normal entry 的一种

func (s *EtcdServer) applyEntryNormal

func (a *applierV3backend) Apply

r 是什么类型?通道吗?

func (a *applierV3backend) Compaction

func (s *store) Compact

  1. 将此次压缩信息加到 store 的 FIFO 的 schedule 中

  2. 调用 store 的 compact 方法

func (s *store) compact

  1. 先操作 treeIndex,删除需要 compact 的 generation 和 revision kvindex.Compact

  2. 以 map 格式返回需要保留的 generation 和 revision 等信息,存放到变量 keep

  3. store 根据此 map 异步 compact

func (ti *treeIndex) Compact

  1. 加锁,确保操作安全

  2. treeIndex 和 keyIndex 关系:

    1. treeIndex顾名思义就是一个树状索引,它通过在内存中维护一个B树,来达到加速查询key的功能。

    2. 这棵树的每一个节点都是 keyIndex

func (ki *keyIndex) compact

  1. 通过 ki.doCompact 计算出 available map

  2. 删除小于压缩版本的 revision

  3. 删除小于压缩版本且标记为“已删除(tombstone)”的 revision

  4. 删除已经不包含 revision 了的 generation

func (ki *keyIndex) doCompact

  1. 到序遍历该 keyIndex 中 generation 的 revision 信息

  2. 找到小于等于要 compact 的最小版本

  3. 将其加入到 available map 中

func (s *store) scheduleCompaction

  1. 预处理 + 后处理 defer + 加锁

  2. 计数器等统计信息

  3. 调用 backend 的 unsafeDelete

func (t *batchTx) UnsafeDelete

调用 bbolt 的方法执行真正的删除操作

技巧

从源码上,我们除了可以看出 etcd 是如何实现 compact 的以外,还可以学到一些小技巧。

比如在给切片做 append 时,一定要格外注意长度(len)和容量(cap)的区别。我们分别创建容量为 5, 和长度为 5 的切片,观察在它们之后做 append 的效果:

输出:

[3 4]

[0 0 0 0 0 3 4]

最后更新于

这有帮助吗?