Commit 08d8d17c authored by liuyuhang's avatar liuyuhang

modify write batch panic and slow del mavl key 500ms

parent 25b3c68b
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
drivers "github.com/33cn/chain33/system/store" drivers "github.com/33cn/chain33/system/store"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"fmt"
) )
var klog = log.New("module", "kvmvccdb") var klog = log.New("module", "kvmvccdb")
...@@ -232,6 +233,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) { ...@@ -232,6 +233,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
err := storeBatch.Write() err := storeBatch.Write()
if err != nil { if err != nil {
klog.Error("store kvmvcc saveKVSets to db failed") klog.Error("store kvmvcc saveKVSets to db failed")
panic(fmt.Sprint("batch write err", err))
} }
} }
......
...@@ -367,6 +367,7 @@ func delMavlData(db dbm.DB) bool { ...@@ -367,6 +367,7 @@ func delMavlData(db dbm.DB) bool {
it := db.Iterator(nil, nil, true) it := db.Iterator(nil, nil, true)
defer it.Close() defer it.Close()
batch := db.NewBatch(true) batch := db.NewBatch(true)
var err error
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
if quit { if quit {
return false return false
...@@ -374,14 +375,20 @@ func delMavlData(db dbm.DB) bool { ...@@ -374,14 +375,20 @@ func delMavlData(db dbm.DB) bool {
if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除 if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除
batch.Delete(it.Key()) batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
batch.Write() err = batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 500)
} }
} }
} }
batch.Set(genDelMavlKey(mvccPrefix), []byte("")) batch.Set(genDelMavlKey(mvccPrefix), []byte(""))
batch.Write() err = batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
return true return true
} }
...@@ -433,6 +440,7 @@ func deletePrunedMavl(db dbm.DB) { ...@@ -433,6 +440,7 @@ func deletePrunedMavl(db dbm.DB) {
func deletePrunedMavlData(db dbm.DB, prefix string) { func deletePrunedMavlData(db dbm.DB, prefix string) {
it := db.Iterator([]byte(prefix), nil, true) it := db.Iterator([]byte(prefix), nil, true)
defer it.Close() defer it.Close()
var err error
if it.Rewind() && it.Valid() { if it.Rewind() && it.Valid() {
batch := db.NewBatch(false) batch := db.NewBatch(false)
for it.Next(); it.Valid(); it.Next() { //第一个不做删除 for it.Next(); it.Valid(); it.Next() { //第一个不做删除
...@@ -441,11 +449,17 @@ func deletePrunedMavlData(db dbm.DB, prefix string) { ...@@ -441,11 +449,17 @@ func deletePrunedMavlData(db dbm.DB, prefix string) {
} }
batch.Delete(it.Key()) batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
batch.Write() err = batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 500)
} }
} }
batch.Write() err = batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
} }
} }
...@@ -184,7 +184,10 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) { ...@@ -184,7 +184,10 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
batch.Set(kvset[i].Key, kvset[i].Value) batch.Set(kvset[i].Key, kvset[i].Value)
} }
} }
batch.Write() err := batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
delete(mvccs.kvsetmap, string(req.Hash)) delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
...@@ -266,6 +269,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) { ...@@ -266,6 +269,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
err := storeBatch.Write() err := storeBatch.Write()
if err != nil { if err != nil {
kmlog.Info("KVMVCCStore saveKVSets", "Write error", err) kmlog.Info("KVMVCCStore saveKVSets", "Write error", err)
panic(fmt.Sprint("batch write err", err))
} }
} }
...@@ -390,13 +394,17 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) { ...@@ -390,13 +394,17 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
return return
} }
batch.Reset() batch.Reset()
var err error
for key, vals := range mp { for key, vals := range mp {
if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除 if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除
for _, val := range vals[1:] { //从第二个开始判断 for _, val := range vals[1:] { //从第二个开始判断
if curHeight >= val+int64(pruneHeight) { if curHeight >= val+int64(pruneHeight) {
batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
batch.Write() err = batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
} }
} }
...@@ -404,7 +412,10 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) { ...@@ -404,7 +412,10 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
} }
delete(mp, key) delete(mp, key)
} }
batch.Write() err = batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
} }
func genKeyVersion(key []byte, height int64) []byte { func genKeyVersion(key []byte, height int64) []byte {
......
...@@ -263,13 +263,13 @@ func addLeafCountKeyToSecondLevel(db dbm.DB, kvs []*types.KeyValue, batch dbm.Ba ...@@ -263,13 +263,13 @@ func addLeafCountKeyToSecondLevel(db dbm.DB, kvs []*types.KeyValue, batch dbm.Ba
batch.Set(genOldLeafCountKeyFromKey(kv.Key), kv.Value) batch.Set(genOldLeafCountKeyFromKey(kv.Key), kv.Value)
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil { if err = batch.Write(); err != nil {
return panic(fmt.Sprint("batch write err", err))
} }
batch.Reset() batch.Reset()
} }
} }
if err = batch.Write(); err != nil { if err = batch.Write(); err != nil {
return panic(fmt.Sprint("batch write err", err))
} }
} }
...@@ -298,7 +298,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm. ...@@ -298,7 +298,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.
batch.Delete(val.hash) // 叶子节点hash值 batch.Delete(val.hash) // 叶子节点hash值
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil { if err = batch.Write(); err != nil {
return panic(fmt.Sprint("batch write err", err))
} }
batch.Reset() batch.Reset()
} }
...@@ -308,7 +308,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm. ...@@ -308,7 +308,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.
delete(mp, key) delete(mp, key)
} }
if err = batch.Write(); err != nil { if err = batch.Write(); err != nil {
return panic(fmt.Sprint("batch write err", err))
} }
} }
...@@ -412,13 +412,13 @@ func deleteOldNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch d ...@@ -412,13 +412,13 @@ func deleteOldNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch d
delete(mp, key) delete(mp, key)
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil { if err = batch.Write(); err != nil {
return panic(fmt.Sprint("batch write err", err))
} }
batch.Reset() batch.Reset()
} }
} }
if err = batch.Write(); err != nil { if err = batch.Write(); err != nil {
return panic(fmt.Sprint("batch write err", err))
} }
} }
......
...@@ -405,7 +405,7 @@ func (t *Tree) RemoveLeafCountKey(height int64) { ...@@ -405,7 +405,7 @@ func (t *Tree) RemoveLeafCountKey(height int64) {
} }
} }
if err := batch.Write(); err != nil { if err := batch.Write(); err != nil {
return panic(fmt.Sprint("batch write err", err))
} }
} }
...@@ -713,6 +713,7 @@ func (ndb *nodeDB) Commit() error { ...@@ -713,6 +713,7 @@ func (ndb *nodeDB) Commit() error {
// Write saves // Write saves
err := ndb.batch.Write() err := ndb.batch.Write()
if err != nil { if err != nil {
panic(fmt.Sprint("batch write err", err))
treelog.Error("Commit batch.Write err", "err", err) treelog.Error("Commit batch.Write err", "err", err)
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment