Commit 8efcb50b authored by yxq's avatar yxq Committed by vipwzw

update: reduce mvcc

parent 02e1456b
...@@ -81,8 +81,9 @@ type KVmMavlStore struct { ...@@ -81,8 +81,9 @@ type KVmMavlStore struct {
type subKVMVCCConfig struct { type subKVMVCCConfig struct {
EnableMVCCIter bool `json:"enableMVCCIter"` EnableMVCCIter bool `json:"enableMVCCIter"`
EnableMVCCPrune bool `json:"enableMVCCPrune"` EnableMVCCPrune bool `json:"enableMVCCPrune"`
PruneHeight int32 `json:"pruneHeight"` PruneHeight int32 `json:"pruneHeight"` //每PruneHeight高度做一次状态精简
EnableEmptyBlockHandle bool `json:"enableEmptyBlockHandle"` EnableEmptyBlockHandle bool `json:"enableEmptyBlockHandle"`
ReservedHeight int64 `json:"reservedHeight"` //保留最新的ReservedHeight个区块的状态数据
} }
type subMavlConfig struct { type subMavlConfig struct {
...@@ -114,6 +115,8 @@ type subConfig struct { ...@@ -114,6 +115,8 @@ type subConfig struct {
TkCloseCacheLen int32 `json:"tkCloseCacheLen"` TkCloseCacheLen int32 `json:"tkCloseCacheLen"`
// 使能空块处理 // 使能空块处理
EnableEmptyBlockHandle bool `json:"enableEmptyBlockHandle"` EnableEmptyBlockHandle bool `json:"enableEmptyBlockHandle"`
//保留的状态数据的高度
ReservedHeight int64 `json:"reservedHeight"`
} }
// New construct KVMVCCStore module // New construct KVMVCCStore module
...@@ -128,6 +131,7 @@ func New(cfg *types.Store, sub []byte, chain33cfg *types.Chain33Config) queue.Mo ...@@ -128,6 +131,7 @@ func New(cfg *types.Store, sub []byte, chain33cfg *types.Chain33Config) queue.Mo
subKVMVCCcfg.EnableMVCCPrune = subcfg.EnableMVCCPrune subKVMVCCcfg.EnableMVCCPrune = subcfg.EnableMVCCPrune
subKVMVCCcfg.PruneHeight = subcfg.PruneMVCCHeight subKVMVCCcfg.PruneHeight = subcfg.PruneMVCCHeight
subKVMVCCcfg.EnableEmptyBlockHandle = subcfg.EnableEmptyBlockHandle subKVMVCCcfg.EnableEmptyBlockHandle = subcfg.EnableEmptyBlockHandle
subKVMVCCcfg.ReservedHeight = subcfg.ReservedHeight
subMavlcfg.EnableMavlPrefix = subcfg.EnableMavlPrefix subMavlcfg.EnableMavlPrefix = subcfg.EnableMavlPrefix
subMavlcfg.EnableMVCC = subcfg.EnableMVCC subMavlcfg.EnableMVCC = subcfg.EnableMVCC
......
...@@ -664,7 +664,8 @@ func TestPruning(t *testing.T) { ...@@ -664,7 +664,8 @@ func TestPruning(t *testing.T) {
hashes = append(hashes, hash) hashes = append(hashes, hash)
} }
pruningMVCC(store.GetDB(), 99, &KVMCCCConfig{PruneHeight: 10}) pruningMVCCData(store.GetDB(), 80)
pruningMVCCMeta(store.GetDB(), 80)
//check //check
getDatas := &types.StoreGet{ getDatas := &types.StoreGet{
......
...@@ -36,31 +36,23 @@ var ( ...@@ -36,31 +36,23 @@ var (
var ( var (
//同common/db中的mvcc相关的定义保持一致 //同common/db中的mvcc相关的定义保持一致
mvccPrefix = []byte(".-mvcc-.") mvccPrefix = []byte(".-mvcc-.")
//mvccMeta = append(mvccPrefix, []byte("m.")...) mvccMeta = append(mvccPrefix, []byte("m.")...)
mvccData = append(mvccPrefix, []byte("d.")...) mvccData = append(mvccPrefix, []byte("d.")...)
//mvccLast = append(mvccPrefix, []byte("l.")...) //mvccLast = append(mvccPrefix, []byte("l.")...)
//mvccMetaVersion = append(mvccMeta, []byte("version.")...) mvccMetaVersion = append(mvccMeta, []byte("version.")...)
//mvccMetaVersionKeyList = append(mvccMeta, []byte("versionkl.")...) mvccMetaVersionKeyList = append(mvccMeta, []byte("versionkl.")...)
// for empty block // for empty block
rdmHashPrefix = append(mvccPrefix, []byte("rdm.")...) rdmHashPrefix = append(mvccPrefix, []byte("rdm.")...)
) )
// KVMCCCConfig KVMCCC config
type KVMCCCConfig struct {
EnableMVCCIter bool
EnableMVCCPrune bool
PruneHeight int32
EnableEmptyBlockHandle bool
}
// KVMVCCStore provide kvmvcc store interface implementation // KVMVCCStore provide kvmvcc store interface implementation
type KVMVCCStore struct { type KVMVCCStore struct {
db dbm.DB db dbm.DB
mvcc dbm.MVCC mvcc dbm.MVCC
kvsetmap map[string][]*types.KeyValue kvsetmap map[string][]*types.KeyValue
sync bool sync bool
kvmvccCfg *KVMCCCConfig kvmvccCfg *subKVMVCCConfig
} }
// NewKVMVCC construct KVMVCCStore module // NewKVMVCC construct KVMVCCStore module
...@@ -72,16 +64,10 @@ func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore { ...@@ -72,16 +64,10 @@ func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore {
if sub.PruneHeight == 0 { if sub.PruneHeight == 0 {
sub.PruneHeight = defaultPruneHeight sub.PruneHeight = defaultPruneHeight
} }
kvmvccCfg := &KVMCCCConfig{ if sub.EnableMVCCIter {
EnableMVCCIter: sub.EnableMVCCIter, kvs = &KVMVCCStore{db: db, mvcc: dbm.NewMVCCIter(db), kvsetmap: make(map[string][]*types.KeyValue), kvmvccCfg: sub}
EnableMVCCPrune: sub.EnableMVCCPrune,
PruneHeight: sub.PruneHeight,
EnableEmptyBlockHandle: sub.EnableEmptyBlockHandle,
}
if kvmvccCfg.EnableMVCCIter {
kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue), false, kvmvccCfg}
} else { } else {
kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue), false, kvmvccCfg} kvs = &KVMVCCStore{db: db, mvcc: dbm.NewMVCC(db), kvsetmap: make(map[string][]*types.KeyValue), kvmvccCfg: sub}
} }
return kvs return kvs
} }
...@@ -148,11 +134,9 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ...@@ -148,11 +134,9 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool)
mvccs.sync = sync mvccs.sync = sync
// 进行裁剪 // 进行裁剪
if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune && if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune &&
!isPruning() && mvccs.kvmvccCfg.PruneHeight != 0 && !isPruning() && datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 {
datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 &&
datas.Height/int64(mvccs.kvmvccCfg.PruneHeight) > 1 {
wg.Add(1) wg.Add(1)
go pruning(mvccs.db, datas.Height, mvccs.kvmvccCfg) go pruningMVCC(mvccs.db, datas.Height-mvccs.kvmvccCfg.ReservedHeight)
} }
return hash, nil return hash, nil
} }
...@@ -421,11 +405,9 @@ func (mvccs *KVMVCCStore) MemSetRdm(datas *types.StoreSet, mavlHash []byte, sync ...@@ -421,11 +405,9 @@ func (mvccs *KVMVCCStore) MemSetRdm(datas *types.StoreSet, mavlHash []byte, sync
// 进行裁剪 // 进行裁剪
if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune && if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune &&
!isPruning() && mvccs.kvmvccCfg.PruneHeight != 0 && !isPruning() && datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 {
datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 &&
datas.Height/int64(mvccs.kvmvccCfg.PruneHeight) > 1 {
wg.Add(1) wg.Add(1)
go pruning(mvccs.db, datas.Height, mvccs.kvmvccCfg) go pruningMVCC(mvccs.db, datas.Height-mvccs.kvmvccCfg.ReservedHeight)
} }
return hash, nil return hash, nil
} }
...@@ -455,94 +437,94 @@ func calcRdmKey(hash []byte, height int64) []byte { ...@@ -455,94 +437,94 @@ func calcRdmKey(hash []byte, height int64) []byte {
} }
/*裁剪-------------------------------------------*/ /*裁剪-------------------------------------------*/
func pruningMVCC(db dbm.DB, curHeight int64) {
func pruning(db dbm.DB, height int64, KVmvccCfg *KVMCCCConfig) {
defer wg.Done() defer wg.Done()
pruningMVCC(db, height, KVmvccCfg) if curHeight <= 0 {
} return
}
func pruningMVCC(db dbm.DB, height int64, KVmvccCfg *KVMCCCConfig) {
setPruning(pruningStateStart) setPruning(pruningStateStart)
defer setPruning(pruningStateEnd) defer setPruning(pruningStateEnd)
start := time.Now() start := time.Now()
pruningFirst(db, height, KVmvccCfg) pruningMVCCData(db, curHeight)
end := time.Now() pruningMVCCMeta(db, curHeight)
kmlog.Debug("pruningMVCC", "height", height, "cost", end.Sub(start)) _ = db.CompactRange(nil, nil)
kmlog.Info("pruningMVCC", "height", curHeight, "cost", time.Since(start))
} }
func pruningFirst(db dbm.DB, curHeight int64, KVmvccCfg *KVMCCCConfig) { func pruningMVCCData(db dbm.DB, curHeight int64) {
it := db.Iterator(mvccData, nil, true) it := db.Iterator(mvccData, nil, true)
defer it.Close() defer it.Close()
newKey := []byte("--.xxx.--")
var mp map[string][]int64
count := 0
batch := db.NewBatch(true) batch := db.NewBatch(true)
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
if quit { if quit {
//该处退出 //该处退出
return return
} }
if mp == nil {
mp = make(map[string][]int64, onceCount)
}
key, height, err := getKeyVersion(it.Key()) key, height, err := getKeyVersion(it.Key())
if err != nil { if err != nil {
continue continue
} }
if height >= curHeight {
if curHeight < height+levelPruningHeight && continue
curHeight >= height+int64(KVmvccCfg.PruneHeight) {
mp[string(key)] = append(mp[string(key)], height)
count++
} }
if len(mp) >= onceCount-1 || count > onceScanCount { if bytes.Compare(key, newKey) != 0 {
deleteOldKV(mp, curHeight, batch, KVmvccCfg) newKey = make([]byte, len(key))
mp = nil copy(newKey, key)
count = 0 continue
}
batch.Delete(it.Key())
if batch.ValueSize() > 1<<20 {
dbm.MustWrite(batch)
batch.Reset()
} }
} }
if len(mp) > 0 { dbm.MustWrite(batch)
deleteOldKV(mp, curHeight, batch, KVmvccCfg)
mp = nil
_ = mp
}
} }
func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch, KVmvccCfg *KVMCCCConfig) { func pruningMVCCMeta(db dbm.DB, curHeight int64) {
if len(mp) == 0 { pruningMVCCMetaVersion(db, curHeight)
return pruningMVCCMetaVersionKeyList(db, curHeight)
} }
batch.Reset()
for key, vals := range mp { func pruningMVCCMetaVersion(db dbm.DB, curHeight int64) {
if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除 it := db.Iterator(append(mvccMetaVersion, pad(0)...), append(mvccMetaVersion, pad(curHeight)...), false)
for _, val := range vals[1:] { //从第二个开始判断 defer it.Close()
if curHeight >= val+int64(KVmvccCfg.PruneHeight) { batch := db.NewBatch(true)
batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key for it.Rewind(); it.Valid(); it.Next() {
if batch.ValueSize() > batchDataSize { if quit {
dbm.MustWrite(batch) //该处退出
batch.Reset() return
} }
} batch.Delete(it.Key())
} batch.Delete(append(mvccMeta, it.Value()...))
if batch.ValueSize() > 1<<20 {
dbm.MustWrite(batch)
batch.Reset()
} }
delete(mp, key)
} }
dbm.MustWrite(batch) dbm.MustWrite(batch)
} }
func genKeyVersion(key []byte, height int64) []byte { func pruningMVCCMetaVersionKeyList(db dbm.DB, curHeight int64) {
b := append([]byte{}, mvccData...) it := db.Iterator(append(mvccMetaVersionKeyList, pad(0)...), append(mvccMetaVersionKeyList, pad(curHeight)...), false)
newkey := append(b, key...) defer it.Close()
newkey = append(newkey, []byte(".")...) batch := db.NewBatch(true)
newkey = append(newkey, pad(height)...) for it.Rewind(); it.Valid(); it.Next() {
return newkey if quit {
//该处退出
return
}
batch.Delete(it.Key())
if batch.ValueSize() > 1<<20 {
dbm.MustWrite(batch)
batch.Reset()
}
}
dbm.MustWrite(batch)
} }
func getKeyVersion(vsnKey []byte) ([]byte, int64, error) { func getKeyVersion(vsnKey []byte) ([]byte, int64, error) {
if !bytes.Contains(vsnKey, mvccData) {
return nil, 0, types.ErrSize
}
if len(vsnKey) < len(mvccData)+1+20 { if len(vsnKey) < len(mvccData)+1+20 {
return nil, 0, types.ErrSize return nil, 0, types.ErrSize
} }
...@@ -562,7 +544,7 @@ func pad(version int64) []byte { ...@@ -562,7 +544,7 @@ func pad(version int64) []byte {
} }
func isPruning() bool { func isPruning() bool {
return atomic.LoadInt32(&pruningState) == 1 return atomic.LoadInt32(&pruningState) == pruningStateStart
} }
func setPruning(state int32) { func setPruning(state int32) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment