Commit 25c26857 authored by liuyuhang's avatar liuyuhang Committed by vipwzw

modify kvmvcc sync

parent 1d9e819e
...@@ -117,9 +117,9 @@ type RoundState struct { ...@@ -117,9 +117,9 @@ type RoundState struct {
// RoundStateMessage ... // RoundStateMessage ...
func (rs *RoundState) RoundStateMessage() *tmtypes.NewRoundStepMsg { func (rs *RoundState) RoundStateMessage() *tmtypes.NewRoundStepMsg {
return &tmtypes.NewRoundStepMsg{ return &tmtypes.NewRoundStepMsg{
Height: rs.Height, Height: rs.Height,
Round: int32(rs.Round), Round: int32(rs.Round),
Step: int32(rs.Step), Step: int32(rs.Step),
SecondsSinceStartTime: int32(time.Since(rs.StartTime).Seconds()), SecondsSinceStartTime: int32(time.Since(rs.StartTime).Seconds()),
LastCommitRound: int32(rs.LastCommit.Round()), LastCommitRound: int32(rs.LastCommit.Round()),
} }
......
...@@ -73,9 +73,9 @@ func ParseX509CertificateToSm2(x509Cert *x509.Certificate) *sm2.Certificate { ...@@ -73,9 +73,9 @@ func ParseX509CertificateToSm2(x509Cert *x509.Certificate) *sm2.Certificate {
UnknownExtKeyUsage: x509Cert.UnknownExtKeyUsage, UnknownExtKeyUsage: x509Cert.UnknownExtKeyUsage,
BasicConstraintsValid: x509Cert.BasicConstraintsValid, BasicConstraintsValid: x509Cert.BasicConstraintsValid,
IsCA: x509Cert.IsCA, IsCA: x509Cert.IsCA,
MaxPathLen: x509Cert.MaxPathLen, MaxPathLen: x509Cert.MaxPathLen,
MaxPathLenZero: x509Cert.MaxPathLenZero, MaxPathLenZero: x509Cert.MaxPathLenZero,
SubjectKeyId: x509Cert.SubjectKeyId, SubjectKeyId: x509Cert.SubjectKeyId,
AuthorityKeyId: x509Cert.AuthorityKeyId, AuthorityKeyId: x509Cert.AuthorityKeyId,
...@@ -136,9 +136,9 @@ func ParseSm2CertificateToX509(sm2Cert *sm2.Certificate) *x509.Certificate { ...@@ -136,9 +136,9 @@ func ParseSm2CertificateToX509(sm2Cert *sm2.Certificate) *x509.Certificate {
UnknownExtKeyUsage: sm2Cert.UnknownExtKeyUsage, UnknownExtKeyUsage: sm2Cert.UnknownExtKeyUsage,
BasicConstraintsValid: sm2Cert.BasicConstraintsValid, BasicConstraintsValid: sm2Cert.BasicConstraintsValid,
IsCA: sm2Cert.IsCA, IsCA: sm2Cert.IsCA,
MaxPathLen: sm2Cert.MaxPathLen, MaxPathLen: sm2Cert.MaxPathLen,
MaxPathLenZero: sm2Cert.MaxPathLenZero, MaxPathLenZero: sm2Cert.MaxPathLenZero,
SubjectKeyId: sm2Cert.SubjectKeyId, SubjectKeyId: sm2Cert.SubjectKeyId,
AuthorityKeyId: sm2Cert.AuthorityKeyId, AuthorityKeyId: sm2Cert.AuthorityKeyId,
......
...@@ -38,6 +38,7 @@ type KVMVCCStore struct { ...@@ -38,6 +38,7 @@ type KVMVCCStore struct {
mvcc dbm.MVCC mvcc dbm.MVCC
kvsetmap map[string][]*types.KeyValue kvsetmap map[string][]*types.KeyValue
enableMVCCIter bool enableMVCCIter bool
sync bool
} }
type subConfig struct { type subConfig struct {
...@@ -55,9 +56,9 @@ func New(cfg *types.Store, sub []byte) queue.Module { ...@@ -55,9 +56,9 @@ func New(cfg *types.Store, sub []byte) queue.Module {
enable = subcfg.EnableMVCCIter enable = subcfg.EnableMVCCIter
} }
if enable { if enable {
kvs = &KVMVCCStore{bs, dbm.NewMVCCIter(bs.GetDB()), make(map[string][]*types.KeyValue), true} kvs = &KVMVCCStore{bs, dbm.NewMVCCIter(bs.GetDB()), make(map[string][]*types.KeyValue), true, false}
} else { } else {
kvs = &KVMVCCStore{bs, dbm.NewMVCC(bs.GetDB()), make(map[string][]*types.KeyValue), false} kvs = &KVMVCCStore{bs, dbm.NewMVCC(bs.GetDB()), make(map[string][]*types.KeyValue), false, false}
} }
bs.SetChild(kvs) bs.SetChild(kvs)
return kvs return kvs
...@@ -76,7 +77,7 @@ func (mvccs *KVMVCCStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) ...@@ -76,7 +77,7 @@ func (mvccs *KVMVCCStore) Set(datas *types.StoreSet, sync bool) ([]byte, error)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mvccs.saveKVSets(kvlist) mvccs.saveKVSets(kvlist, sync)
return hash, nil return hash, nil
} }
...@@ -101,6 +102,10 @@ func (mvccs *KVMVCCStore) Get(datas *types.StoreGet) [][]byte { ...@@ -101,6 +102,10 @@ func (mvccs *KVMVCCStore) Get(datas *types.StoreGet) [][]byte {
// MemSet set kvs to the mem of KVMVCCStore module and return the StateHash // MemSet set kvs to the mem of KVMVCCStore module and return the StateHash
func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) { func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now()
defer func() {
klog.Info("kvmvcc MemSet", "cost", types.Since(beg))
}()
kvset, err := mvccs.checkVersion(datas.Height) kvset, err := mvccs.checkVersion(datas.Height)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -114,19 +119,24 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, erro ...@@ -114,19 +119,24 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, erro
if len(kvlist) > 0 { if len(kvlist) > 0 {
kvset = append(kvset, kvlist...) kvset = append(kvset, kvlist...)
} }
mvccs.sync = sync
mvccs.kvsetmap[string(hash)] = kvset mvccs.kvsetmap[string(hash)] = kvset
return hash, nil return hash, nil
} }
// Commit kvs in the mem of KVMVCCStore module to state db and return the StateHash // Commit kvs in the mem of KVMVCCStore module to state db and return the StateHash
func (mvccs *KVMVCCStore) Commit(req *types.ReqHash) ([]byte, error) { func (mvccs *KVMVCCStore) Commit(req *types.ReqHash) ([]byte, error) {
beg := types.Now()
defer func() {
klog.Info("kvmvcc Commit", "cost", types.Since(beg))
}()
_, ok := mvccs.kvsetmap[string(req.Hash)] _, ok := mvccs.kvsetmap[string(req.Hash)]
if !ok { if !ok {
klog.Error("store kvmvcc commit", "err", types.ErrHashNotFound) klog.Error("store kvmvcc commit", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound return nil, types.ErrHashNotFound
} }
//klog.Debug("KVMVCCStore Commit saveKVSets", "hash", common.ToHex(req.Hash)) //klog.Debug("KVMVCCStore Commit saveKVSets", "hash", common.ToHex(req.Hash))
mvccs.saveKVSets(mvccs.kvsetmap[string(req.Hash)]) mvccs.saveKVSets(mvccs.kvsetmap[string(req.Hash)], mvccs.sync)
delete(mvccs.kvsetmap, string(req.Hash)) delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
...@@ -202,16 +212,16 @@ func (mvccs *KVMVCCStore) Del(req *types.StoreDel) ([]byte, error) { ...@@ -202,16 +212,16 @@ func (mvccs *KVMVCCStore) Del(req *types.StoreDel) ([]byte, error) {
} }
klog.Info("KVMVCCStore Del", "hash", common.ToHex(req.StateHash), "height", req.Height) klog.Info("KVMVCCStore Del", "hash", common.ToHex(req.StateHash), "height", req.Height)
mvccs.saveKVSets(kvset) mvccs.saveKVSets(kvset, mvccs.sync)
return req.StateHash, nil return req.StateHash, nil
} }
func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue) { func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
if len(kvset) == 0 { if len(kvset) == 0 {
return return
} }
storeBatch := mvccs.GetDB().NewBatch(true) storeBatch := mvccs.GetDB().NewBatch(sync)
for i := 0; i < len(kvset); i++ { for i := 0; i < len(kvset); i++ {
if kvset[i].Value == nil { if kvset[i].Value == nil {
......
...@@ -53,6 +53,7 @@ type KVMVCCStore struct { ...@@ -53,6 +53,7 @@ type KVMVCCStore struct {
enableMVCCIter bool enableMVCCIter bool
enableMavlPrune bool enableMavlPrune bool
pruneHeight int32 pruneHeight int32
sync bool
} }
// NewKVMVCC construct KVMVCCStore module // NewKVMVCC construct KVMVCCStore module
...@@ -64,10 +65,10 @@ func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore { ...@@ -64,10 +65,10 @@ func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore {
} }
if enable { if enable {
kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue), kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue),
true, sub.EnableMavlPrune, sub.PruneHeight} true, sub.EnableMavlPrune, sub.PruneHeight, false}
} else { } else {
kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue), kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue),
false, sub.EnableMavlPrune, sub.PruneHeight} false, sub.EnableMavlPrune, sub.PruneHeight, false}
} }
EnablePrune(sub.EnableMavlPrune) EnablePrune(sub.EnableMavlPrune)
SetPruneHeight(int(sub.PruneHeight)) SetPruneHeight(int(sub.PruneHeight))
...@@ -88,7 +89,7 @@ func (mvccs *KVMVCCStore) Set(datas *types.StoreSet, hash []byte, sync bool) ([] ...@@ -88,7 +89,7 @@ func (mvccs *KVMVCCStore) Set(datas *types.StoreSet, hash []byte, sync bool) ([]
if err != nil { if err != nil {
return nil, err return nil, err
} }
mvccs.saveKVSets(kvlist) mvccs.saveKVSets(kvlist, sync)
return hash, nil return hash, nil
} }
...@@ -113,6 +114,10 @@ func (mvccs *KVMVCCStore) Get(datas *types.StoreGet) [][]byte { ...@@ -113,6 +114,10 @@ func (mvccs *KVMVCCStore) Get(datas *types.StoreGet) [][]byte {
// MemSet set kvs to the mem of KVMVCCStore module and return the StateHash // MemSet set kvs to the mem of KVMVCCStore module and return the StateHash
func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ([]byte, error) { func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ([]byte, error) {
beg := types.Now()
defer func() {
kmlog.Info("kvmvcc MemSet", "cost", types.Since(beg))
}()
kvset, err := mvccs.checkVersion(datas.Height) kvset, err := mvccs.checkVersion(datas.Height)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -129,6 +134,7 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ...@@ -129,6 +134,7 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool)
kvset = append(kvset, kvlist...) kvset = append(kvset, kvlist...)
} }
mvccs.kvsetmap[string(hash)] = kvset mvccs.kvsetmap[string(hash)] = kvset
mvccs.sync = sync
// 进行裁剪 // 进行裁剪
if enablePrune && !isPruning() && if enablePrune && !isPruning() &&
pruneHeight != 0 && pruneHeight != 0 &&
...@@ -142,13 +148,17 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ...@@ -142,13 +148,17 @@ func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool)
// Commit kvs in the mem of KVMVCCStore module to state db and return the StateHash // Commit kvs in the mem of KVMVCCStore module to state db and return the StateHash
func (mvccs *KVMVCCStore) Commit(req *types.ReqHash) ([]byte, error) { func (mvccs *KVMVCCStore) Commit(req *types.ReqHash) ([]byte, error) {
beg := types.Now()
defer func() {
kmlog.Info("kvmvcc Commit", "cost", types.Since(beg))
}()
_, ok := mvccs.kvsetmap[string(req.Hash)] _, ok := mvccs.kvsetmap[string(req.Hash)]
if !ok { if !ok {
kmlog.Error("store kvmvcc commit", "err", types.ErrHashNotFound) kmlog.Error("store kvmvcc commit", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound return nil, types.ErrHashNotFound
} }
//kmlog.Debug("KVMVCCStore Commit saveKVSets", "hash", common.ToHex(req.Hash)) //kmlog.Debug("KVMVCCStore Commit saveKVSets", "hash", common.ToHex(req.Hash))
mvccs.saveKVSets(mvccs.kvsetmap[string(req.Hash)]) mvccs.saveKVSets(mvccs.kvsetmap[string(req.Hash)], mvccs.sync)
delete(mvccs.kvsetmap, string(req.Hash)) delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
...@@ -162,7 +172,7 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) { ...@@ -162,7 +172,7 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
} }
//kmlog.Debug("KVMVCCStore Commit saveKVSets", "hash", common.ToHex(req.Hash)) //kmlog.Debug("KVMVCCStore Commit saveKVSets", "hash", common.ToHex(req.Hash))
if batch == nil { if batch == nil {
batch = mvccs.db.NewBatch(true) batch = mvccs.db.NewBatch(mvccs.sync)
} }
batch.Reset() batch.Reset()
kvset := mvccs.kvsetmap[string(req.Hash)] kvset := mvccs.kvsetmap[string(req.Hash)]
...@@ -174,7 +184,6 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) { ...@@ -174,7 +184,6 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
} }
} }
batch.Write() batch.Write()
mvccs.saveKVSets(mvccs.kvsetmap[string(req.Hash)])
delete(mvccs.kvsetmap, string(req.Hash)) delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
...@@ -235,16 +244,16 @@ func (mvccs *KVMVCCStore) Del(req *types.StoreDel) ([]byte, error) { ...@@ -235,16 +244,16 @@ func (mvccs *KVMVCCStore) Del(req *types.StoreDel) ([]byte, error) {
} }
kmlog.Info("KVMVCCStore Del", "hash", common.ToHex(req.StateHash), "height", req.Height) kmlog.Info("KVMVCCStore Del", "hash", common.ToHex(req.StateHash), "height", req.Height)
mvccs.saveKVSets(kvset) mvccs.saveKVSets(kvset, mvccs.sync)
return req.StateHash, nil return req.StateHash, nil
} }
func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue) { func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
if len(kvset) == 0 { if len(kvset) == 0 {
return return
} }
storeBatch := mvccs.db.NewBatch(true) storeBatch := mvccs.db.NewBatch(sync)
for i := 0; i < len(kvset); i++ { for i := 0; i < len(kvset); i++ {
if kvset[i].Value == nil { if kvset[i].Value == nil {
......
...@@ -86,7 +86,7 @@ func (mavls *MavlStore) Get(datas *types.StoreGet) [][]byte { ...@@ -86,7 +86,7 @@ func (mavls *MavlStore) Get(datas *types.StoreGet) [][]byte {
func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) { func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now() beg := types.Now()
defer func() { defer func() {
kmlog.Info("MemSet", "cost", types.Since(beg)) kmlog.Info("mavl MemSet", "cost", types.Since(beg))
}() }()
if len(datas.KV) == 0 { if len(datas.KV) == 0 {
kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null") kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
...@@ -111,7 +111,7 @@ func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) ...@@ -111,7 +111,7 @@ func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error)
func (mavls *MavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) { func (mavls *MavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now() beg := types.Now()
defer func() { defer func() {
kmlog.Info("MemSet", "cost", types.Since(beg)) kmlog.Info("mavl MemSet", "cost", types.Since(beg))
}() }()
if len(datas.KV) == 0 { if len(datas.KV) == 0 {
kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null") kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
...@@ -134,7 +134,7 @@ func (mavls *MavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, ...@@ -134,7 +134,7 @@ func (mavls *MavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte,
func (mavls *MavlStore) Commit(req *types.ReqHash) ([]byte, error) { func (mavls *MavlStore) Commit(req *types.ReqHash) ([]byte, error) {
beg := types.Now() beg := types.Now()
defer func() { defer func() {
kmlog.Info("Commit", "cost", types.Since(beg)) kmlog.Info("mavl Commit", "cost", types.Since(beg))
}() }()
tree, ok := mavls.trees.Load(string(req.Hash)) tree, ok := mavls.trees.Load(string(req.Hash))
if !ok { if !ok {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment