# compact 源码解析

当你通过 API 发起一个 compact 请求后：&#x20;

1. &#x20;KV Server 收到 Compact 请求提交到 raft 模块处理&#x20;
2. 在 raft 模块中提交后，apply 模块就会通过 MVCC 模块的 Compact 接口执行 此压缩任务

&#x20;Compact 接口首先会更新当前 server 已压缩的版本号，并将耗时昂贵的压缩任务保存到 FIFO 队列中异步执行：

1. 压缩任务执行时，它首先会压缩 treeIndex 模块中的 keyIndex 索引&#x20;
2. 其次会遍历 boltdb 中的 key ，删除已废弃的 key

## memoryStorage 接收并执行 compact

raft 模块对于 memoryStorage 定义：

注意 ents 这个变量，它是一个 Entry 结构体类型的变量（*ents\[i] has raft log position i+snapshot.Metadata.Index*），其中字段含义如下：

* Term
* Index
* Type
* Data

```go
// MemoryStorage implements the Storage interface backed by an
// in-memory array.

type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex

	hardState pb.HardState
	snapshot  pb.Snapshot
	
	// ents[i] has raft log position i+snapshot.Metadata.Index
	ents []pb.Entry

```

```go
type Entry struct {
   Term  uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
   Index uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
   Type  EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
   Data  []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
}
```

**memoryStorage Compact 实现逻辑：**

1. 计算当前 \* 中的索引位置 `offset := ms.ents[0].Index`
2. 判断压缩的版本是否小于等于这个位置
   1. 是：报错并返回
   2. 否：继续
3. 判断要压缩的版本，是否高于当前最大的版本
   1. 是：报错并返回
   2. 否：继续
4. 计算本次要压缩版本的偏移量，也就是要压缩的版本在这个数组中的下标 `i := compactIndex - offset`
5. 定义一个与原 ents 类型相同的空数组，长度为： 1，容量为：要保留的元素个数 + 1 `ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)`
6. 将空数组下标为 0 的元素（即：第一个元素）的 Index 和 Term 赋值为原数组第 i 个元素的 Index 和 Term&#x20;
7. 将原 ents 第 i 个元素后的所有内容（不包括第 i 个元素），填充到该数组的后部分（即：除了第一个元素外）`ents = append(ents, ms.ents[i+1:]...)`
8. 用新的 ents 整体替换旧的 ，这样即便 compact 操作过程耗时，也不会影响到其他操作`ms.ents = ents`&#x20;

```go
// Compact discards all log entries prior to compactIndex.
// It is the application's responsibility to not attempt to compact an index
// greater than raftLog.applied.

func (ms *MemoryStorage) Compact(compactIndex uint64) error {
	ms.Lock()
	defer ms.Unlock()
	offset := ms.ents[0].Index
	if compactIndex <= offset {
		return ErrCompacted
	}
	if compactIndex > ms.lastIndex() {
		getLogger().Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
	}

	i := compactIndex - offset
	ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
	ents[0].Index = ms.ents[i].Index
	ents[0].Term = ms.ents[i].Term
	ents = append(ents, ms.ents[i+1:]...)
	ms.ents = ents
	return nil
}
```

## KVStore 获得和应用 compact entry 的方式

在 server 启动时，会通过  `func (s *EtcdServer) run()` 实时的从队列中获去需要执行的信息 `case ap := <-s.r.apply()`

从 KVStore 中删除已 compact 的 version，就是其中一个分类。

那什么时候往队列里放东西呢？

当 raftNode ready 的时候。这个状态，我们后续再详细分析。

### func (s \*EtcdServer) apply

compact 属于 normal entry 的一种

```go
func (s *EtcdServer) apply(
  
   ...
   
   for i := range es {
     
     ...
     
      switch e.Type {
      case raftpb.EntryNormal:
         s.applyEntryNormal(&e)
        
        ...
}
```

### func (s \*EtcdServer) applyEntryNormal

```go
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
	shouldApplyV3 := membership.ApplyV2storeOnly
	index := s.consistIndex.ConsistentIndex()
	if e.Index > index {
		// set the consistent index of current executing entry
		s.consistIndex.SetConsistentIndex(e.Index, e.Term)
		shouldApplyV3 = membership.ApplyBoth
	}
	
	...
	
	if needResult || !noSideEffect(&raftReq) {
	
		...
		
		ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
	}
	
	...
	
}
```

### func (a \*applierV3backend) Apply

r 是什么类型？通道吗？

```go
func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *applyResult {
   
   ...

   switch {
   
   ...
   
   case r.Compaction != nil:
      op = "Compaction"
      ar.resp, ar.physc, ar.trace, ar.err = a.s.applyV3.Compaction(r.Compaction)
  
   ...
   
}
```

### func (a \*applierV3backend) Compaction

```go
func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
	...
	
	ch, err := a.s.KV().Compact(trace, compaction.Revision)
	
	...
}
```

### func (s \*store) Compact

1. 将此次压缩信息加到 store 的 FIFO 的 schedule 中
2. 调用 store 的 compact 方法

```go
func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
	
	s.mu.Lock()

	ch, err := s.updateCompactRev(rev)
	
	s.mu.Unlock()

	return s.compact(trace, rev)
}
```

### func (s \*store) compact

1. 先操作 treeIndex，删除需要 compact 的 generation 和 revision [`kvindex.Compact`](https://liu-tongtong.gitbook.io/dba/etcd/compact-yuan-ma-jie-xi#func-ti-treeindex-compact)
2. 以 map 格式返回需要保留的 generation 和 revision 等信息，存放到变量 `keep` 中
3. store 根据此 map 异步 compact

```go
func (s *store) compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) {
		
		...
		keep := s.kvindex.Compact(rev)
		
		s.scheduleCompaction(rev, keep) 
			
}
```

### func (ti \*treeIndex) Compact

1. 加锁，确保操作安全
2. treeIndex 和 keyIndex 关系：
   1. [treeIndex](https://github.com/etcd-io/etcd/blob/v3.3.10/mvcc/index.go)顾名思义就是一个树状索引，它通过在内存中维护一个[B树](https://github.com/google/btree)，来达到加速查询key的功能。
   2. 这棵树的每一个节点都是 keyIndex

```go
func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
	available := make(map[revision]struct{})
	ti.lg.Info("compact tree index", zap.Int64("revision", rev))
	ti.Lock()
	clone := ti.tree.Clone()
	ti.Unlock()

	clone.Ascend(func(item btree.Item) bool {
		keyi := item.(*keyIndex)
		//Lock is needed here to prevent modification to the keyIndex while
		//compaction is going on or revision added to empty before deletion
		ti.Lock()
		keyi.compact(ti.lg, rev, available)
		if keyi.isEmpty() {
			item := ti.tree.Delete(keyi)
			if item == nil {
				ti.lg.Panic("failed to delete during compaction")
			}
		}
		ti.Unlock()
		return true
	})
	return available
}
```

### func (ki \*keyIndex) compact

1. 通过 [`ki.doCompact`](https://liu-tongtong.gitbook.io/dba/etcd/compact-yuan-ma-jie-xi#func-ki-keyindex-docompact) 计算出 available map
2. 删除小于压缩版本的 revision
3. 删除小于压缩版本且标记为“已删除（tombstone）”的 revision
4. 删除已经不包含 revision 了的 generation

```go
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{}) {
	...

	genIdx, revIndex := ki.doCompact(atRev, available)

	if !g.isEmpty() {
		 
			g.revs = g.revs[revIndex:]
		
			delete(available, g.revs[0])
	
		}
	}
  
	ki.generations = ki.generations[genIdx:]
}
```

### func (ki \*keyIndex) doCompact

1. 到序遍历该 keyIndex 中 generation 的 revision 信息
2. 找到小于等于要 compact 的最小版本
3. 将其加入到 available map 中

```go
func (ki *keyIndex) doCompact(atRev int64, available map[revision]struct{}) (genIdx int, revIndex int) {

 ​ ...
	revIndex = g.walk(f)

}
```

### func (s \*store) scheduleCompaction

1. 预处理 + 后处理 `defer` + 加锁
2. 计数器等统计信息
3. 调用 backend 的 unsafeDelete

```go
func (s *store) scheduleCompaction(compactMainRev int64, keep map[revision]struct{}) bool {
   
   ...
            tx.UnsafeDelete(buckets.Key, key)

}
```

### func (t \*batchTx) UnsafeDelete

调用 bbolt 的方法执行真正的删除操作

```go
func (t *batchTx) UnsafeDelete(bucketType Bucket, key []byte) {
   bucket := t.tx.Bucket(bucketType.Name())
   
 ​  ...
   bucket.Delete(key)
   
}
```

## 技巧

从源码上，我们除了可以看出 etcd 是如何实现 compact 的以外，还可以学到一些小技巧。

比如在给切片做 append 时，一定要格外注意长度（len）和容量（cap）的区别。我们分别创建容量为 5， 和长度为 5 的切片，观察在它们之后做 append 的效果：

```go
package main

import "fmt"

func main() {

	capCase := make([]int, 0, 5)
	fmt.Printf("%v\n", append(capCase,[]int{3,4}...))

	lenCase := make([]int,5)
	fmt.Printf("%v", append(lenCase,[]int{3,4}... ))
}
```

输出：

\[3 4]&#x20;

\[0 0 0 0 0 3 4]


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://liu-tongtong.gitbook.io/dba/etcd/compact-yuan-ma-jie-xi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
