Commit dd152b7e authored by yxq's avatar yxq Committed by vipwzw

update: reduce statedb

parent 4dbe1874
...@@ -175,6 +175,7 @@ func New(cfg *types.Store, sub []byte, chain33cfg *types.Chain33Config) queue.Mo ...@@ -175,6 +175,7 @@ func New(cfg *types.Store, sub []byte, chain33cfg *types.Chain33Config) queue.Mo
func (kvmMavls *KVmMavlStore) Close() { func (kvmMavls *KVmMavlStore) Close() {
quit = true quit = true
wg.Wait() wg.Wait()
kmlog.Info("store wait group done")
kvmMavls.KVMVCCStore.Close() kvmMavls.KVMVCCStore.Close()
kvmMavls.MavlStore.Close() kvmMavls.MavlStore.Close()
kvmMavls.BaseStore.Close() kvmMavls.BaseStore.Close()
......
...@@ -6,15 +6,14 @@ package kvmvccmavl ...@@ -6,15 +6,14 @@ package kvmvccmavl
import ( import (
"bytes" "bytes"
"fmt"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db" dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/system/dapp"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
) )
...@@ -22,9 +21,6 @@ import ( ...@@ -22,9 +21,6 @@ import (
const ( const (
pruningStateStart = 1 pruningStateStart = 1
pruningStateEnd = 0 pruningStateEnd = 0
onceScanCount = 10000 // 单次扫描数目
onceCount = 1000 // 容器长度
levelPruningHeight = 100 * 10000
defaultPruneHeight = 10000 // 每个10000裁剪一次 defaultPruneHeight = 10000 // 每个10000裁剪一次
) )
...@@ -136,7 +132,7 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ...@@ -136,7 +132,7 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool)
if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune && if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune &&
!isPruning() && datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 { !isPruning() && datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 {
wg.Add(1) wg.Add(1)
go pruningMVCC(mvccs.db, datas.Height-mvccs.kvmvccCfg.ReservedHeight) go mvccs.pruningMVCC(datas.Height)
} }
return hash, nil return hash, nil
} }
...@@ -407,7 +403,7 @@ func (mvccs *KVMVCCStore) MemSetRdm(datas *types.StoreSet, mavlHash []byte, sync ...@@ -407,7 +403,7 @@ func (mvccs *KVMVCCStore) MemSetRdm(datas *types.StoreSet, mavlHash []byte, sync
if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune && if mvccs.kvmvccCfg != nil && mvccs.kvmvccCfg.EnableMVCCPrune &&
!isPruning() && datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 { !isPruning() && datas.Height%int64(mvccs.kvmvccCfg.PruneHeight) == 0 {
wg.Add(1) wg.Add(1)
go pruningMVCC(mvccs.db, datas.Height-mvccs.kvmvccCfg.ReservedHeight) go mvccs.pruningMVCC(datas.Height)
} }
return hash, nil return hash, nil
} }
...@@ -437,25 +433,29 @@ func calcRdmKey(hash []byte, height int64) []byte { ...@@ -437,25 +433,29 @@ func calcRdmKey(hash []byte, height int64) []byte {
} }
/*裁剪-------------------------------------------*/ /*裁剪-------------------------------------------*/
func pruningMVCC(db dbm.DB, curHeight int64) { func (mvccs *KVMVCCStore) pruningMVCC(curHeight int64) {
defer wg.Done() defer wg.Done()
if curHeight <= 0 { safeHeight := curHeight - mvccs.kvmvccCfg.ReservedHeight
if safeHeight <= 0 {
return return
} }
setPruning(pruningStateStart) setPruning(pruningStateStart)
defer setPruning(pruningStateEnd) defer setPruning(pruningStateEnd)
start := time.Now() start := time.Now()
pruningMVCCData(db, curHeight) pruningMVCCDappExpired(mvccs.db, safeHeight)
pruningMVCCMeta(db, curHeight) kmlog.Info("pruningMVCCDappExpired", "current height", curHeight, "cost", time.Since(start))
_ = db.CompactRange(nil, nil) pruningMVCCData(mvccs.db, safeHeight)
kmlog.Info("pruningMVCC", "height", curHeight, "cost", time.Since(start)) kmlog.Info("pruningMVCCData", "current height", curHeight, "cost", time.Since(start))
pruningMVCCMeta(mvccs.db, safeHeight)
kmlog.Info("pruningMVCCMeta", "current height", curHeight, "cost", time.Since(start))
} }
func pruningMVCCData(db dbm.DB, curHeight int64) { func pruningMVCCData(db dbm.DB, safeHeight int64) {
it := db.Iterator(mvccData, nil, true) it := db.Iterator(mvccData, nil, true)
defer it.Close() defer it.Close()
newKey := []byte("--.xxx.--") newKey := []byte("--.xxx.--")
batch := db.NewBatch(true) batch := db.NewBatch(false)
defer dbm.MustWrite(batch)
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
if quit { if quit {
//该处退出 //该处退出
...@@ -465,7 +465,7 @@ func pruningMVCCData(db dbm.DB, curHeight int64) { ...@@ -465,7 +465,7 @@ func pruningMVCCData(db dbm.DB, curHeight int64) {
if err != nil { if err != nil {
continue continue
} }
if height >= curHeight { if height >= safeHeight {
continue continue
} }
if bytes.Compare(key, newKey) != 0 { if bytes.Compare(key, newKey) != 0 {
...@@ -479,18 +479,74 @@ func pruningMVCCData(db dbm.DB, curHeight int64) { ...@@ -479,18 +479,74 @@ func pruningMVCCData(db dbm.DB, curHeight int64) {
batch.Reset() batch.Reset()
} }
} }
}
// TODO:
// 合约里自定义规则用于检查哪些kv对是已经废弃的
// 对于定义过规则的合约,这里会遍历该合约所有的kv对,然后合约内部检查该kv对是否已经废弃
// 更高效的做法是仅遍历指定合约里可能废弃的那些key(通过进一步指定prefix实现),但通用性会变差
// 现阶段效率差不多,暂时不做进一步优化
func pruningMVCCDappExpired(db dbm.DB, safeHeight int64) {
names := dapp.KVExpiredCheckerList()
for _, name := range names {
pruneDapp(db, name, safeHeight)
}
}
func pruneDapp(db dbm.DB, name string, safeHeight int64) {
checkFunc, ok := dapp.LoadKVExpiredChecker(name)
if !ok {
return
}
var prefix []byte
prefix = append(prefix, mvccData...)
prefix = append(prefix, "mavl-" + name...)
it := db.Iterator(prefix, nil, true)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
if quit {
//该处退出
return
}
key, height, err := getKeyVersion(it.Key())
if err != nil {
continue
}
if height > safeHeight {
continue
}
if checkFunc(key, it.Value()) {
deleteKeyAllVersion(db, key)
}
}
}
func deleteKeyAllVersion(db dbm.DB, key []byte) {
start := append(mvccData, key...)
it := db.Iterator(start, nil, false)
defer it.Close()
batch := db.NewBatch(false)
for it.Rewind(); it.Valid(); it.Next() {
batch.Delete(it.Key())
if batch.ValueSize() > 1<<20 {
dbm.MustWrite(batch)
batch.Reset()
}
}
dbm.MustWrite(batch) dbm.MustWrite(batch)
} }
func pruningMVCCMeta(db dbm.DB, curHeight int64) { func pruningMVCCMeta(db dbm.DB, height int64) {
pruningMVCCMetaVersion(db, curHeight) pruningMVCCMetaVersion(db, height)
pruningMVCCMetaVersionKeyList(db, curHeight) pruningMVCCMetaVersionKeyList(db, height)
} }
func pruningMVCCMetaVersion(db dbm.DB, curHeight int64) { func pruningMVCCMetaVersion(db dbm.DB, height int64) {
it := db.Iterator(append(mvccMetaVersion, pad(0)...), append(mvccMetaVersion, pad(curHeight)...), false) startPrefix := append(mvccMetaVersion, pad(0)...)
endPrefix := append(mvccMetaVersion, pad(height)...)
it := db.Iterator(startPrefix, endPrefix, false)
defer it.Close() defer it.Close()
batch := db.NewBatch(true) batch := db.NewBatch(false)
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
if quit { if quit {
//该处退出 //该处退出
...@@ -504,12 +560,15 @@ func pruningMVCCMetaVersion(db dbm.DB, curHeight int64) { ...@@ -504,12 +560,15 @@ func pruningMVCCMetaVersion(db dbm.DB, curHeight int64) {
} }
} }
dbm.MustWrite(batch) dbm.MustWrite(batch)
_ = db.CompactRange(startPrefix, endPrefix)
} }
func pruningMVCCMetaVersionKeyList(db dbm.DB, curHeight int64) { func pruningMVCCMetaVersionKeyList(db dbm.DB, height int64) {
it := db.Iterator(append(mvccMetaVersionKeyList, pad(0)...), append(mvccMetaVersionKeyList, pad(curHeight)...), false) startPrefix := append(mvccMetaVersionKeyList, pad(0)...)
endPrefix := append(mvccMetaVersionKeyList, pad(height)...)
it := db.Iterator(startPrefix, endPrefix, false)
defer it.Close() defer it.Close()
batch := db.NewBatch(true) batch := db.NewBatch(false)
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
if quit { if quit {
//该处退出 //该处退出
...@@ -522,6 +581,7 @@ func pruningMVCCMetaVersionKeyList(db dbm.DB, curHeight int64) { ...@@ -522,6 +581,7 @@ func pruningMVCCMetaVersionKeyList(db dbm.DB, curHeight int64) {
} }
} }
dbm.MustWrite(batch) dbm.MustWrite(batch)
_ = db.CompactRange(startPrefix, endPrefix)
} }
func getKeyVersion(vsnKey []byte) ([]byte, int64, error) { func getKeyVersion(vsnKey []byte) ([]byte, int64, error) {
...@@ -539,8 +599,11 @@ func getKeyVersion(vsnKey []byte) ([]byte, int64, error) { ...@@ -539,8 +599,11 @@ func getKeyVersion(vsnKey []byte) ([]byte, int64, error) {
} }
func pad(version int64) []byte { func pad(version int64) []byte {
s := fmt.Sprintf("%020d", version) //equals to `[]byte(fmt.Sprintf("%020d", version))`
return []byte(s) sInt := strconv.FormatInt(version, 10)
result := []byte("00000000000000000000")
copy(result[20-len(sInt):], sInt)
return result
} }
func isPruning() bool { func isPruning() bool {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment