Commit a139816a authored by liuyuhang's avatar liuyuhang Committed by vipwzw

修改upgrade裁剪版mavl存在的bug,以及同步过程中删除裁剪版mavl减少磁盘占用

parent 35a8bab7
...@@ -117,9 +117,9 @@ type RoundState struct { ...@@ -117,9 +117,9 @@ type RoundState struct {
// RoundStateMessage ... // RoundStateMessage ...
func (rs *RoundState) RoundStateMessage() *tmtypes.NewRoundStepMsg { func (rs *RoundState) RoundStateMessage() *tmtypes.NewRoundStepMsg {
return &tmtypes.NewRoundStepMsg{ return &tmtypes.NewRoundStepMsg{
Height: rs.Height, Height: rs.Height,
Round: int32(rs.Round), Round: int32(rs.Round),
Step: int32(rs.Step), Step: int32(rs.Step),
SecondsSinceStartTime: int32(time.Since(rs.StartTime).Seconds()), SecondsSinceStartTime: int32(time.Since(rs.StartTime).Seconds()),
LastCommitRound: int32(rs.LastCommit.Round()), LastCommitRound: int32(rs.LastCommit.Round()),
} }
......
...@@ -73,9 +73,9 @@ func ParseX509CertificateToSm2(x509Cert *x509.Certificate) *sm2.Certificate { ...@@ -73,9 +73,9 @@ func ParseX509CertificateToSm2(x509Cert *x509.Certificate) *sm2.Certificate {
UnknownExtKeyUsage: x509Cert.UnknownExtKeyUsage, UnknownExtKeyUsage: x509Cert.UnknownExtKeyUsage,
BasicConstraintsValid: x509Cert.BasicConstraintsValid, BasicConstraintsValid: x509Cert.BasicConstraintsValid,
IsCA: x509Cert.IsCA, IsCA: x509Cert.IsCA,
MaxPathLen: x509Cert.MaxPathLen, MaxPathLen: x509Cert.MaxPathLen,
MaxPathLenZero: x509Cert.MaxPathLenZero, MaxPathLenZero: x509Cert.MaxPathLenZero,
SubjectKeyId: x509Cert.SubjectKeyId, SubjectKeyId: x509Cert.SubjectKeyId,
AuthorityKeyId: x509Cert.AuthorityKeyId, AuthorityKeyId: x509Cert.AuthorityKeyId,
...@@ -136,9 +136,9 @@ func ParseSm2CertificateToX509(sm2Cert *sm2.Certificate) *x509.Certificate { ...@@ -136,9 +136,9 @@ func ParseSm2CertificateToX509(sm2Cert *sm2.Certificate) *x509.Certificate {
UnknownExtKeyUsage: sm2Cert.UnknownExtKeyUsage, UnknownExtKeyUsage: sm2Cert.UnknownExtKeyUsage,
BasicConstraintsValid: sm2Cert.BasicConstraintsValid, BasicConstraintsValid: sm2Cert.BasicConstraintsValid,
IsCA: sm2Cert.IsCA, IsCA: sm2Cert.IsCA,
MaxPathLen: sm2Cert.MaxPathLen, MaxPathLen: sm2Cert.MaxPathLen,
MaxPathLenZero: sm2Cert.MaxPathLenZero, MaxPathLenZero: sm2Cert.MaxPathLenZero,
SubjectKeyId: sm2Cert.SubjectKeyId, SubjectKeyId: sm2Cert.SubjectKeyId,
AuthorityKeyId: sm2Cert.AuthorityKeyId, AuthorityKeyId: sm2Cert.AuthorityKeyId,
......
...@@ -27,14 +27,15 @@ import ( ...@@ -27,14 +27,15 @@ import (
var ( var (
kmlog = log.New("module", "kvmvccMavl") kmlog = log.New("module", "kvmvccMavl")
// ErrStateHashLost ... // ErrStateHashLost ...
ErrStateHashLost = errors.New("ErrStateHashLost") ErrStateHashLost = errors.New("ErrStateHashLost")
kvmvccMavlFork int64 = 200 * 10000 kvmvccMavlFork int64 = 200 * 10000
isDelMavlData = false isDelMavlData = false
delMavlDataHeight = kvmvccMavlFork + 10000 delMavlDataHeight = kvmvccMavlFork + 10000
delMavlDataState int32 delMavlDataState int32
wg sync.WaitGroup wg sync.WaitGroup
quit bool quit bool
isUpgradeCommitMavl bool isPrunedMavl bool // 是否是被裁剪过的 mavl
delPrunedMavlState int32 = delPrunedMavlStart // Upgrade时候删除pruned mavl的状态
) )
const ( const (
...@@ -42,6 +43,10 @@ const ( ...@@ -42,6 +43,10 @@ const (
batchDataSize = 1024 * 1024 * 1 batchDataSize = 1024 * 1024 * 1
delMavlStateStart = 1 delMavlStateStart = 1
delMavlStateEnd = 0 delMavlStateEnd = 0
delPrunedMavlStart = 0
delPrunedMavlStarting = 1
delPruneMavlEnd = 2
) )
// SetLogLevel set log level // SetLogLevel set log level
...@@ -128,8 +133,8 @@ func New(cfg *types.Store, sub []byte) queue.Module { ...@@ -128,8 +133,8 @@ func New(cfg *types.Store, sub []byte) queue.Module {
if err == nil { if err == nil {
isDelMavlData = true isDelMavlData = true
} }
// 查询是Upgrade是否需保存mavl // 查询是Upgrade,需要存储mavl到db
isUpgradeCommitMavl = isCommitMavlDB(bs.GetDB()) isPrunedMavl = isPrunedMavlDB(bs.GetDB())
bs.SetChild(kvms) bs.SetChild(kvms)
return kvms return kvms
...@@ -269,7 +274,7 @@ func (kvmMavls *KVmMavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([ ...@@ -269,7 +274,7 @@ func (kvmMavls *KVmMavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([
var hash []byte var hash []byte
var err error var err error
if isUpgradeCommitMavl { if isPrunedMavl {
hash, err = kvmMavls.MavlStore.MemSet(datas, sync) hash, err = kvmMavls.MavlStore.MemSet(datas, sync)
if err != nil { if err != nil {
return hash, err return hash, err
...@@ -301,8 +306,12 @@ func (kvmMavls *KVmMavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([ ...@@ -301,8 +306,12 @@ func (kvmMavls *KVmMavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([
func (kvmMavls *KVmMavlStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) { func (kvmMavls *KVmMavlStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
var hash []byte var hash []byte
var err error var err error
if isUpgradeCommitMavl { if isPrunedMavl {
hash, err = kvmMavls.Commit(req) hash, err = kvmMavls.Commit(req)
if isNeedDelPrunedMavl() {
wg.Add(1)
go deletePrunedMavl(kvmMavls.GetDB())
}
} else { } else {
hash, err = kvmMavls.KVMVCCStore.CommitUpgrade(req) hash, err = kvmMavls.KVMVCCStore.CommitUpgrade(req)
} }
...@@ -381,7 +390,18 @@ func setDelMavl(state int32) { ...@@ -381,7 +390,18 @@ func setDelMavl(state int32) {
atomic.StoreInt32(&delMavlDataState, state) atomic.StoreInt32(&delMavlDataState, state)
} }
func isCommitMavlDB(db dbm.DB) bool { func isNeedDelPrunedMavl() bool {
if atomic.LoadInt32(&delPrunedMavlState) == 0 {
return true
}
return false
}
func setDelPrunedMavl(state int32) {
atomic.StoreInt32(&delPrunedMavlState, state)
}
func isPrunedMavlDB(db dbm.DB) bool {
prefix := []byte(leafNodePrefix) prefix := []byte(leafNodePrefix)
it := db.Iterator(prefix, nil, true) it := db.Iterator(prefix, nil, true)
defer it.Close() defer it.Close()
...@@ -393,3 +413,34 @@ func isCommitMavlDB(db dbm.DB) bool { ...@@ -393,3 +413,34 @@ func isCommitMavlDB(db dbm.DB) bool {
} }
return isCommit return isCommit
} }
func deletePrunedMavl(db dbm.DB) {
defer wg.Done()
setDelPrunedMavl(delPrunedMavlStarting)
defer setDelPrunedMavl(delPruneMavlEnd)
deletePrunedMavlData(db, hashNodePrefix)
deletePrunedMavlData(db, leafNodePrefix)
deletePrunedMavlData(db, leafKeyCountPrefix)
deletePrunedMavlData(db, oldLeafKeyCountPrefix)
}
func deletePrunedMavlData(db dbm.DB, prefix string) {
it := db.Iterator([]byte(prefix), nil, true)
defer it.Close()
if it.Rewind() && it.Valid() {
batch := db.NewBatch(false)
for it.Next(); it.Valid(); it.Next() { //第一个不做删除
if quit {
return
}
batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize {
batch.Write()
batch.Reset()
time.Sleep(time.Millisecond * 100)
}
}
batch.Write()
}
}
...@@ -720,15 +720,46 @@ func TestIsCommitMavl(t *testing.T) { ...@@ -720,15 +720,46 @@ func TestIsCommitMavl(t *testing.T) {
store := New(storeCfg, nil).(*KVmMavlStore) store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store) assert.NotNil(t, store)
isComm := isCommitMavlDB(store.GetDB()) isComm := isPrunedMavlDB(store.GetDB())
require.Equal(t, false, isComm) require.Equal(t, false, isComm)
store.GetDB().Set([]byte(fmt.Sprintln(leafNodePrefix, "123")), []byte("v1")) store.GetDB().Set([]byte(fmt.Sprintln(leafNodePrefix, "123")), []byte("v1"))
store.GetDB().Set([]byte(fmt.Sprintln(leafNodePrefix, "456")), []byte("v2")) store.GetDB().Set([]byte(fmt.Sprintln(leafNodePrefix, "456")), []byte("v2"))
isComm = isCommitMavlDB(store.GetDB()) isComm = isPrunedMavlDB(store.GetDB())
require.Equal(t, true, isComm) require.Equal(t, true, isComm)
} }
func TestDeletePrunedMavl(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg := newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
deletePrunedMavlData(store.GetDB(), hashNodePrefix)
store.GetDB().Set([]byte(fmt.Sprintln(hashNodePrefix, "123")), []byte("v1"))
//测试只有一条数据时候, 则不做删除
deletePrunedMavlData(store.GetDB(), hashNodePrefix)
v1, err := store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "123")))
require.NoError(t, err)
require.Equal(t, v1, []byte("v1"))
//测试再加入一条数据,即两条时候
store.GetDB().Set([]byte(fmt.Sprintln(hashNodePrefix, "456")), []byte("v2"))
deletePrunedMavlData(store.GetDB(), hashNodePrefix)
v1, err = store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "123")))
require.Error(t, err)
require.Equal(t, v1, []uint8([]byte(nil)))
v2, err := store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "456")))
require.NoError(t, err)
require.Equal(t, v2, []byte("v2"))
}
func BenchmarkGetkmvccMavl(b *testing.B) { benchmarkGet(b, false) } func BenchmarkGetkmvccMavl(b *testing.B) { benchmarkGet(b, false) }
func BenchmarkGetkmvcc(b *testing.B) { benchmarkGet(b, true) } func BenchmarkGetkmvcc(b *testing.B) { benchmarkGet(b, true) }
......
...@@ -16,7 +16,10 @@ import ( ...@@ -16,7 +16,10 @@ import (
const ( const (
// 同store/mavl中定义保持一致,即裁剪的加前缀 // 同store/mavl中定义保持一致,即裁剪的加前缀
leafNodePrefix = "_mb_" hashNodePrefix = "_mh_"
leafNodePrefix = "_mb_"
leafKeyCountPrefix = "..mk.."
oldLeafKeyCountPrefix = "..mok.."
) )
// MavlStore mavl store struct // MavlStore mavl store struct
......
...@@ -205,7 +205,7 @@ func (t *Tree) Save() []byte { ...@@ -205,7 +205,7 @@ func (t *Tree) Save() []byte {
return nil return nil
} }
if t.ndb != nil { if t.ndb != nil {
if t.isRemoveLeafCountKey() { if enablePrune && t.isRemoveLeafCountKey() {
//DelLeafCountKV 需要先提前将leafcoutkey删除,这里需先于t.ndb.Commit() //DelLeafCountKV 需要先提前将leafcoutkey删除,这里需先于t.ndb.Commit()
err := DelLeafCountKV(t.ndb.db, t.blockHeight) err := DelLeafCountKV(t.ndb.db, t.blockHeight)
if err != nil { if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment