Commit 10044b04 authored by heyubin's avatar heyubin

Merge branch 'master' into issues-enable-levelfee

parents 2e8f5850 5c62f0d9
......@@ -229,10 +229,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
storeBatch.Set(kvset[i].Key, kvset[i].Value)
}
}
err := storeBatch.Write()
if err != nil {
klog.Error("store kvmvcc saveKVSets to db failed")
}
dbm.MustWrite(storeBatch)
}
func (mvccs *KVMVCCStore) checkVersion(height int64) ([]*types.KeyValue, error) {
......
......@@ -356,33 +356,49 @@ func DelMavl(db dbm.DB) {
defer wg.Done()
setDelMavl(delMavlStateStart)
defer setDelMavl(delMavlStateEnd)
isDel := delMavlData(db)
if isDel {
isDelMavlData = true
kmlog.Info("DelMavl success")
prefix := ""
for {
kmlog.Debug("start once del mavl")
var loop bool
loop, prefix = delMavlData(db, prefix)
if !loop {
break
}
kmlog.Debug("end once del mavl")
time.Sleep(time.Second * 1)
}
}
func delMavlData(db dbm.DB) bool {
it := db.Iterator(nil, nil, true)
func delMavlData(db dbm.DB, prefix string) (bool, string) {
it := db.Iterator([]byte(prefix), types.EmptyValue, false)
defer it.Close()
batch := db.NewBatch(true)
batch := db.NewBatch(false)
count := 0
const onceCount = 50
for it.Rewind(); it.Valid(); it.Next() {
if quit {
return false
return false, ""
}
if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除
batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize {
batch.Write()
dbm.MustWrite(batch)
batch.Reset()
time.Sleep(time.Millisecond * 100)
count++
}
}
if count > onceCount {
if it.Next() {
return true, string(it.Key())
}
return true, ""
}
}
batch.Set(genDelMavlKey(mvccPrefix), []byte(""))
batch.Write()
return true
dbm.MustWrite(batch)
isDelMavlData = true
kmlog.Info("DelMavl success")
return false, ""
}
func genDelMavlKey(prefix []byte) []byte {
......@@ -423,29 +439,43 @@ func deletePrunedMavl(db dbm.DB) {
defer wg.Done()
setDelPrunedMavl(delPrunedMavlStarting)
defer setDelPrunedMavl(delPruneMavlEnd)
deletePrunedMavlData(db, hashNodePrefix)
deletePrunedMavlData(db, leafNodePrefix)
deletePrunedMavlData(db, leafKeyCountPrefix)
deletePrunedMavlData(db, oldLeafKeyCountPrefix)
prefixS := []string{hashNodePrefix, leafNodePrefix, leafKeyCountPrefix, oldLeafKeyCountPrefix}
for _, str := range prefixS {
for {
stat := deletePrunedMavlData(db, str)
if stat == 0 {
return
} else if stat == 1 {
break
} else {
time.Sleep(time.Millisecond * 100)
}
}
}
}
func deletePrunedMavlData(db dbm.DB, prefix string) {
it := db.Iterator([]byte(prefix), nil, true)
func deletePrunedMavlData(db dbm.DB, prefix string) (status int) {
it := db.Iterator([]byte(prefix), nil, false)
defer it.Close()
count := 0
const onceCount = 200
if it.Rewind() && it.Valid() {
batch := db.NewBatch(false)
for it.Next(); it.Valid(); it.Next() { //第一个不做删除
if quit {
return
return 0 // quit
}
batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize {
batch.Write()
dbm.MustWrite(batch)
batch.Reset()
time.Sleep(time.Millisecond * 100)
count++
}
if count > onceCount {
return 2 //loop
}
}
batch.Write()
dbm.MustWrite(batch)
}
return 1 // this prefix Iterator over
}
......@@ -573,7 +573,14 @@ func TestDelMavlData(t *testing.T) {
db.Set([]byte("key22"), []byte("value22"))
quit = false
delMavlData(db)
prefix := ""
for {
var loop bool
loop, prefix = delMavlData(db, prefix)
if !loop {
break
}
}
v, err := db.Get([]byte(mvccPrefix))
require.NoError(t, err)
......@@ -748,15 +755,15 @@ func TestDeletePrunedMavl(t *testing.T) {
require.Equal(t, v1, []byte("v1"))
//测试再加入一条数据,即两条时候
store.GetDB().Set([]byte(fmt.Sprintln(hashNodePrefix, "456")), []byte("v2"))
store.GetDB().Set([]byte(fmt.Sprintln(hashNodePrefix, "123")), []byte("v1"))
deletePrunedMavlData(store.GetDB(), hashNodePrefix)
v1, err = store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "123")))
v1, err = store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "456")))
require.Error(t, err)
require.Equal(t, v1, []uint8([]byte(nil)))
v2, err := store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "456")))
v2, err := store.GetDB().Get([]byte(fmt.Sprintln(hashNodePrefix, "123")))
require.NoError(t, err)
require.Equal(t, v2, []byte("v2"))
require.Equal(t, v2, []byte("v1"))
wg.Add(1)
go deletePrunedMavl(store.GetDB())
......
......@@ -184,7 +184,7 @@ func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
batch.Set(kvset[i].Key, kvset[i].Value)
}
}
batch.Write()
dbm.MustWrite(batch)
delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil
}
......@@ -263,10 +263,7 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue, sync bool) {
storeBatch.Set(kvset[i].Key, kvset[i].Value)
}
}
err := storeBatch.Write()
if err != nil {
kmlog.Info("KVMVCCStore saveKVSets", "Write error", err)
}
dbm.MustWrite(storeBatch)
}
// GetMaxVersion 获取当前最大高度
......@@ -396,7 +393,7 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
if curHeight >= val+int64(pruneHeight) {
batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key
if batch.ValueSize() > batchDataSize {
batch.Write()
dbm.MustWrite(batch)
batch.Reset()
}
}
......@@ -404,7 +401,7 @@ func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
}
delete(mp, key)
}
batch.Write()
dbm.MustWrite(batch)
}
func genKeyVersion(key []byte, height int64) []byte {
......
......@@ -11,6 +11,7 @@ import (
"github.com/33cn/chain33/common/version"
"github.com/33cn/chain33/types"
dbm "github.com/33cn/chain33/common/db"
)
// Upgrade 升级localDB和storeDB
......@@ -82,10 +83,7 @@ func (chain *BlockChain) ReExecBlock(startHeight, curHeight int64) {
if err != nil {
panic(fmt.Sprintf("execBlockEx connectBlock readd Txs fail height=%d err=%s, this not allow fail", i, err.Error()))
}
err = newbatch.Write()
if err != nil {
panic(err)
}
dbm.MustWrite(newbatch)
}
prevStateHash = block.StateHash
......
......@@ -96,6 +96,14 @@ type Batch interface {
Reset() // Reset resets the batch for reuse
}
// MustWrite must write correct
func MustWrite(batch Batch) {
err := batch.Write()
if err != nil {
panic(fmt.Sprint("batch write err", err))
}
}
//IteratorSeeker ...
type IteratorSeeker interface {
Rewind() bool
......
......@@ -256,28 +256,22 @@ func pruningFirstLevelNode(db dbm.DB, curHeight int64) {
}
func addLeafCountKeyToSecondLevel(db dbm.DB, kvs []*types.KeyValue, batch dbm.Batch) {
var err error
batch.Reset()
for _, kv := range kvs {
batch.Delete(kv.Key)
batch.Set(genOldLeafCountKeyFromKey(kv.Key), kv.Value)
if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil {
return
}
dbm.MustWrite(batch)
batch.Reset()
}
}
if err = batch.Write(); err != nil {
return
}
dbm.MustWrite(batch)
}
func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.Batch) {
if len(mp) == 0 {
return
}
var err error
batch.Reset()
for key, vals := range mp {
if len(vals) > 1 && vals[1].height != vals[0].height { //防止相同高度时候出现的误删除
......@@ -297,9 +291,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.
batch.Delete(leafCountKey) // 叶子计数节点
batch.Delete(val.hash) // 叶子节点hash值
if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil {
return
}
dbm.MustWrite(batch)
batch.Reset()
}
}
......@@ -307,9 +299,7 @@ func deleteNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch dbm.
}
delete(mp, key)
}
if err = batch.Write(); err != nil {
return
}
dbm.MustWrite(batch)
}
func pruningSecondLevel(db dbm.DB, curHeight int64) {
......@@ -376,7 +366,6 @@ func deleteOldNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch d
if len(mp) == 0 {
return
}
var err error
batch.Reset()
for key, vals := range mp {
if len(vals) > 1 {
......@@ -411,15 +400,11 @@ func deleteOldNode(db dbm.DB, mp map[string][]hashData, curHeight int64, batch d
}
delete(mp, key)
if batch.ValueSize() > batchDataSize {
if err = batch.Write(); err != nil {
return
}
dbm.MustWrite(batch)
batch.Reset()
}
}
if err = batch.Write(); err != nil {
return
}
dbm.MustWrite(batch)
}
// PruningTreePrintDB pruning tree print db
......
......@@ -404,9 +404,7 @@ func (t *Tree) RemoveLeafCountKey(height int64) {
treelog.Debug("RemoveLeafCountKey:", "height", height, "key:", string(k), "hash:", common.ToHex(hash))
}
}
if err := batch.Write(); err != nil {
return
}
dbm.MustWrite(batch)
}
// Iterate 依次迭代遍历树的所有键
......@@ -711,14 +709,11 @@ func (ndb *nodeDB) Commit() error {
defer ndb.mtx.Unlock()
// Write saves
err := ndb.batch.Write()
if err != nil {
treelog.Error("Commit batch.Write err", "err", err)
}
dbm.MustWrite(ndb.batch)
ndb.batch = nil
ndb.orphans = make(map[string]struct{})
return err
return nil
}
// SetKVPair 设置kv对外接口
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment