Commit 1251ed80 authored by liuyuhang's avatar liuyuhang

modify kvmvccmal global config

parent f53537d2
...@@ -610,10 +610,7 @@ func TestPruning(t *testing.T) { ...@@ -610,10 +610,7 @@ func TestPruning(t *testing.T) {
store := New(storeCfg, nil, nil).(*KVmMavlStore) store := New(storeCfg, nil, nil).(*KVmMavlStore)
assert.NotNil(t, store) assert.NotNil(t, store)
kvmvccStore := NewKVMVCC(&subKVMVCCConfig{}, store.GetDB()) kvmvccStore := NewKVMVCC(&subKVMVCCConfig{PruneHeight:10}, store.GetDB())
SetPruneHeight(10)
defer SetPruneHeight(0)
var kv []*types.KeyValue var kv []*types.KeyValue
var key string var key string
...@@ -649,7 +646,7 @@ func TestPruning(t *testing.T) { ...@@ -649,7 +646,7 @@ func TestPruning(t *testing.T) {
hashes = append(hashes, hash) hashes = append(hashes, hash)
} }
pruningMVCC(store.GetDB(), 99) pruningMVCC(store.GetDB(), 99, &KVMCCCConfig{PruneHeight:10})
//check //check
getDatas := &types.StoreGet{ getDatas := &types.StoreGet{
......
...@@ -25,13 +25,10 @@ const ( ...@@ -25,13 +25,10 @@ const (
onceScanCount = 10000 // 单次扫描数目 onceScanCount = 10000 // 单次扫描数目
onceCount = 1000 // 容器长度 onceCount = 1000 // 容器长度
levelPruningHeight = 100 * 10000 levelPruningHeight = 100 * 10000
defaultPruneHeight = 10000 // 每个10000裁剪一次
) )
var ( var (
// 是否开启裁剪
enablePrune bool
// 每个10000裁剪一次
pruneHeight = 10000
pruningState int32 pruningState int32
batch dbm.Batch batch dbm.Batch
) )
...@@ -46,33 +43,41 @@ var ( ...@@ -46,33 +43,41 @@ var (
//mvccMetaVersionKeyList = append(mvccMeta, []byte("versionkl.")...) //mvccMetaVersionKeyList = append(mvccMeta, []byte("versionkl.")...)
) )
type KVMCCCConfig struct {
EnableMVCCIter bool
EnableMavlPrune bool
PruneHeight int32
}
// KVMVCCStore provide kvmvcc store interface implementation // KVMVCCStore provide kvmvcc store interface implementation
type KVMVCCStore struct { type KVMVCCStore struct {
db dbm.DB db dbm.DB
mvcc dbm.MVCC mvcc dbm.MVCC
kvsetmap map[string][]*types.KeyValue kvsetmap map[string][]*types.KeyValue
enableMVCCIter bool
enableMavlPrune bool
pruneHeight int32
sync bool sync bool
kvmvccCfg *KVMCCCConfig
} }
// NewKVMVCC construct KVMVCCStore module // NewKVMVCC construct KVMVCCStore module
func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore { func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore {
var kvs *KVMVCCStore var kvs *KVMVCCStore
enable := false if sub == nil {
if sub != nil { panic("sub is nil memory")
enable = sub.EnableMVCCIter }
if sub.PruneHeight == 0 {
sub.PruneHeight = defaultPruneHeight
}
kvmvccCfg := &KVMCCCConfig{
EnableMVCCIter: sub.EnableMVCCIter,
EnableMavlPrune: sub.EnableMavlPrune,
PruneHeight: sub.PruneHeight,
} }
if enable { if kvmvccCfg.EnableMVCCIter {
kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue), kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue),false, kvmvccCfg}
true, sub.EnableMavlPrune, sub.PruneHeight, false}
} else { } else {
kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue), kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue),false, kvmvccCfg}
false, sub.EnableMavlPrune, sub.PruneHeight, false}
} }
EnablePrune(sub.EnableMavlPrune)
SetPruneHeight(int(sub.PruneHeight))
return kvs return kvs
} }
...@@ -137,12 +142,12 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ...@@ -137,12 +142,12 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool)
mvccs.kvsetmap[string(hash)] = kvset mvccs.kvsetmap[string(hash)] = kvset
mvccs.sync = sync mvccs.sync = sync
// 进行裁剪 // 进行裁剪
if enablePrune && !isPruning() && if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMavlPrune &&
pruneHeight != 0 && !isPruning() && mvccs.kvmvccCfg.PruneHeight != 0 &&
datas.Height%int64(pruneHeight) == 0 && datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 &&
datas.Height/int64(pruneHeight) > 1 { datas.Height/int64(mvccs.kvmvccCfg.PruneHeight) > 1 {
wg.Add(1) wg.Add(1)
go pruning(mvccs.db, datas.Height) go pruning(mvccs.db, datas.Height, mvccs.kvmvccCfg)
} }
return hash, nil return hash, nil
} }
...@@ -205,7 +210,7 @@ func (mvccs *KVMVCCStore) Rollback(req *types.ReqHash) ([]byte, error) { ...@@ -205,7 +210,7 @@ func (mvccs *KVMVCCStore) Rollback(req *types.ReqHash) ([]byte, error) {
// IterateRangeByStateHash travel with Prefix by StateHash to get the latest version kvs. // IterateRangeByStateHash travel with Prefix by StateHash to get the latest version kvs.
func (mvccs *KVMVCCStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) { func (mvccs *KVMVCCStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
if !mvccs.enableMVCCIter { if !mvccs.kvmvccCfg.EnableMVCCIter {
panic("call IterateRangeByStateHash when disable mvcc iter") panic("call IterateRangeByStateHash when disable mvcc iter")
} }
//按照kv最新值来进行遍历处理,要求statehash必须是最新区块的statehash,否则不支持该接口 //按照kv最新值来进行遍历处理,要求statehash必须是最新区块的statehash,否则不支持该接口
...@@ -339,31 +344,21 @@ func calcHash(datas proto.Message) []byte { ...@@ -339,31 +344,21 @@ func calcHash(datas proto.Message) []byte {
/*裁剪-------------------------------------------*/ /*裁剪-------------------------------------------*/
// EnablePrune 使能裁剪 func pruning(db dbm.DB, height int64, KVmvccCfg *KVMCCCConfig) {
func EnablePrune(enable bool) {
enablePrune = enable
}
// SetPruneHeight 设置每次裁剪高度
func SetPruneHeight(height int) {
pruneHeight = height
}
func pruning(db dbm.DB, height int64) {
defer wg.Done() defer wg.Done()
pruningMVCC(db, height) pruningMVCC(db, height, KVmvccCfg)
} }
func pruningMVCC(db dbm.DB, height int64) { func pruningMVCC(db dbm.DB, height int64, KVmvccCfg *KVMCCCConfig) {
setPruning(pruningStateStart) setPruning(pruningStateStart)
defer setPruning(pruningStateEnd) defer setPruning(pruningStateEnd)
start := time.Now() start := time.Now()
pruningFirst(db, height) pruningFirst(db, height, KVmvccCfg)
end := time.Now() end := time.Now()
kmlog.Debug("pruningMVCC", "height", height, "cost", end.Sub(start)) kmlog.Debug("pruningMVCC", "height", height, "cost", end.Sub(start))
} }
func pruningFirst(db dbm.DB, curHeight int64) { func pruningFirst(db dbm.DB, curHeight int64, KVmvccCfg *KVMCCCConfig) {
it := db.Iterator(mvccData, nil, true) it := db.Iterator(mvccData, nil, true)
defer it.Close() defer it.Close()
...@@ -385,24 +380,24 @@ func pruningFirst(db dbm.DB, curHeight int64) { ...@@ -385,24 +380,24 @@ func pruningFirst(db dbm.DB, curHeight int64) {
} }
if curHeight < height+levelPruningHeight && if curHeight < height+levelPruningHeight &&
curHeight >= height+int64(pruneHeight) { curHeight >= height+int64(KVmvccCfg.PruneHeight) {
mp[string(key)] = append(mp[string(key)], height) mp[string(key)] = append(mp[string(key)], height)
count++ count++
} }
if len(mp) >= onceCount-1 || count > onceScanCount { if len(mp) >= onceCount-1 || count > onceScanCount {
deleteOldKV(mp, curHeight, batch) deleteOldKV(mp, curHeight, batch, KVmvccCfg)
mp = nil mp = nil
count = 0 count = 0
} }
} }
if len(mp) > 0 { if len(mp) > 0 {
deleteOldKV(mp, curHeight, batch) deleteOldKV(mp, curHeight, batch, KVmvccCfg)
mp = nil mp = nil
_ = mp _ = mp
} }
} }
func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) { func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch, KVmvccCfg *KVMCCCConfig) {
if len(mp) == 0 { if len(mp) == 0 {
return return
} }
...@@ -410,7 +405,7 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) { ...@@ -410,7 +405,7 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
for key, vals := range mp { for key, vals := range mp {
if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除 if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除
for _, val := range vals[1:] { //从第二个开始判断 for _, val := range vals[1:] { //从第二个开始判断
if curHeight >= val+int64(pruneHeight) { if curHeight >= val+int64(KVmvccCfg.PruneHeight) {
batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
dbm.MustWrite(batch) dbm.MustWrite(batch)
......
...@@ -24,12 +24,9 @@ const ( ...@@ -24,12 +24,9 @@ const (
// MavlStore mavl store struct // MavlStore mavl store struct
type MavlStore struct { type MavlStore struct {
db dbm.DB db dbm.DB
trees *sync.Map trees *sync.Map
enableMavlPrefix bool treeCfg *mavl.TreeConfig
enableMVCC bool
enableMavlPrune bool
pruneHeight int32
} }
// NewMavl new mavl store module // NewMavl new mavl store module
...@@ -44,14 +41,21 @@ func NewMavl(sub *subMavlConfig, db dbm.DB) *MavlStore { ...@@ -44,14 +41,21 @@ func NewMavl(sub *subMavlConfig, db dbm.DB) *MavlStore {
subcfg.EnableMemVal = sub.EnableMemVal subcfg.EnableMemVal = sub.EnableMemVal
subcfg.TkCloseCacheLen = sub.TkCloseCacheLen subcfg.TkCloseCacheLen = sub.TkCloseCacheLen
} }
mavls := &MavlStore{db, &sync.Map{}, subcfg.EnableMavlPrefix, subcfg.EnableMVCC, subcfg.EnableMavlPrune, subcfg.PruneHeight}
mavl.EnableMavlPrefix(subcfg.EnableMavlPrefix) // 开启裁剪需要同时开启前缀树
mavl.EnableMVCC(subcfg.EnableMVCC) if subcfg.EnableMavlPrune {
mavl.EnablePrune(subcfg.EnableMavlPrune) subcfg.EnableMavlPrefix = subcfg.EnableMavlPrune
mavl.SetPruneHeight(int(subcfg.PruneHeight)) }
mavl.EnableMemTree(subcfg.EnableMemTree) treeCfg := &mavl.TreeConfig{
mavl.EnableMemVal(subcfg.EnableMemVal) EnableMavlPrefix: subcfg.EnableMavlPrefix,
mavl.TkCloseCacheLen(subcfg.TkCloseCacheLen) EnableMVCC: subcfg.EnableMVCC,
EnableMavlPrune: subcfg.EnableMavlPrune,
PruneHeight: subcfg.PruneHeight,
EnableMemTree: subcfg.EnableMemTree,
EnableMemVal: subcfg.EnableMemVal,
TkCloseCacheLen: subcfg.TkCloseCacheLen,
}
mavls := &MavlStore{db, &sync.Map{}, treeCfg}
return mavls return mavls
} }
...@@ -63,7 +67,7 @@ func (mavls *MavlStore) Close() { ...@@ -63,7 +67,7 @@ func (mavls *MavlStore) Close() {
// Set set k v to mavl store db; sync is true represent write sync // Set set k v to mavl store db; sync is true represent write sync
func (mavls *MavlStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) { func (mavls *MavlStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
return mavl.SetKVPair(mavls.db, datas, sync) return mavl.SetKVPair(mavls.db, datas, sync, mavls.treeCfg)
} }
// Get get values by keys // Get get values by keys
...@@ -75,7 +79,7 @@ func (mavls *MavlStore) Get(datas *types.StoreGet) [][]byte { ...@@ -75,7 +79,7 @@ func (mavls *MavlStore) Get(datas *types.StoreGet) [][]byte {
if data, ok := mavls.trees.Load(search); ok { if data, ok := mavls.trees.Load(search); ok {
tree = data.(*mavl.Tree) tree = data.(*mavl.Tree)
} else { } else {
tree = mavl.NewTree(mavls.db, true) tree = mavl.NewTree(mavls.db, true, mavls.treeCfg)
//get接口也应该传入高度 //get接口也应该传入高度
//tree.SetBlockHeight(datas.Height) //tree.SetBlockHeight(datas.Height)
err = tree.Load(datas.StateHash) err = tree.Load(datas.StateHash)
...@@ -103,7 +107,7 @@ func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) ...@@ -103,7 +107,7 @@ func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error)
mavls.trees.Store(string(datas.StateHash), nil) mavls.trees.Store(string(datas.StateHash), nil)
return datas.StateHash, nil return datas.StateHash, nil
} }
tree := mavl.NewTree(mavls.db, sync) tree := mavl.NewTree(mavls.db, sync, mavls.treeCfg)
tree.SetBlockHeight(datas.Height) tree.SetBlockHeight(datas.Height)
err := tree.Load(datas.StateHash) err := tree.Load(datas.StateHash)
if err != nil { if err != nil {
...@@ -127,7 +131,7 @@ func (mavls *MavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, ...@@ -127,7 +131,7 @@ func (mavls *MavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte,
kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null") kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
return datas.StateHash, nil return datas.StateHash, nil
} }
tree := mavl.NewTree(mavls.db, sync) tree := mavl.NewTree(mavls.db, sync, mavls.treeCfg)
tree.SetBlockHeight(datas.Height) tree.SetBlockHeight(datas.Height)
err := tree.Load(datas.StateHash) err := tree.Load(datas.StateHash)
if err != nil { if err != nil {
...@@ -182,7 +186,7 @@ func (mavls *MavlStore) Rollback(req *types.ReqHash) ([]byte, error) { ...@@ -182,7 +186,7 @@ func (mavls *MavlStore) Rollback(req *types.ReqHash) ([]byte, error) {
// IterateRangeByStateHash 迭代实现功能; statehash:当前状态hash, start:开始查找的key, end: 结束的key, ascending:升序,降序, fn 迭代回调函数 // IterateRangeByStateHash 迭代实现功能; statehash:当前状态hash, start:开始查找的key, end: 结束的key, ascending:升序,降序, fn 迭代回调函数
func (mavls *MavlStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) { func (mavls *MavlStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
mavl.IterateRangeByStateHash(mavls.db, statehash, start, end, ascending, fn) mavl.IterateRangeByStateHash(mavls.db, statehash, start, end, ascending, mavls.treeCfg, fn)
} }
// ProcEvent not support message // ProcEvent not support message
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment