Commit ac044945 authored by liuyuhang's avatar liuyuhang

add kvmvccmavl compact

parent 5e89ddfb
......@@ -36,6 +36,7 @@ var (
quit bool
isPrunedMavl bool // 是否是被裁剪过的 mavl
delPrunedMavlState int32 = delPrunedMavlStart // Upgrade时候删除pruned mavl的状态
isCompactDelMavl bool // 是否对删除mavl后压缩
)
const (
......@@ -149,6 +150,11 @@ func New(cfg *types.Store, sub []byte, chain33cfg *types.Chain33Config) queue.Mo
if err == nil {
isDelMavlData = true
}
// 查询是否已经压缩过
_, err = bs.GetDB().Get(genCompactDelMavlKey(mvccPrefix))
if err == nil {
isCompactDelMavl = true
}
// 查询是否是删除裁剪版mavl
isPrunedMavl = isPrunedMavlDB(bs.GetDB())
// 读取fork高度
......@@ -205,11 +211,6 @@ func (kvmMavls *KVmMavlStore) Set(datas *types.StoreSet, sync bool) ([]byte, err
if err == nil {
kvmMavls.cache.Add(string(hash), datas.Height)
}
// 删除Mavl数据
if datas.Height > delMavlDataHeight && !isDelMavlData && !isDelMavling() {
wg.Add(1)
go DelMavl(kvmMavls.GetDB())
}
return hash, err
}
......@@ -273,6 +274,10 @@ func (kvmMavls *KVmMavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte,
wg.Add(1)
go DelMavl(kvmMavls.GetDB())
}
// 对删除的mavl进行压缩
if isDelMavlData && !isCompactDelMavl && !isDelMavling() {
go CompactDelMavl(kvmMavls.GetDB())
}
return hash, err
}
......@@ -509,6 +514,24 @@ func setDelMavl(state int32) {
atomic.StoreInt32(&delMavlDataState, state)
}
func CompactDelMavl(db dbm.DB) {
setDelMavl(delMavlStateStart)
defer setDelMavl(delMavlStateEnd)
// 开始进行压缩处理
kmlog.Info("start Compact db")
err := db.CompactRange(nil, nil)
if err == nil {
db.Set(genCompactDelMavlKey(mvccPrefix), []byte(""))
isCompactDelMavl = true
}
kmlog.Info("end Compact db", "error", err)
}
func genCompactDelMavlKey(prefix []byte) []byte {
key := "--compactDelMavl--"
return []byte(fmt.Sprintf("%s%s", string(prefix), key))
}
func isNeedDelPrunedMavl() bool {
return atomic.LoadInt32(&delPrunedMavlState) == 0
}
......
......@@ -601,6 +601,24 @@ func TestDelMavlData(t *testing.T) {
require.NoError(t, err)
}
func TestCompactDelMavl(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg := newStoreCfg(dir)
store := New(storeCfg, nil, nil).(*KVmMavlStore)
assert.NotNil(t, store)
db := store.GetDB()
for i := 0; i < 100; i++ {
db.Set([]byte(GetRandomString(MaxKeylenth)), []byte(fmt.Sprintf("v%d", i)))
}
CompactDelMavl(db)
_, err = db.Get(genCompactDelMavlKey(mvccPrefix))
assert.NoError(t, err)
}
func TestPruning(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment