Compact 接口首先会更新当前 server 已压缩的版本号,然后将耗时较长的压缩任务放入 FIFO 队列中异步执行:
压缩任务执行时,首先会压缩 treeIndex 模块中的 keyIndex 索引;
其次会遍历 boltdb 中的 key,删除已废弃的 key。
memoryStorage 接收并执行 compact
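raft 模块中 MemoryStorage 的定义如下:
注意 ents 这个字段,它是一个 Entry 结构体切片([]pb.Entry),其中 ents[i] 对应的 raft 日志位置为 i+snapshot.Metadata.Index。Entry 各字段含义如下:
Term:该 entry 对应的 raft 任期(term)
Index:该 entry 在 raft 日志中的位置(索引)
Type:entry 的类型,例如普通条目 raftpb.EntryNormal
Data:entry 携带的数据(字节数组)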
// MemoryStorage implements the Storage interface backed by an
// in-memory array.
//
// NOTE(review): excerpt — the remaining fields and the closing brace of this
// struct are not shown here.
type MemoryStorage struct {
// Protects access to all fields. Most methods of MemoryStorage are
// run on the raft goroutine, but Append() is run on an application
// goroutine.
sync.Mutex
// hardState holds the stored raft HardState.
hardState pb.HardState
// snapshot is the current snapshot; by the invariant on ents below, its
// Metadata.Index is the raft log position of ents[0].
snapshot pb.Snapshot
// ents[i] has raft log position i+snapshot.Metadata.Index
ents []pb.Entry
// Entry is a single raft log entry, the element type of MemoryStorage.ents.
// The protobuf tags assign wire field numbers Type=1, Term=2, Index=3,
// Data=4, which differ from the Go declaration order below.
type Entry struct {
// Term is the raft term associated with this entry.
Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
// Index is the entry's position in the raft log. MemoryStorage.Compact
// uses ents[0].Index as the offset to turn an index into a slice position.
Index uint64 `protobuf:"varint,3,opt,name=Index" json:"Index"`
// Type is the entry kind, e.g. raftpb.EntryNormal.
Type EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
// Data is the opaque entry payload; omitted from JSON output when empty.
Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
}
memoryStorage Compact 实现逻辑:
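计算当前 ents 中第一条 entry 的索引作为偏移量:offset := ms.ents[0].Index
判断要压缩到的索引 compactIndex 是否小于等于 offset
是:返回 ErrCompacted 并退出
否:继续
判断 compactIndex 是否大于当前最大的索引 lastIndex
是:直接 panic(越界)
否:继续
计算 compactIndex 在 ents 数组中的下标 i := compactIndex - offset
然后新建一个切片:第 0 个元素只保留 ms.ents[i] 的 Index 和 Term,作为新的起始占位 entry;再追加 ms.ents[i+1:] 中的全部 entry,最后用新切片替换 ms.ents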
// Compact removes the log entries that come before compactIndex.
//
// The entry at compactIndex is not dropped outright. It becomes the new
// ms.ents[0], reduced to just its Index and Term. The slice therefore still
// begins with a placeholder whose Index is the offset used to map raft
// indexes to slice positions.
//
// ErrCompacted is returned when compactIndex is at or below the current
// first index. An index beyond the last entry causes a panic. Callers must
// never compact past raftLog.applied.
func (ms *MemoryStorage) Compact(compactIndex uint64) error {
	ms.Lock()
	defer ms.Unlock()

	first := ms.ents[0].Index
	switch {
	case compactIndex <= first:
		return ErrCompacted
	case compactIndex > ms.lastIndex():
		getLogger().Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
	}

	// ms.ents[k] holds raft index first+k, so this is compactIndex's slot.
	pos := compactIndex - first

	compacted := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-pos)
	compacted[0] = pb.Entry{Index: ms.ents[pos].Index, Term: ms.ents[pos].Term}
	ms.ents = append(compacted, ms.ents[pos+1:]...)
	return nil
}
KVStore 获得和应用 compact entry 的方式
在 server 启动时,会通过 func (s *EtcdServer) run() 实时地从队列中获取需要执行的信息:case ap := <-s.r.apply()
从 KVStore 中删除已 compact 的 version,就是其中的一类。
那什么时候往队列里放东西呢?
当 raftNode ready 的时候。这个状态,我们后续再详细分析。
func (s *EtcdServer) apply
compact 属于 normal entry 的一种
// apply (excerpt; the parameter list and most of the body are elided with
// "..."). It loops over the entries in es and dispatches on each entry's
// Type. EntryNormal entries, which include compaction requests, are handed
// to applyEntryNormal.
func (s *EtcdServer) apply(
...
for i := range es {
...
// NOTE(review): e is declared in the elided code, presumably from es[i].
switch e.Type {
case raftpb.EntryNormal:
s.applyEntryNormal(&e)
...
}
func (s *EtcdServer) applyEntryNormal
// applyEntryNormal (excerpt; parts elided with "...") applies one EntryNormal
// log entry.
//
// shouldApplyV3 starts as membership.ApplyV2storeOnly. It is upgraded to
// membership.ApplyBoth only when e.Index is greater than the stored
// consistent index, and in that case the consistent index is advanced to
// e.Index/e.Term first.
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
shouldApplyV3 := membership.ApplyV2storeOnly
index := s.consistIndex.ConsistentIndex()
// NOTE(review): presumably entries at or below the consistent index were
// already applied to the v3 backend (e.g. replay after restart) — confirm.
if e.Index > index {
// set the consistent index of current executing entry
s.consistIndex.SetConsistentIndex(e.Index, e.Term)
shouldApplyV3 = membership.ApplyBoth
}
...
// raftReq, needResult and ar are declared in the elided code above.
if needResult || !noSideEffect(&raftReq) {
...
ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
}
...
}
func (a *applierV3backend) Apply
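r 是什么类型?通道吗?——不是。从下面的函数签名可以看出,r 的类型是 *pb.InternalRaftRequest,即一个 raft 内部请求结构体的指针;switch 根据其中哪个字段非空(例如 r.Compaction != nil)来决定执行哪种操作。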
// Apply (excerpt; parts elided with "...") dispatches an internal raft
// request to the matching applier method.
//
// r is a *pb.InternalRaftRequest (a pointer to a request struct, not a
// channel). The switch selects the operation by checking which request
// field of r is set.
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
...
switch {
...
// A compaction request is forwarded to Compaction. Its response, physc,
// trace and error are stored on the result ar.
case r.Compaction != nil:
op = "Compaction"
ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
...
}
// Compact compacts every keyIndex in ti at revision rev. It returns the map
// that each keyIndex.compact call records revisions into.
//
// The walk runs over a clone of the tree, so ti's lock is held only briefly
// to make the clone and then once per key. Keys whose keyIndex becomes
// empty are deleted from the live tree (ti.tree). If such a deletion finds
// nothing, it is reported via ti.lg.Panic.
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
	revs := make(map[revision]struct{})
	ti.lg.Info("compact tree index", zap.Int64("revision", rev))

	ti.Lock()
	snapshot := ti.tree.Clone()
	ti.Unlock()

	snapshot.Ascend(func(it btree.Item) bool {
		ki := it.(*keyIndex)

		// Hold the lock while this key is compacted and, if emptied, deleted.
		// This keeps the keyIndex from being modified during compaction, and
		// stops a revision from being added between the emptiness check and
		// the deletion.
		ti.Lock()
		ki.compact(ti.lg, rev, revs)
		if ki.isEmpty() && ti.tree.Delete(ki) == nil {
			ti.lg.Panic("failed to delete during compaction")
		}
		ti.Unlock()

		return true // keep walking: every key is visited
	})

	return revs
}
func (ki *keyIndex) compact
删除小于压缩版本的 revision
删除小于压缩版本且标记为“已删除(tombstone)”的 revision
删除已经不包含 revision 了的 generation
// compact (excerpt; parts elided with "...") compacts this keyIndex at
// revision atRev.
//
// doCompact picks the generation index genIdx and the position revIndex
// within it, recording revisions into available. The kept generation's revs
// are then trimmed to start at revIndex, and all generations before genIdx
// are discarded.
//
// NOTE(review): lines are missing from this excerpt. g is used without a
// visible declaration (presumably the generation at genIdx), and the braces
// do not balance as shown. Upstream, the delete(available, ...) below is
// guarded by a tombstone check that is omitted here; verify against the real
// source.
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
...
genIdx, revIndex := ki.doCompact(atRev, available)
if !g.isEmpty() {
// Drop the revisions that precede revIndex in this generation.
g.revs = g.revs[revIndex:]
delete(available, g.revs[0])
}
}
// Drop every generation before genIdx.
ki.generations = ki.generations[genIdx:]
}
func (ki *keyIndex) doCompact
倒序遍历该 keyIndex 中对应 generation 的 revision 信息
找到第一个小于等于 compact 版本的 revision(即倒序遍历时最先遇到的那个)
将其加入到 available map 中
// doCompact (excerpt; parts elided with "...") locates where compaction at
// atRev should cut this keyIndex. It returns genIdx, the index of the
// generation to keep from, and revIndex, the position inside that
// generation's revs, as computed by g.walk(f).
//
// NOTE(review): g and f are defined in the elided code. Per the notes above,
// f adds the matched revision to available.
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {
...
revIndex = g.walk(f)
}