Commit d8b23ca4 authored by liuyuhang's avatar liuyuhang

modify mustwrite batch

parent 08d8d17c
...@@ -13,7 +13,6 @@ import ( ...@@ -13,7 +13,6 @@ import (
drivers "github.com/33cn/chain33/system/store" drivers "github.com/33cn/chain33/system/store"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"fmt"
) )
var klog = log.New("module", "kvmvccdb") var klog = log.New("module", "kvmvccdb")
...@@ -230,11 +229,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) { ...@@ -230,11 +229,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
storeBatch.Set(kvset[i].Key, kvset[i].Value) storeBatch.Set(kvset[i].Key, kvset[i].Value)
} }
} }
err := storeBatch.Write() dbm.MustWrite(storeBatch)
if err != nil {
klog.Error("store kvmvcc saveKVSets to db failed")
panic(fmt.Sprint("batch write err", err))
}
} }
func (mvccs *KVMVCCStore) checkVersion(height int64) ([]*types.KeyValue, error) { func (mvccs *KVMVCCStore) checkVersion(height int64) ([]*types.KeyValue, error) {
......
...@@ -367,7 +367,6 @@ func delMavlData(db dbm.DB) bool { ...@@ -367,7 +367,6 @@ func delMavlData(db dbm.DB) bool {
it := db.Iterator(nil, nil, true) it := db.Iterator(nil, nil, true)
defer it.Close() defer it.Close()
batch := db.NewBatch(true) batch := db.NewBatch(true)
var err error
for it.Rewind(); it.Valid(); it.Next() { for it.Rewind(); it.Valid(); it.Next() {
if quit { if quit {
return false return false
...@@ -375,20 +374,14 @@ func delMavlData(db dbm.DB) bool { ...@@ -375,20 +374,14 @@ func delMavlData(db dbm.DB) bool {
if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除 if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除
batch.Delete(it.Key()) batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
err = batch.Write() dbm.MustWrite(batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
} }
} }
} }
batch.Set(genDelMavlKey(mvccPrefix), []byte("")) batch.Set(genDelMavlKey(mvccPrefix), []byte(""))
err = batch.Write() dbm.MustWrite(batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
return true return true
} }
...@@ -440,7 +433,6 @@ func deletePrunedMavl(db dbm.DB) { ...@@ -440,7 +433,6 @@ func deletePrunedMavl(db dbm.DB) {
func deletePrunedMavlData(db dbm.DB, prefix string) { func deletePrunedMavlData(db dbm.DB, prefix string) {
it := db.Iterator([]byte(prefix), nil, true) it := db.Iterator([]byte(prefix), nil, true)
defer it.Close() defer it.Close()
var err error
if it.Rewind() && it.Valid() { if it.Rewind() && it.Valid() {
batch := db.NewBatch(false) batch := db.NewBatch(false)
for it.Next(); it.Valid(); it.Next() { //第一个不做删除 for it.Next(); it.Valid(); it.Next() { //第一个不做删除
...@@ -449,17 +441,11 @@ func deletePrunedMavlData(db dbm.DB, prefix string) { ...@@ -449,17 +441,11 @@ func deletePrunedMavlData(db dbm.DB, prefix string) {
} }
batch.Delete(it.Key()) batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
err = batch.Write() dbm.MustWrite(batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
} }
} }
err = batch.Write() dbm.MustWrite(batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
} }
} }
...@@ -184,10 +184,7 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) { ...@@ -184,10 +184,7 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
batch.Set(kvset[i].Key, kvset[i].Value) batch.Set(kvset[i].Key, kvset[i].Value)
} }
} }
err := batch.Write() dbm.MustWrite(batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
delete(mvccs.kvsetmap, string(req.Hash)) delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil return req.Hash, nil
} }
...@@ -266,11 +263,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) { ...@@ -266,11 +263,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
storeBatch.Set(kvset[i].Key, kvset[i].Value) storeBatch.Set(kvset[i].Key, kvset[i].Value)
} }
} }
err := storeBatch.Write() dbm.MustWrite(storeBatch)
if err != nil {
kmlog.Info("KVMVCCStore saveKVSets", "Write error", err)
panic(fmt.Sprint("batch write err", err))
}
} }
// GetMaxVersion 获取当前最大高度 // GetMaxVersion 获取当前最大高度
...@@ -394,17 +387,13 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) { ...@@ -394,17 +387,13 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
return return
} }
batch.Reset() batch.Reset()
var err error
for key, vals := range mp { for key, vals := range mp {
if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除 if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除
for _, val := range vals[1:] { //从第二个开始判断 for _, val := range vals[1:] { //从第二个开始判断
if curHeight >= val+int64(pruneHeight) { if curHeight >= val+int64(pruneHeight) {
batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
err = batch.Write() dbm.MustWrite(batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
} }
} }
...@@ -412,10 +401,7 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) { ...@@ -412,10 +401,7 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
} }
delete(mp, key) delete(mp, key)
} }
err = batch.Write() dbm.MustWrite(batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
} }
func genKeyVersion(key []byte, height int64) []byte { func genKeyVersion(key []byte, height int64) []byte {
......
...@@ -96,6 +96,13 @@ type Batch interface { ...@@ -96,6 +96,13 @@ type Batch interface {
Reset() // Reset resets the batch for reuse Reset() // Reset resets the batch for reuse
} }
func MustWrite(batch Batch) {
err := batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
}
//IteratorSeeker ... //IteratorSeeker ...
type IteratorSeeker interface { type IteratorSeeker interface {
Rewind() bool Rewind() bool
......
...@@ -267,6 +267,10 @@ func (mBatch *goLevelDBBatch) Delete(key []byte) { ...@@ -267,6 +267,10 @@ func (mBatch *goLevelDBBatch) Delete(key []byte) {
} }
func (mBatch *goLevelDBBatch) Write() error { func (mBatch *goLevelDBBatch) Write() error {
if mBatch.batch.Len() == 0 {
llog.Info("Write", "len", len)
return nil
}
err := mBatch.db.db.Write(mBatch.batch, mBatch.wop) err := mBatch.db.db.Write(mBatch.batch, mBatch.wop)
if err != nil { if err != nil {
llog.Error("Write", "error", err) llog.Error("Write", "error", err)
......
...@@ -256,28 +256,22 @@ func pruningFirstLevelNode(db dbm.DB, curHeight int64) { ...@@ -256,28 +256,22 @@ func pruningFirstLevelNode(db dbm.DB, curHeight int64) {
} }
func addLeafCountKeyToSecondLevel(db dbm.DB, kvs []*types.KeyValue, batch dbm.Batch) { func addLeafCountKeyToSecondLevel(db dbm.DB, kvs []*types.KeyValue, batch dbm.Batch) {
var err error
batch.Reset() batch.Reset()
for _, kv := range kvs { for _, kv := range kvs {
batch.Delete(kv.Key) batch.Delete(kv.Key)
batch.Set(genOldLeafCountKeyFromKey(kv.Key), kv.Value) batch.Set(genOldLeafCountKeyFromKey(kv.Key), kv.Value)
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil { dbm.MustWrite(batch)
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
} }
} }
if err = batch.Write(); err != nil { dbm.MustWrite(batch)
panic(fmt.Sprint("batch write err", err))
}
} }
func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.Batch) { func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.Batch) {
if len(mp) == 0 { if len(mp) == 0 {
return return
} }
var err error
batch.Reset() batch.Reset()
for key, vals := range mp { for key, vals := range mp {
if len(vals) > 1 && vals[1].height != vals[0].height { //防止相同高度时候出现的误删除 if len(vals) > 1 && vals[1].height != vals[0].height { //防止相同高度时候出现的误删除
...@@ -297,9 +291,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm. ...@@ -297,9 +291,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.
batch.Delete(leafCountKey) // 叶子计数节点 batch.Delete(leafCountKey) // 叶子计数节点
batch.Delete(val.hash) // 叶子节点hash值 batch.Delete(val.hash) // 叶子节点hash值
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil { dbm.MustWrite(batch)
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
} }
} }
...@@ -307,9 +299,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm. ...@@ -307,9 +299,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.
} }
delete(mp, key) delete(mp, key)
} }
if err = batch.Write(); err != nil { dbm.MustWrite(batch)
panic(fmt.Sprint("batch write err", err))
}
} }
func pruningSecondLevel(db dbm.DB, curHeight int64) { func pruningSecondLevel(db dbm.DB, curHeight int64) {
...@@ -411,15 +401,11 @@ func deleteOldNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch d ...@@ -411,15 +401,11 @@ func deleteOldNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch d
} }
delete(mp, key) delete(mp, key)
if batch.ValueSize() > batchDataSize { if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil { dbm.MustWrite(batch)
panic(fmt.Sprint("batch write err", err))
}
batch.Reset() batch.Reset()
} }
} }
if err = batch.Write(); err != nil { dbm.MustWrite(batch)
panic(fmt.Sprint("batch write err", err))
}
} }
// PruningTreePrintDB pruning tree print db // PruningTreePrintDB pruning tree print db
......
...@@ -404,9 +404,7 @@ func (t *Tree) RemoveLeafCountKey(height int64) { ...@@ -404,9 +404,7 @@ func (t *Tree) RemoveLeafCountKey(height int64) {
treelog.Debug("RemoveLeafCountKey:", "height", height, "key:", string(k), "hash:", common.ToHex(hash)) treelog.Debug("RemoveLeafCountKey:", "height", height, "key:", string(k), "hash:", common.ToHex(hash))
} }
} }
if err := batch.Write(); err != nil { dbm.MustWrite(batch)
panic(fmt.Sprint("batch write err", err))
}
} }
// Iterate 依次迭代遍历树的所有键 // Iterate 依次迭代遍历树的所有键
...@@ -711,15 +709,11 @@ func (ndb *nodeDB) Commit() error { ...@@ -711,15 +709,11 @@ func (ndb *nodeDB) Commit() error {
defer ndb.mtx.Unlock() defer ndb.mtx.Unlock()
// Write saves // Write saves
err := ndb.batch.Write() dbm.MustWrite(ndb.batch)
if err != nil {
panic(fmt.Sprint("batch write err", err))
treelog.Error("Commit batch.Write err", "err", err)
}
ndb.batch = nil ndb.batch = nil
ndb.orphans = make(map[string]struct{}) ndb.orphans = make(map[string]struct{})
return err return nil
} }
// SetKVPair 设置kv对外接口 // SetKVPair 设置kv对外接口
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment