Commit afa2401a authored by liuyuhang's avatar liuyuhang

add kvmvcc prune

parent 41a71e98
......@@ -38,7 +38,6 @@ const (
batchDataSize = 1024 * 1024 * 1
delMavlStateStart = 1
delMavlStateEnd = 0
mvccPrefix = ".-mvcc-." //同common/db中的mvccPrefix一致
)
// SetLogLevel set log level
......@@ -65,6 +64,8 @@ type KVmMavlStore struct {
type subKVMVCCConfig struct {
EnableMVCCIter bool `json:"enableMVCCIter"`
EnableMavlPrune bool `json:"enableMavlPrune"`
PruneHeight int32 `json:"pruneHeight"`
}
type subMavlConfig struct {
......@@ -92,6 +93,9 @@ func New(cfg *types.Store, sub []byte) queue.Module {
if sub != nil {
types.MustDecode(sub, &subcfg)
subKVMVCCcfg.EnableMVCCIter = subcfg.EnableMVCCIter
subKVMVCCcfg.EnableMavlPrune = subcfg.EnableMavlPrune
subKVMVCCcfg.PruneHeight = subcfg.PruneHeight
subMavlcfg.EnableMavlPrefix = subcfg.EnableMavlPrefix
subMavlcfg.EnableMVCC = subcfg.EnableMVCC
subMavlcfg.EnableMavlPrune = subcfg.EnableMavlPrune
......@@ -294,9 +298,9 @@ func delMavlData(db dbm.DB) bool {
return true
}
func genDelMavlKey(prefix string) []byte {
func genDelMavlKey(prefix []byte) []byte {
delMavl := "--delMavlData--"
return []byte(fmt.Sprintf("%s%s", prefix, delMavl))
return []byte(fmt.Sprintf("%s%s", string(prefix), delMavl))
}
func isDelMavling() bool {
......
......@@ -10,9 +10,38 @@ import (
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto"
"sync/atomic"
"fmt"
"strconv"
"bytes"
)
var maxRollbackNum = 200
const (
pruningStateStart = 1
pruningStateEnd = 0
onceScanCount = 10000 // 单次扫描数目
onceCount = 1000 // 容器长度
LevelPruningHeight = 100 * 10000
)
var (
maxRollbackNum = 200
// 是否开启裁剪
enablePrune bool
// 每个10000裁剪一次
pruneHeight = 10000
pruningState int32
)
var (
//同common/db中的mvcc相关的定义保持一致
mvccPrefix = []byte(".-mvcc-.")
mvccMeta = append(mvccPrefix, []byte("m.")...)
mvccData = append(mvccPrefix, []byte("d.")...)
mvccLast = append(mvccPrefix, []byte("l.")...)
mvccMetaVersion = append(mvccMeta, []byte("version.")...)
mvccMetaVersionKeyList = append(mvccMeta, []byte("versionkl.")...)
)
// KVMVCCStore provide kvmvcc store interface implementation
type KVMVCCStore struct {
......@@ -20,6 +49,8 @@ type KVMVCCStore struct {
mvcc dbm.MVCC
kvsetmap map[string][]*types.KeyValue
enableMVCCIter bool
enableMavlPrune bool
pruneHeight int32
}
// NewKVMVCC construct KVMVCCStore module
......@@ -30,10 +61,14 @@ func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore {
enable = sub.EnableMVCCIter
}
if enable {
kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue), true}
kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue),
true, sub.EnableMavlPrune, sub.PruneHeight}
} else {
kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue), false}
kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue),
false, sub.EnableMavlPrune, sub.PruneHeight}
}
EnablePrune(sub.EnableMavlPrune)
SetPruneHeight(int(sub.PruneHeight))
return kvs
}
......@@ -92,6 +127,14 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool)
kvset = append(kvset, kvlist...)
}
mvccs.kvsetmap[string(hash)] = kvset
// 进行裁剪
if enablePrune && !isPruning() &&
pruneHeight != 0 &&
datas.Height%int64(pruneHeight) == 0 &&
datas.Height/int64(pruneHeight) > 1 {
wg.Add(1)
go pruning(mvccs.db, datas.Height)
}
return hash, nil
}
......@@ -236,3 +279,117 @@ func calcHash(datas proto.Message) []byte {
b := types.Encode(datas)
return common.Sha256(b)
}
//裁剪-------------------------------------------
// EnablePrune 使能裁剪
func EnablePrune(enable bool) {
enablePrune = enable
}
// SetPruneHeight 设置每次裁剪高度
func SetPruneHeight(height int) {
pruneHeight = height
}
func pruning(db dbm.DB, height int64) {
defer wg.Done()
pruningMVCC(db, height)
}
func pruningMVCC(db dbm.DB, height int64) {
setPruning(pruningStateStart)
defer setPruning(pruningStateEnd)
pruningFirst(db, height)
}
func pruningFirst(db dbm.DB, curHeight int64) {
it := db.Iterator(mvccData, nil, true)
defer it.Close()
var mp map[string][]int64
count := 0
batch := db.NewBatch(true)
for it.Rewind(); it.Valid(); it.Next() {
if quit {
//该处退出
return
}
if mp == nil {
mp = make(map[string][]int64, onceCount)
}
key, height, err := getKeyVersion(it.Key())
if err != nil {
continue
}
if curHeight < int64(height) + LevelPruningHeight &&
curHeight >= int64(height) + int64(pruneHeight) {
mp[string(key)] = append(mp[string(key)], height)
count++
}
if len(mp) >= onceCount-1 || count > onceScanCount {
deleteOldKV(mp, curHeight, batch)
mp = nil
count = 0
}
}
}
func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
if len(mp) == 0 {
return
}
batch.Reset()
for key, vals := range mp {
if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除
for _, val := range vals[1:] { //从第二个开始判断
if curHeight >= val + int64(pruneHeight) {
batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key
if batch.ValueSize() > batchDataSize {
batch.Write()
batch.Reset()
}
}
}
}
delete(mp, key)
}
batch.Write()
}
func genKeyVersion(key []byte, height int64) []byte {
b := append([]byte{}, mvccData...)
newkey := append(b, key...)
newkey = append(newkey, []byte(".")...)
newkey = append(newkey, pad(height)...)
return newkey
}
func getKeyVersion(vsnKey []byte) ([]byte, int64, error) {
if len(vsnKey) <= len(mvccData) + 1 + 20 {
return nil, 0, types.ErrSize
}
sLen := vsnKey[len(vsnKey)-20:]
iLen, err := strconv.Atoi(string(sLen))
if err != nil {
return nil, 0, types.ErrSize
}
k := bytes.TrimPrefix(vsnKey, mvccData)
key := k[:len(k)-1-20]
return key, int64(iLen), nil
}
func pad(version int64) []byte {
s := fmt.Sprintf("%020d", version)
return []byte(s)
}
func isPruning() bool {
return atomic.LoadInt32(&pruningState) == 1
}
func setPruning(state int32) {
atomic.StoreInt32(&pruningState, state)
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment