Commit 20472856 authored by liuyuhang's avatar liuyuhang Committed by vipwzw

将本地更新的chain33部分合并到plugin里面

parent 54ea2ad8
......@@ -146,6 +146,18 @@ enableMemTree=true
# 是否使能mavl叶子节点数据载入内存
enableMemVal=true
[store.sub.kvmvccmavl]
enableMavlPrefix=false
enableMVCC=false
enableMavlPrune=false
pruneHeight=10000
# 是否使能mavl数据载入内存
enableMemTree=true
# 是否使能mavl叶子节点数据载入内存
enableMemVal=true
# 是否使能升级kvmvcc(在mavl数据上)
enableUpdateKvmvcc=false
[wallet]
minFee=100000
driver="leveldb"
......
......@@ -121,6 +121,18 @@ func (kvs *KVStore) Commit(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil
}
// MemSetUpgrade set kvs to the mem of KVStore
func (kvs *KVStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
//not support
return nil, nil
}
// CommitUpgrade kvs in the mem of KVStore
func (kvs *KVStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
//not support
return nil, nil
}
// Rollback kvs in the mem of KVStore
func (kvs *KVStore) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := kvs.cache[string(req.Hash)]
......@@ -141,6 +153,9 @@ func (kvs *KVStore) IterateRangeByStateHash(statehash []byte, start []byte, end
// ProcEvent handles supported events
func (kvs *KVStore) ProcEvent(msg *queue.Message) {
if msg == nil {
return
}
msg.ReplyErr("KVStore", types.ErrActionNotSupport)
}
......
......@@ -131,6 +131,18 @@ func (mvccs *KVMVCCStore) Commit(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil
}
// MemSetUpgrade set kvs to the mem of KVMVCCStore module and return the StateHash
func (mvccs *KVMVCCStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
//not support
return nil, nil
}
// CommitUpgrade kvs in the mem of KVMVCCStore module to state db and return the StateHash
func (mvccs *KVMVCCStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
//not support
return nil, nil
}
// Rollback kvs in the mem of KVMVCCStore module and return the StateHash
func (mvccs *KVMVCCStore) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := mvccs.kvsetmap[string(req.Hash)]
......@@ -175,6 +187,9 @@ func (mvccs *KVMVCCStore) IterateRangeByStateHash(statehash []byte, start []byte
// ProcEvent handles supported events
func (mvccs *KVMVCCStore) ProcEvent(msg *queue.Message) {
if msg == nil {
return
}
msg.ReplyErr("KVStore", types.ErrActionNotSupport)
}
......
......@@ -33,6 +33,11 @@ var (
delMavlDataState int32
wg sync.WaitGroup
quit bool
// 用来阻塞查看当前是否需要升级数据库
done chan struct{}
// 使能mavl在当前区块基础上升级kvmvcc
enableUpdateKvmvcc bool
)
const (
......@@ -54,6 +59,7 @@ func DisableLog() {
func init() {
drivers.Reg("kvmvccmavl", New)
done = make(chan struct{}, 1)
}
// KVmMavlStore provide kvmvcc and mavl store interface implementation
......@@ -75,6 +81,10 @@ type subMavlConfig struct {
EnableMVCC bool `json:"enableMVCC"`
EnableMavlPrune bool `json:"enableMavlPrune"`
PruneHeight int32 `json:"pruneHeight"`
// 是否使能内存树
EnableMemTree bool `json:"enableMemTree"`
// 是否使能内存树中叶子节点
EnableMemVal bool `json:"enableMemVal"`
}
type subConfig struct {
......@@ -83,11 +93,15 @@ type subConfig struct {
EnableMVCC bool `json:"enableMVCC"`
EnableMavlPrune bool `json:"enableMavlPrune"`
PruneHeight int32 `json:"pruneHeight"`
// 是否使能内存树
EnableMemTree bool `json:"enableMemTree"`
// 是否使能内存树中叶子节点
EnableMemVal bool `json:"enableMemVal"`
EnableUpdateKvmvcc bool `json:"enableUpdateKvmvcc"`
}
// New construct KVMVCCStore module
func New(cfg *types.Store, sub []byte) queue.Module {
bs := drivers.NewBaseStore(cfg)
var kvms *KVmMavlStore
var subcfg subConfig
var subKVMVCCcfg subKVMVCCConfig
......@@ -102,7 +116,15 @@ func New(cfg *types.Store, sub []byte) queue.Module {
subMavlcfg.EnableMVCC = subcfg.EnableMVCC
subMavlcfg.EnableMavlPrune = subcfg.EnableMavlPrune
subMavlcfg.PruneHeight = subcfg.PruneHeight
subMavlcfg.EnableMemTree = subcfg.EnableMemTree
subMavlcfg.EnableMemVal = subcfg.EnableMemVal
}
if subcfg.EnableUpdateKvmvcc {
enableUpdateKvmvcc = true
}
bs := drivers.NewBaseStore(cfg)
cache, err := lru.New(cacheSize)
if err != nil {
panic("new KVmMavlStore fail")
......@@ -239,7 +261,70 @@ func (kvmMavls *KVmMavlStore) IterateRangeByStateHash(statehash []byte, start []
// ProcEvent handles supported events
func (kvmMavls *KVmMavlStore) ProcEvent(msg *queue.Message) {
msg.ReplyErr("KVmMavlStore", types.ErrActionNotSupport)
//msg.ReplyErr("KVmMavlStore", types.ErrActionNotSupport)
client := kvmMavls.GetQueueClient()
if msg != nil && msg.Ty == types.EventReExecBlock {
reData := msg.GetData().(*types.ReplyString)
if reData.Data == "over" {
kmlog.Info("ProcEvent update store over")
msg.ReplyErr("KVmMavlStore", nil)
done <- struct{}{}
return
}
} else if msg == nil {
if !enableUpdateKvmvcc {
return
}
height, err := kvmMavls.KVMVCCStore.GetMaxVersion()
if err != nil {
height = 0
} else {
height++
}
msg1 := client.NewMessage("blockchain", types.EventReExecBlock, &types.ReqInt{Height: height})
err = client.Send(msg1, true)
if err != nil {
return
}
resp, err := client.Wait(msg1)
if err != nil {
return
}
data := resp.GetData().(*types.ReplyString)
if data.Data == "need" {
//进程阻塞
<-done
}
}
}
// MemSetUpgrade set kvs to the mem of KVmMavlStore module not cache the tree and return the StateHash
func (kvmMavls *KVmMavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
if datas.Height < kvmvccMavlFork {
hash, err := kvmMavls.MavlStore.MemSetUpgrade(datas, sync)
if err != nil {
return hash, err
}
_, err = kvmMavls.KVMVCCStore.MemSet(datas, hash, sync)
if err != nil {
return hash, err
}
if err == nil {
kvmMavls.cache.Add(string(hash), datas.Height)
}
return hash, err
}
// 仅仅做kvmvcc
hash, err := kvmMavls.KVMVCCStore.MemSet(datas, nil, sync)
if err == nil {
kvmMavls.cache.Add(string(hash), datas.Height)
}
return hash, err
}
// CommitUpgrade kvs in the mem of KVmMavlStore module to state db and return the StateHash
func (kvmMavls *KVmMavlStore) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
return kvmMavls.KVMVCCStore.Commit(req)
}
// Del set kvs to nil with StateHash
......
......@@ -229,6 +229,11 @@ func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue) {
storeBatch.Write()
}
// GetMaxVersion 获取当前最大高度
func (mvccs *KVMVCCStore) GetMaxVersion() (int64, error) {
return mvccs.mvcc.GetMaxVersion()
}
func (mvccs *KVMVCCStore) checkVersion(height int64) ([]*types.KeyValue, error) {
//检查新加入区块的height和现有的version的关系,来判断是否要回滚数据
maxVersion, err := mvccs.mvcc.GetMaxVersion()
......
......@@ -32,12 +32,16 @@ func NewMavl(sub *subMavlConfig, db dbm.DB) *MavlStore {
subcfg.EnableMVCC = sub.EnableMVCC
subcfg.EnableMavlPrune = sub.EnableMavlPrune
subcfg.PruneHeight = sub.PruneHeight
subcfg.EnableMemTree = sub.EnableMemTree
subcfg.EnableMemVal = sub.EnableMemVal
}
mavls := &MavlStore{db, &sync.Map{}, subcfg.EnableMavlPrefix, subcfg.EnableMVCC, subcfg.EnableMavlPrune, subcfg.PruneHeight}
mavl.EnableMavlPrefix(subcfg.EnableMavlPrefix)
mavl.EnableMVCC(subcfg.EnableMVCC)
mavl.EnablePrune(subcfg.EnableMavlPrune)
mavl.SetPruneHeight(int(subcfg.PruneHeight))
mavl.EnableMemTree(subcfg.EnableMemTree)
mavl.EnableMemVal(subcfg.EnableMemVal)
return mavls
}
......@@ -103,6 +107,29 @@ func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error)
return hash, nil
}
// MemSetUpgrade 计算hash之后不在内存中存储树
func (mavls *MavlStore) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now()
defer func() {
kmlog.Info("MemSet", "cost", types.Since(beg))
}()
if len(datas.KV) == 0 {
kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
return datas.StateHash, nil
}
tree := mavl.NewTree(mavls.db, sync)
tree.SetBlockHeight(datas.Height)
err := tree.Load(datas.StateHash)
if err != nil {
return nil, err
}
for i := 0; i < len(datas.KV); i++ {
tree.Set(datas.KV[i].Key, datas.KV[i].Value)
}
hash := tree.Hash()
return hash, nil
}
// Commit convert memcory mavl to storage db
func (mavls *MavlStore) Commit(req *types.ReqHash) ([]byte, error) {
beg := types.Now()
......
......@@ -135,6 +135,18 @@ func (mpts *Store) Commit(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil
}
// MemSetUpgrade set keys values to memcory mpt, return root hash and error
func (mpts *Store) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
//not support
return nil, nil
}
// CommitUpgrade convert memcory mpt to storage db
func (mpts *Store) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
//not support
return nil, nil
}
// Rollback 回退将缓存的mpt树删除掉
func (mpts *Store) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := mpts.trees[string(req.Hash)]
......@@ -159,5 +171,8 @@ func (mpts *Store) IterateRangeByStateHash(statehash []byte, start []byte, end [
// ProcEvent not support message
func (mpts *Store) ProcEvent(msg *queue.Message) {
if msg == nil {
return
}
msg.ReplyErr("Store", types.ErrActionNotSupport)
}
......@@ -100,7 +100,6 @@ type BlockChain struct {
isParaChain bool //是否是平行链。平行链需要记录Sequence信息
isStrongConsistency bool
//lock
cachelock sync.Mutex
synBlocklock sync.Mutex
peerMaxBlklock sync.Mutex
castlock sync.Mutex
......
......@@ -127,6 +127,8 @@ func TestBlockChain(t *testing.T) {
testCheckBlock(t, blockchain)
testWriteBlockToDbTemp(t, blockchain)
testReadBlockToExec(t, blockchain)
testReExecBlock(t, blockchain)
testReExecBlockMsg(t, mock33, blockchain)
}
func testProcAddBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, blockchain *blockchain.BlockChain) {
......@@ -1109,3 +1111,21 @@ func testWriteBlockToDbTemp(t *testing.T, chain *blockchain.BlockChain) {
}
chainlog.Info("WriteBlockToDbTemp end ---------------------")
}
func testReExecBlock(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("ReExecBlock begin ---------------------")
curheight := chain.GetBlockHeight()
chain.ProcessReExecBlock(0, curheight)
chainlog.Info("ReExecBlock end ---------------------")
}
func testReExecBlockMsg(t *testing.T, mock33 *testnode.Chain33Mock, chain *blockchain.BlockChain) {
var err error
client := mock33.GetClient()
msg1 := client.NewMessage("blockchain", types.EventReExecBlock, &types.ReqInt{Height: 8})
err = client.Send(msg1, true)
require.NoError(t, err)
_, err = client.Wait(msg1)
require.NoError(t, err)
time.Sleep(time.Millisecond * 20)
}
......@@ -14,3 +14,8 @@ import (
func execBlock(client queue.Client, prevStateRoot []byte, block *types.Block, errReturn bool, sync bool) (*types.BlockDetail, []*types.Transaction, error) {
return util.ExecBlock(client, prevStateRoot, block, errReturn, sync, true)
}
//从本地执行区块
func execBlockUpgrade(client queue.Client, prevStateRoot []byte, block *types.Block, sync bool) error {
return util.ExecBlockUpgrade(client, prevStateRoot, block, sync)
}
......@@ -90,6 +90,8 @@ func (chain *BlockChain) ProcRecvMsg() {
case types.EventGetSeqCBLastNum:
go chain.processMsg(msg, reqnum, chain.getSeqCBLastNum)
case types.EventReExecBlock:
go chain.processMsg(msg, reqnum, chain.reExecBlock)
default:
go chain.processMsg(msg, reqnum, chain.unknowMsg)
}
......@@ -133,6 +135,17 @@ func (chain *BlockChain) getSeqCBLastNum(msg *queue.Message) {
msg.Reply(chain.client.NewMessage("rpc", types.EventGetSeqCBLastNum, lastNum))
}
func (chain *BlockChain) reExecBlock(msg *queue.Message) {
data := (msg.Data).(*types.ReqInt)
curHeight := chain.GetBlockHeight()
if curHeight < data.Height {
msg.Reply(chain.client.NewMessage("store", types.EventReExecBlock, &types.ReplyString{Data: "none"}))
return
}
msg.Reply(chain.client.NewMessage("store", types.EventReExecBlock, &types.ReplyString{Data: "need"}))
chain.ProcessReExecBlock(data.Height, curHeight)
}
func (chain *BlockChain) queryTx(msg *queue.Message) {
txhash := (msg.Data).(*types.ReqHash)
TransactionDetail, err := chain.ProcQueryTxMsg(txhash.Hash)
......@@ -162,16 +175,16 @@ func (chain *BlockChain) addBlock(msg *queue.Message) {
blockpid := msg.Data.(*types.BlockPid)
//chainlog.Error("addBlock", "height", blockpid.Block.Height, "pid", blockpid.Pid)
if chain.GetDownloadSyncStatus() {
//downLoadTask 运行时设置对应的blockdone
if chain.downLoadTask.InProgress() {
chain.downLoadTask.Done(blockpid.Block.GetHeight())
}
err := chain.WriteBlockToDbTemp(blockpid.Block)
if err != nil {
chainlog.Error("WriteBlockToDbTemp", "height", blockpid.Block.Height, "err", err.Error())
reply.IsOk = false
reply.Msg = []byte(err.Error())
}
//downLoadTask 运行时设置对应的blockdone
if chain.downLoadTask.InProgress() {
chain.downLoadTask.Done(blockpid.Block.GetHeight())
}
} else {
_, err := chain.ProcAddBlockMsg(false, &types.BlockDetail{Block: blockpid.Block}, blockpid.Pid)
if err != nil {
......
......@@ -10,6 +10,8 @@ import (
"math/big"
"sync/atomic"
"fmt"
"github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/difficulty"
......@@ -599,6 +601,40 @@ func (b *BlockChain) ProcessDelParaChainBlock(broadcast bool, blockdetail *types
return nil, true, false, nil
}
// ProcessReExecBlock 从对应高度本地重新执行区块
func (b *BlockChain) ProcessReExecBlock(startHeight, curHeight int64) {
var prevStateHash []byte
if startHeight > 0 {
blockdetail, err := b.GetBlock(startHeight - 1)
if err != nil {
panic(fmt.Sprintf("get height=%d err, this not allow fail", startHeight-1))
}
prevStateHash = blockdetail.Block.StateHash
}
for i := startHeight; i <= curHeight; i++ {
blockdetail, err := b.GetBlock(i)
if err != nil {
panic(fmt.Sprintf("get height=%d err, this not allow fail", i))
}
block := blockdetail.Block
err = execBlockUpgrade(b.client, prevStateHash, block, false)
if err != nil {
panic(fmt.Sprintf("execBlockEx height=%d err=%s, this not allow fail", i, err.Error()))
}
prevStateHash = block.StateHash
}
// 通知执行结束
msg := b.client.NewMessage("store", types.EventReExecBlock, &types.ReplyString{Data: "over"})
err := b.client.Send(msg, true)
if err != nil {
return
}
_, err = b.client.Wait(msg)
if err != nil {
return
}
}
// IsRecordFaultErr 检测此错误是否要记录到故障错误中
func IsRecordFaultErr(err error) bool {
return err != types.ErrFutureBlock && !api.IsGrpcError(err) && !api.IsQueueError(err)
......
......@@ -49,6 +49,10 @@ type SubStore interface {
Del(req *types.StoreDel) ([]byte, error)
IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool)
ProcEvent(msg *queue.Message)
//用升级本地交易构建store
MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error)
CommitUpgrade(hash *types.ReqHash) ([]byte, error)
}
// BaseStore 基础的store结构体
......@@ -82,6 +86,9 @@ func (store *BaseStore) SetQueueClient(c queue.Client) {
}
store.done <- struct{}{}
}()
if store.child != nil {
store.child.ProcEvent(nil)
}
}
//Wait wait for basestore ready
......@@ -108,7 +115,13 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
} else if msg.Ty == types.EventStoreMemSet { //只是在内存中set 一下,并不改变状态
go func() {
datas := msg.GetData().(*types.StoreSetWithSync)
hash, err := store.child.MemSet(datas.Storeset, datas.Sync)
var hash []byte
var err error
if datas.Upgrade {
hash, err = store.child.MemSetUpgrade(datas.Storeset, datas.Sync)
} else {
hash, err = store.child.MemSet(datas.Storeset, datas.Sync)
}
if err != nil {
msg.Reply(client.NewMessage("", types.EventStoreSetReply, err))
return
......@@ -118,7 +131,13 @@ func (store *BaseStore) processMessage(msg *queue.Message) {
} else if msg.Ty == types.EventStoreCommit { //把内存中set 的交易 commit
go func() {
req := msg.GetData().(*types.ReqHash)
hash, err := store.child.Commit(req)
var hash []byte
var err error
if req.Upgrade {
hash, err = store.child.CommitUpgrade(req)
} else {
hash, err = store.child.Commit(req)
}
if hash == nil {
msg.Reply(client.NewMessage("", types.EventStoreCommit, types.ErrHashNotFound))
if err == types.ErrDataBaseDamage { //如果是数据库写失败,需要上报给用户
......
......@@ -36,6 +36,14 @@ func (s *storeChild) Commit(hash *types.ReqHash) ([]byte, error) {
return []byte{}, nil
}
func (s *storeChild) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
return []byte{}, nil
}
func (s *storeChild) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
return []byte{}, nil
}
func (s *storeChild) Rollback(req *types.ReqHash) ([]byte, error) {
return []byte{}, nil
}
......
......@@ -218,6 +218,9 @@ func (node *Node) Hash(t *Tree) []byte {
}
}
if enableMemTree {
updateLocalMemTree(t, node)
}
return node.hash
}
......
......@@ -168,6 +168,16 @@ func (t *Tree) Hash() []byte {
return nil
}
hash := t.root.Hash(t)
// 更新memTree
if enableMemTree && memTree != nil {
for k := range t.obsoleteNode {
memTree.Delete(k)
}
for k, v := range t.updateNode {
memTree.Add(k, v)
}
treelog.Debug("Tree.Hash", "memTree len", memTree.Len(), "tree height", t.blockHeight)
}
return hash
}
......@@ -193,16 +203,6 @@ func (t *Tree) Save() []byte {
treelog.Error("Tree.Save", "saveRootHash err", err)
}
}
// 更新memTree
if enableMemTree && memTree != nil {
for k := range t.obsoleteNode {
memTree.Delete(k)
}
for k, v := range t.updateNode {
memTree.Add(k, v)
}
treelog.Debug("Tree.Save", "memTree len", memTree.Len(), "tree height", t.blockHeight)
}
beg := types.Now()
err := t.ndb.Commit()
......@@ -530,11 +530,6 @@ func (ndb *nodeDB) SaveNode(t *Tree, node *Node) {
node.persisted = true
ndb.cacheNode(node)
delete(ndb.orphans, string(node.hash))
//treelog.Debug("SaveNode", "hash", node.hash, "height", node.height, "value", node.value)
// Save node hashInt64 to localmem
if enableMemTree {
updateLocalMemTree(t, node)
}
}
func getNode4MemTree(hash []byte) (*Node, error) {
......
......@@ -172,6 +172,35 @@ func (mavls *Store) Commit(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil
}
// MemSetUpgrade cacl mavl, but not store tree, return root hash and error
func (mavls *Store) MemSetUpgrade(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now()
defer func() {
mlog.Info("MemSet", "cost", types.Since(beg))
}()
if len(datas.KV) == 0 {
mlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
mavls.trees.Store(string(datas.StateHash), nil)
return datas.StateHash, nil
}
tree := mavl.NewTree(mavls.GetDB(), sync)
tree.SetBlockHeight(datas.Height)
err := tree.Load(datas.StateHash)
if err != nil {
return nil, err
}
for i := 0; i < len(datas.KV); i++ {
tree.Set(datas.KV[i].Key, datas.KV[i].Value)
}
hash := tree.Hash()
return hash, nil
}
// CommitUpgrade convert memcory mavl to storage db
func (mavls *Store) CommitUpgrade(req *types.ReqHash) ([]byte, error) {
return req.Hash, nil
}
// Rollback 回退将缓存的mavl树删除掉
func (mavls *Store) Rollback(req *types.ReqHash) ([]byte, error) {
beg := types.Now()
......@@ -194,6 +223,9 @@ func (mavls *Store) IterateRangeByStateHash(statehash []byte, start []byte, end
// ProcEvent not support message
func (mavls *Store) ProcEvent(msg *queue.Message) {
if msg == nil {
return
}
msg.ReplyErr("Store", types.ErrActionNotSupport)
}
......
......@@ -14,6 +14,7 @@ import (
"github.com/33cn/chain33/account"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/store"
mavldb "github.com/33cn/chain33/system/store/mavl/db"
"github.com/33cn/chain33/types"
......@@ -116,6 +117,35 @@ func TestKvdbMemSet(t *testing.T) {
assert.Nil(t, notExistHash)
}
func TestKvdbMemSetUpgrade(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*Store)
assert.NotNil(t, store)
var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v2")})
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
}
hash, err := store.MemSetUpgrade(datas, true)
assert.Nil(t, err)
keys := [][]byte{[]byte("mk1"), []byte("mk2")}
get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1)
assert.Len(t, values, 2)
hash1, err := store.CommitUpgrade(&types.ReqHash{Hash: hash})
assert.Nil(t, err)
assert.Equal(t, hash, hash1)
}
func TestKvdbRollback(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
......@@ -146,6 +176,31 @@ func TestKvdbRollback(t *testing.T) {
assert.Nil(t, notExistHash)
}
func TestProcEvent(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*Store)
assert.NotNil(t, store)
store.ProcEvent(nil)
store.ProcEvent(&queue.Message{})
}
func TestDel(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*Store)
assert.NotNil(t, store)
store.Del(nil)
}
var checkKVResult []*types.KeyValue
func checkKV(k, v []byte) bool {
......
......@@ -265,6 +265,7 @@ func (m *Int64) GetData() int64 {
type ReqHash struct {
Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
Upgrade bool `protobuf:"varint,2,opt,name=upgrade,proto3" json:"upgrade,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
......@@ -302,6 +303,13 @@ func (m *ReqHash) GetHash() []byte {
return nil
}
func (m *ReqHash) GetUpgrade() bool {
if m != nil {
return m.Upgrade
}
return false
}
type ReplyHash struct {
Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
......@@ -781,33 +789,34 @@ func init() {
func init() { proto.RegisterFile("common.proto", fileDescriptor_555bd8c177793206) }
var fileDescriptor_555bd8c177793206 = []byte{
// 445 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0x51, 0x6b, 0xdb, 0x30,
0x10, 0x26, 0x71, 0x93, 0xc6, 0x57, 0x3f, 0x0c, 0x33, 0x86, 0xe9, 0x5a, 0x9a, 0x69, 0x1b, 0xe4,
0x65, 0x0d, 0xcc, 0x63, 0x3f, 0x60, 0xec, 0x61, 0xa1, 0x90, 0x81, 0x5a, 0xca, 0xd8, 0x9b, 0xe2,
0xca, 0xb1, 0x88, 0x2c, 0xd9, 0x91, 0x32, 0xea, 0x7f, 0x3f, 0xee, 0xa2, 0x64, 0x1e, 0x65, 0xeb,
0xdb, 0xf7, 0xe9, 0xee, 0xf4, 0x7d, 0xf7, 0x09, 0x41, 0x52, 0xd8, 0xba, 0xb6, 0xe6, 0xba, 0xd9,
0x5a, 0x6f, 0xd3, 0x91, 0xef, 0x1a, 0xe9, 0xd8, 0x07, 0x18, 0x71, 0xd9, 0xe8, 0x2e, 0x4d, 0xe1,
0x44, 0xb9, 0xef, 0x9b, 0x6c, 0x30, 0x1d, 0xcc, 0x26, 0x9c, 0x70, 0xfa, 0x02, 0xa2, 0xda, 0xad,
0xb3, 0xe1, 0x74, 0x30, 0x4b, 0x38, 0x42, 0x76, 0x05, 0x31, 0x97, 0xed, 0xad, 0xdf, 0x2a, 0xb3,
0xc6, 0x91, 0x07, 0xe1, 0x05, 0x8d, 0xc4, 0x9c, 0x30, 0x7b, 0x03, 0x67, 0x74, 0xdf, 0x7f, 0x5a,
0xde, 0x41, 0xd2, 0x6b, 0x71, 0xe9, 0x4b, 0x18, 0xe1, 0xb9, 0xcb, 0x06, 0xd3, 0x68, 0x16, 0xf3,
0x3d, 0x61, 0x53, 0x18, 0x73, 0xd9, 0x2e, 0x8c, 0x4f, 0x5f, 0xc1, 0xb8, 0x92, 0x6a, 0x5d, 0x79,
0xba, 0x25, 0xe2, 0x81, 0xb1, 0xd7, 0x30, 0x5a, 0x18, 0xff, 0xf9, 0xd3, 0x5f, 0x22, 0x51, 0x10,
0xb9, 0x84, 0x53, 0x2e, 0xdb, 0x6f, 0xc2, 0x55, 0x58, 0xae, 0x84, 0xab, 0xa8, 0x9c, 0x70, 0xc2,
0xfb, 0x3d, 0x1a, 0xdd, 0xfd, 0xb3, 0x61, 0x42, 0xf2, 0x4b, 0xa5, 0xd9, 0x5b, 0x5a, 0x19, 0x1b,
0xa5, 0x23, 0x2f, 0x84, 0xc8, 0x6c, 0xc2, 0x03, 0x63, 0xef, 0xc3, 0xda, 0xcf, 0xb4, 0x7d, 0x84,
0xc9, 0x8d, 0xec, 0xee, 0x85, 0xde, 0x49, 0x0c, 0x77, 0x23, 0xbb, 0x20, 0x8a, 0x10, 0x83, 0xf8,
0x85, 0xa5, 0x10, 0xf8, 0x9e, 0xb0, 0x0b, 0x18, 0xdf, 0x3d, 0x3e, 0xf1, 0x19, 0x07, 0x9f, 0x3f,
0x00, 0xee, 0x54, 0x2d, 0x6f, 0xbd, 0xf0, 0x3b, 0x97, 0x66, 0x70, 0x6a, 0x7c, 0x83, 0x07, 0xa1,
0xe9, 0x40, 0xd3, 0x0b, 0x88, 0xb5, 0x2d, 0x84, 0xa6, 0xda, 0x90, 0x6a, 0x7f, 0x0e, 0x28, 0x41,
0x55, 0x96, 0x59, 0x14, 0x12, 0x54, 0x65, 0xc9, 0xce, 0x29, 0x81, 0x1b, 0xd9, 0x3d, 0x75, 0xca,
0x5a, 0x5c, 0xb7, 0xe5, 0xc2, 0x3c, 0x90, 0xb1, 0x73, 0x98, 0xc8, 0x47, 0x59, 0x2c, 0xc5, 0x51,
0xf7, 0xc8, 0x7b, 0xaf, 0x37, 0xec, 0xbf, 0x1e, 0xce, 0xac, 0xb4, 0x2d, 0x36, 0xcb, 0x5d, 0x1d,
0x64, 0x8f, 0xfc, 0xb8, 0xe8, 0x49, 0xef, 0x41, 0x14, 0x9c, 0xdd, 0xcb, 0xad, 0x53, 0xd6, 0x2c,
0x4c, 0x69, 0x31, 0x2b, 0xaf, 0xbc, 0x3e, 0xe8, 0xed, 0x09, 0x3a, 0x15, 0x4d, 0x13, 0xf6, 0x43,
0x88, 0x89, 0x14, 0x95, 0x50, 0x26, 0xcf, 0x49, 0x25, 0xe6, 0x07, 0x8a, 0x15, 0x0a, 0xe0, 0xeb,
0x8a, 0x74, 0x62, 0x7e, 0xa0, 0x5f, 0xae, 0x7e, 0x5e, 0xae, 0x95, 0xaf, 0x76, 0xab, 0xeb, 0xc2,
0xd6, 0xf3, 0x3c, 0x2f, 0xcc, 0x3c, 0x0c, 0xcd, 0xe9, 0xd3, 0xac, 0xc6, 0xf4, 0x85, 0xf2, 0xdf,
0x01, 0x00, 0x00, 0xff, 0xff, 0x2c, 0x62, 0x19, 0x6f, 0x52, 0x03, 0x00, 0x00,
// 456 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x53, 0xdf, 0x8b, 0xd3, 0x40,
0x10, 0xa6, 0xcd, 0xb5, 0x97, 0xcc, 0xe5, 0x41, 0x82, 0x48, 0x38, 0x4f, 0xae, 0xae, 0x0a, 0x7d,
0xf1, 0x0a, 0x46, 0xf4, 0x5d, 0x7c, 0xb0, 0x1c, 0x54, 0xd8, 0x3b, 0x0e, 0xf1, 0x6d, 0x9b, 0x6e,
0x92, 0xa5, 0xc9, 0x26, 0xe9, 0x6e, 0xe4, 0xf2, 0xdf, 0xcb, 0x4c, 0x37, 0xb5, 0x52, 0xf4, 0xde,
0xe6, 0xdb, 0x99, 0xc9, 0xf7, 0x63, 0x08, 0x84, 0x69, 0x5d, 0x55, 0xb5, 0xbe, 0x69, 0x76, 0xb5,
0xad, 0xa3, 0x89, 0xed, 0x1b, 0x69, 0xd8, 0x7b, 0x98, 0x70, 0xd9, 0x94, 0x7d, 0x14, 0xc1, 0x99,
0x32, 0xdf, 0xb7, 0xf1, 0x68, 0x36, 0x9a, 0xfb, 0x9c, 0xea, 0xe8, 0x19, 0x78, 0x95, 0xc9, 0xe3,
0xf1, 0x6c, 0x34, 0x0f, 0x39, 0x96, 0xec, 0x1a, 0x02, 0x2e, 0xdb, 0x3b, 0xbb, 0x53, 0x3a, 0xc7,
0x95, 0x8d, 0xb0, 0x82, 0x56, 0x02, 0x4e, 0x35, 0x7b, 0x0d, 0x17, 0xf4, 0xbd, 0xff, 0x8c, 0xbc,
0x85, 0xf0, 0x68, 0xc4, 0x44, 0xcf, 0x61, 0x82, 0xef, 0x26, 0x1e, 0xcd, 0xbc, 0x79, 0xc0, 0xf7,
0x80, 0xcd, 0x60, 0xca, 0x65, 0xbb, 0xd4, 0x36, 0x7a, 0x01, 0xd3, 0x42, 0xaa, 0xbc, 0xb0, 0xf4,
0x15, 0x8f, 0x3b, 0xc4, 0x5e, 0xc2, 0x64, 0xa9, 0xed, 0xa7, 0x8f, 0x7f, 0x91, 0x78, 0x8e, 0xe4,
0x33, 0x9c, 0x73, 0xd9, 0x7e, 0x13, 0xa6, 0xc0, 0x76, 0x21, 0x4c, 0x41, 0xed, 0x90, 0x53, 0x1d,
0xc5, 0x70, 0xde, 0x35, 0xf9, 0x4e, 0x6c, 0x24, 0xb9, 0xf3, 0xf9, 0x00, 0xf7, 0x0e, 0x9b, 0xb2,
0xff, 0xd7, 0x2a, 0xf3, 0x49, 0xd8, 0x4a, 0x95, 0xec, 0x0d, 0x85, 0x81, 0x83, 0xd2, 0x90, 0x4a,
0xaa, 0xc8, 0x46, 0xc8, 0x1d, 0x62, 0xef, 0x5c, 0x20, 0x4f, 0x8c, 0x7d, 0x00, 0xff, 0x56, 0xf6,
0x0f, 0xa2, 0xec, 0x24, 0xc6, 0xbe, 0x95, 0xbd, 0x23, 0xc5, 0x12, 0x23, 0xfa, 0x85, 0x2d, 0x77,
0x8a, 0x3d, 0x60, 0x57, 0x30, 0xbd, 0x7f, 0x3c, 0xd1, 0x19, 0x38, 0x9d, 0x3f, 0x00, 0xee, 0x55,
0x25, 0xef, 0xac, 0xb0, 0x9d, 0x41, 0xc3, 0xda, 0x36, 0xf8, 0xe0, 0x86, 0x06, 0x18, 0x5d, 0x41,
0x50, 0xd6, 0xa9, 0x28, 0xa9, 0x37, 0xa6, 0xde, 0x9f, 0x07, 0xca, 0x56, 0x65, 0x59, 0xec, 0xb9,
0x6c, 0x55, 0x96, 0xb1, 0x4b, 0x4a, 0xe0, 0x56, 0xf6, 0xa7, 0x4a, 0x59, 0x8b, 0x76, 0x5b, 0x2e,
0xf4, 0x86, 0x84, 0x5d, 0x82, 0x2f, 0x1f, 0x65, 0xba, 0x12, 0x07, 0xde, 0x03, 0x3e, 0xba, 0xeb,
0xf8, 0xf8, 0xae, 0xb8, 0xb3, 0x2e, 0xeb, 0x74, 0xbb, 0xea, 0x2a, 0x47, 0x7b, 0xc0, 0x07, 0xa3,
0x67, 0x47, 0x07, 0x51, 0x70, 0xf1, 0x20, 0x77, 0x46, 0xd5, 0x7a, 0xa9, 0xb3, 0x1a, 0xb3, 0xb2,
0xca, 0x96, 0x03, 0xdf, 0x1e, 0xa0, 0x52, 0xd1, 0x34, 0xce, 0x1f, 0x96, 0x98, 0x48, 0x5a, 0x08,
0xa5, 0x93, 0x84, 0x58, 0x02, 0x3e, 0x40, 0xec, 0x50, 0x00, 0x5f, 0xd7, 0xc4, 0x13, 0xf0, 0x01,
0x7e, 0xb9, 0xfe, 0xf9, 0x2a, 0x57, 0xb6, 0xe8, 0xd6, 0x37, 0x69, 0x5d, 0x2d, 0x92, 0x24, 0xd5,
0x0b, 0xb7, 0xb4, 0xa0, 0xdf, 0x69, 0x3d, 0xa5, 0x9f, 0x2b, 0xf9, 0x1d, 0x00, 0x00, 0xff, 0xff,
0x05, 0xe6, 0x9f, 0x5b, 0x6c, 0x03, 0x00, 0x00,
}
......@@ -591,6 +591,7 @@ func (m *StoreDel) GetHeight() int64 {
type StoreSetWithSync struct {
Storeset *StoreSet `protobuf:"bytes,1,opt,name=storeset,proto3" json:"storeset,omitempty"`
Sync bool `protobuf:"varint,2,opt,name=sync,proto3" json:"sync,omitempty"`
Upgrade bool `protobuf:"varint,3,opt,name=upgrade,proto3" json:"upgrade,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
......@@ -635,6 +636,13 @@ func (m *StoreSetWithSync) GetSync() bool {
return false
}
func (m *StoreSetWithSync) GetUpgrade() bool {
if m != nil {
return m.Upgrade
}
return false
}
type StoreGet struct {
StateHash []byte `protobuf:"bytes,1,opt,name=stateHash,proto3" json:"stateHash,omitempty"`
Keys [][]byte `protobuf:"bytes,2,rep,name=keys,proto3" json:"keys,omitempty"`
......@@ -1006,46 +1014,46 @@ func init() {
func init() { proto.RegisterFile("db.proto", fileDescriptor_8817812184a13374) }
var fileDescriptor_8817812184a13374 = []byte{
// 642 bytes of a gzipped FileDescriptorProto
// 654 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x55, 0xc1, 0x6a, 0xdb, 0x40,
0x10, 0x45, 0x92, 0x9d, 0x48, 0x93, 0xd0, 0x18, 0x11, 0x8a, 0x08, 0x29, 0x49, 0x75, 0x72, 0x29,
0x38, 0xa5, 0xee, 0xb1, 0x87, 0x26, 0x04, 0xd2, 0x62, 0xb7, 0x04, 0x19, 0x5c, 0xe8, 0xa1, 0xa0,
0xc8, 0xe3, 0x48, 0xc4, 0xde, 0x75, 0xb5, 0xab, 0x12, 0xf5, 0xd2, 0x8f, 0xe8, 0xa9, 0xbf, 0xd5,
0x2f, 0x2a, 0x3b, 0xbb, 0xb2, 0x64, 0x50, 0xe3, 0xe6, 0x36, 0x6f, 0xbc, 0x7a, 0xef, 0xcd, 0xdb,
0x59, 0x0c, 0xee, 0xec, 0x66, 0xb0, 0xca, 0xb9, 0xe4, 0x7e, 0x57, 0x96, 0x2b, 0x14, 0x47, 0xfb,
0x09, 0x5f, 0x2e, 0x39, 0xd3, 0xcd, 0xf0, 0x2b, 0xb8, 0x63, 0x8c, 0xe7, 0x9f, 0xf8, 0x0c, 0xfd,
0x1e, 0x38, 0x77, 0x58, 0x06, 0xd6, 0xa9, 0xd5, 0xdf, 0x8f, 0x54, 0xe9, 0x1f, 0x42, 0xf7, 0x7b,
0xbc, 0x28, 0x30, 0xb0, 0xa9, 0xa7, 0x81, 0xff, 0x14, 0x76, 0x52, 0xcc, 0x6e, 0x53, 0x19, 0x38,
0xa7, 0x56, 0xbf, 0x1b, 0x19, 0xe4, 0xfb, 0xd0, 0x11, 0xd9, 0x0f, 0x0c, 0x3a, 0xd4, 0xa5, 0x3a,
0xfc, 0x06, 0xde, 0x07, 0xc6, 0x30, 0x27, 0x81, 0x23, 0x70, 0x17, 0x38, 0x97, 0xef, 0x63, 0x91,
0x1a, 0x95, 0x35, 0xf6, 0x8f, 0xc1, 0xcb, 0x15, 0x0b, 0xfd, 0xa8, 0xe5, 0xea, 0xc6, 0xa3, 0x24,
0x0b, 0xf0, 0x3e, 0x9e, 0x4f, 0xc7, 0xd7, 0x39, 0xe7, 0x73, 0x2d, 0x19, 0xcf, 0x37, 0x25, 0x35,
0xf6, 0x5f, 0x01, 0x64, 0x95, 0x37, 0x11, 0xd8, 0xa7, 0x4e, 0x7f, 0xef, 0x75, 0x6f, 0x40, 0x29,
0x0d, 0xd6, 0xa6, 0xa3, 0xc6, 0x19, 0xc5, 0x96, 0x73, 0xae, 0x3d, 0x3a, 0x9a, 0xad, 0xc2, 0xe1,
0x6f, 0x0b, 0xbc, 0x89, 0xe4, 0x39, 0x3e, 0x2a, 0xcb, 0x66, 0x24, 0xce, 0x43, 0x91, 0x74, 0xfe,
0x1d, 0x49, 0xb7, 0x35, 0x92, 0x9d, 0x46, 0x24, 0xe7, 0x00, 0x63, 0x9e, 0xc4, 0x8b, 0xcb, 0x8b,
0x09, 0x4a, 0xff, 0x04, 0xec, 0xd1, 0xd4, 0xcc, 0x7b, 0x60, 0xe6, 0x1d, 0x61, 0x39, 0x55, 0x86,
0x22, 0x7b, 0x34, 0x55, 0x14, 0xf2, 0x3e, 0x9b, 0x11, 0xb1, 0x13, 0x51, 0x1d, 0xfe, 0x84, 0x3d,
0x43, 0x31, 0xce, 0x84, 0x54, 0xea, 0xab, 0x1c, 0xe7, 0xd9, 0xbd, 0x19, 0xd1, 0xa0, 0x6a, 0x6e,
0xbb, 0x9e, 0xfb, 0x18, 0xbc, 0x59, 0x96, 0x63, 0x22, 0x33, 0xce, 0xcc, 0xed, 0xd5, 0x0d, 0x95,
0x4a, 0xc2, 0x0b, 0x26, 0xcd, 0x0d, 0x6a, 0xd0, 0x6a, 0xe0, 0xcd, 0x7a, 0x86, 0x2b, 0xa4, 0x13,
0x77, 0x58, 0xea, 0x5b, 0xdb, 0x8f, 0xa8, 0x6e, 0xfd, 0xea, 0x05, 0x1c, 0xd0, 0x57, 0x11, 0xae,
0x16, 0x7a, 0x42, 0x65, 0x9d, 0xb2, 0xaf, 0x3e, 0x36, 0x28, 0x8c, 0xc1, 0xa5, 0xfb, 0x53, 0x11,
0x1d, 0x83, 0x27, 0x64, 0x2c, 0xb1, 0xb1, 0x37, 0x75, 0x63, 0x7b, 0x80, 0x9b, 0xeb, 0xea, 0x54,
0x77, 0x13, 0xbe, 0x33, 0x12, 0x97, 0xb8, 0xd8, 0x22, 0x51, 0x33, 0xd8, 0x1b, 0x0c, 0x13, 0xe8,
0x55, 0x26, 0x3f, 0x67, 0x32, 0x9d, 0x94, 0x2c, 0xf1, 0x5f, 0x82, 0x2b, 0x54, 0x4f, 0xa0, 0x24,
0xa2, 0xda, 0x54, 0x75, 0x34, 0x5a, 0x1f, 0xa0, 0xf5, 0x28, 0x59, 0x42, 0xb4, 0x6e, 0x44, 0x75,
0xf8, 0xd6, 0xd8, 0xba, 0xda, 0x3a, 0x79, 0x4b, 0xec, 0x2a, 0x62, 0xfa, 0xfa, 0x3f, 0x22, 0xfe,
0x55, 0xbd, 0x11, 0xda, 0xa1, 0x87, 0xa5, 0x0e, 0xa1, 0x2b, 0x64, 0x9c, 0xcb, 0xea, 0xbd, 0x10,
0x50, 0xfb, 0x85, 0x6c, 0x66, 0x9e, 0x8a, 0x2a, 0x95, 0x96, 0x28, 0xe6, 0x6a, 0x13, 0xf5, 0x13,
0x31, 0xa8, 0xde, 0x2c, 0xbd, 0x0e, 0xf5, 0x66, 0x2d, 0xf9, 0x4c, 0xbf, 0x0e, 0x27, 0xa2, 0x3a,
0xfc, 0x63, 0xc1, 0x93, 0xb5, 0x2b, 0x9a, 0xa2, 0x16, 0xb7, 0x5a, 0xc4, 0xed, 0x36, 0x71, 0xa7,
0x5d, 0xbc, 0xd3, 0x14, 0xef, 0x81, 0xc3, 0x8a, 0xa5, 0x31, 0xa4, 0xca, 0x36, 0x3b, 0x7e, 0x00,
0xbb, 0x0c, 0xef, 0xe5, 0x08, 0xcb, 0x60, 0x97, 0x48, 0x2b, 0xb8, 0x4e, 0xdf, 0x6d, 0x2c, 0x7d,
0x1d, 0xb5, 0xb7, 0x11, 0xf5, 0x73, 0xf0, 0xae, 0xf3, 0x82, 0xe1, 0x65, 0x2c, 0x63, 0x65, 0x27,
0x8d, 0x45, 0x2a, 0x02, 0x8b, 0xce, 0x68, 0x10, 0xf6, 0xcd, 0xd8, 0x74, 0x67, 0xd7, 0x9c, 0x2f,
0x1a, 0x64, 0x56, 0x93, 0xec, 0xe2, 0xe4, 0xcb, 0xb3, 0xdb, 0x4c, 0xa6, 0xc5, 0xcd, 0x20, 0xe1,
0xcb, 0xb3, 0xe1, 0x30, 0x61, 0x67, 0x49, 0x1a, 0x67, 0x6c, 0x38, 0x3c, 0xa3, 0x45, 0xbb, 0xd9,
0xa1, 0x7f, 0x93, 0xe1, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x4e, 0xf2, 0x6d, 0xb8, 0x6e, 0x06,
0x00, 0x00,
0x38, 0xa5, 0xee, 0xb1, 0x87, 0x26, 0x04, 0xd2, 0x62, 0xb7, 0x04, 0x05, 0x5c, 0xe8, 0xa1, 0xb0,
0x91, 0xc6, 0x91, 0x88, 0xbd, 0xeb, 0x4a, 0xab, 0x12, 0xf5, 0xd2, 0x8f, 0xe8, 0xa9, 0xbf, 0xd5,
0x2f, 0x2a, 0x3b, 0xbb, 0xb2, 0x64, 0x50, 0x93, 0xe6, 0x36, 0x6f, 0xbd, 0x7a, 0xef, 0xcd, 0x9b,
0x59, 0x0c, 0x6e, 0x72, 0x3d, 0x5a, 0xe5, 0x42, 0x0a, 0xbf, 0x2f, 0xab, 0x15, 0x16, 0x07, 0xbb,
0xb1, 0x58, 0x2e, 0x05, 0xd7, 0x87, 0xe1, 0x57, 0x70, 0xa7, 0xc8, 0xe6, 0x9f, 0x44, 0x82, 0xfe,
0x00, 0x9c, 0x5b, 0xac, 0x02, 0xeb, 0xd8, 0x1a, 0xee, 0x46, 0xaa, 0xf4, 0xf7, 0xa1, 0xff, 0x9d,
0x2d, 0x4a, 0x0c, 0x6c, 0x3a, 0xd3, 0xc0, 0x7f, 0x0a, 0x5b, 0x29, 0x66, 0x37, 0xa9, 0x0c, 0x9c,
0x63, 0x6b, 0xd8, 0x8f, 0x0c, 0xf2, 0x7d, 0xe8, 0x15, 0xd9, 0x0f, 0x0c, 0x7a, 0x74, 0x4a, 0x75,
0xf8, 0x0d, 0xbc, 0x0f, 0x9c, 0x63, 0x4e, 0x02, 0x07, 0xe0, 0x2e, 0x70, 0x2e, 0xdf, 0xb3, 0x22,
0x35, 0x2a, 0x6b, 0xec, 0x1f, 0x82, 0x97, 0x2b, 0x16, 0xfa, 0x51, 0xcb, 0x35, 0x07, 0x8f, 0x92,
0x2c, 0xc1, 0xfb, 0x78, 0x3a, 0x9b, 0x5e, 0xe6, 0x42, 0xcc, 0xb5, 0x24, 0x9b, 0x6f, 0x4a, 0x6a,
0xec, 0xbf, 0x02, 0xc8, 0x6a, 0x6f, 0x45, 0x60, 0x1f, 0x3b, 0xc3, 0x9d, 0xd7, 0x83, 0x11, 0xa5,
0x34, 0x5a, 0x9b, 0x8e, 0x5a, 0x77, 0x14, 0x5b, 0x2e, 0x84, 0xf6, 0xe8, 0x68, 0xb6, 0x1a, 0x87,
0xbf, 0x2d, 0xf0, 0xae, 0xa4, 0xc8, 0xf1, 0x51, 0x59, 0xb6, 0x23, 0x71, 0xee, 0x8b, 0xa4, 0xf7,
0xef, 0x48, 0xfa, 0x9d, 0x91, 0x6c, 0xb5, 0x22, 0x39, 0x05, 0x98, 0x8a, 0x98, 0x2d, 0xce, 0xcf,
0xae, 0x50, 0xfa, 0x47, 0x60, 0x4f, 0x66, 0xa6, 0xdf, 0x3d, 0xd3, 0xef, 0x04, 0xab, 0x99, 0x32,
0x14, 0xd9, 0x93, 0x99, 0xa2, 0x90, 0x77, 0x59, 0x42, 0xc4, 0x4e, 0x44, 0x75, 0xf8, 0x13, 0x76,
0x0c, 0xc5, 0x34, 0x2b, 0xa4, 0x52, 0x5f, 0xe5, 0x38, 0xcf, 0xee, 0x4c, 0x8b, 0x06, 0xd5, 0x7d,
0xdb, 0x4d, 0xdf, 0x87, 0xe0, 0x25, 0x59, 0x8e, 0xb1, 0xcc, 0x04, 0x37, 0xd3, 0x6b, 0x0e, 0x54,
0x2a, 0xb1, 0x28, 0xb9, 0x34, 0x13, 0xd4, 0xa0, 0xd3, 0xc0, 0x9b, 0x75, 0x0f, 0x17, 0x48, 0x37,
0x6e, 0xb1, 0xd2, 0x53, 0xdb, 0x8d, 0xa8, 0xee, 0xfc, 0xea, 0x05, 0xec, 0xd1, 0x57, 0x11, 0xae,
0x16, 0xba, 0x43, 0x65, 0x9d, 0xb2, 0xaf, 0x3f, 0x36, 0x28, 0x64, 0xe0, 0xd2, 0xfc, 0x54, 0x44,
0x87, 0xe0, 0x15, 0x92, 0x49, 0x6c, 0xed, 0x4d, 0x73, 0xf0, 0x70, 0x80, 0x9b, 0xeb, 0xea, 0xd4,
0xb3, 0x09, 0xdf, 0x19, 0x89, 0x73, 0x5c, 0x3c, 0x20, 0xd1, 0x30, 0xd8, 0x1b, 0x0c, 0x4b, 0x18,
0xd4, 0x26, 0x3f, 0x67, 0x32, 0xbd, 0xaa, 0x78, 0xec, 0xbf, 0x04, 0xb7, 0x50, 0x67, 0x05, 0x4a,
0x22, 0x6a, 0x4c, 0xd5, 0x57, 0xa3, 0xf5, 0x05, 0x5a, 0x8f, 0x8a, 0xc7, 0x44, 0xeb, 0x46, 0x54,
0xfb, 0x01, 0x6c, 0x97, 0xab, 0x9b, 0x9c, 0x25, 0x48, 0x7e, 0xdd, 0xa8, 0x86, 0xe1, 0x5b, 0x63,
0xf8, 0xe2, 0xc1, 0x4c, 0x3a, 0x06, 0xa2, 0xc2, 0xa7, 0xaf, 0xff, 0x23, 0xfc, 0x5f, 0xf5, 0xeb,
0xa1, 0xed, 0xba, 0x5f, 0x6a, 0x1f, 0xfa, 0x85, 0x64, 0xb9, 0xac, 0x5f, 0x12, 0x01, 0xb5, 0x79,
0xc8, 0x13, 0xf3, 0x88, 0x54, 0xa9, 0xb4, 0x8a, 0x72, 0xae, 0x76, 0x54, 0x3f, 0x1e, 0x83, 0x9a,
0x9d, 0xd3, 0x8b, 0xd2, 0xec, 0xdc, 0x52, 0x24, 0xfa, 0xdd, 0x38, 0x11, 0xd5, 0xe1, 0x1f, 0x0b,
0x9e, 0xac, 0x5d, 0x51, 0x17, 0x8d, 0xb8, 0xd5, 0x21, 0x6e, 0x77, 0x89, 0x3b, 0xdd, 0xe2, 0xbd,
0xb6, 0xf8, 0x00, 0x1c, 0x5e, 0x2e, 0x8d, 0x21, 0x55, 0x76, 0xd9, 0x51, 0x73, 0xe2, 0x78, 0x27,
0x27, 0x58, 0x05, 0xdb, 0x44, 0x5a, 0xc3, 0x75, 0xfa, 0x6e, 0xeb, 0x39, 0x34, 0x51, 0x7b, 0x1b,
0x51, 0x3f, 0x07, 0xef, 0x32, 0x2f, 0x39, 0x9e, 0x33, 0xc9, 0x94, 0x9d, 0x94, 0x15, 0x69, 0x11,
0x58, 0x74, 0x47, 0x83, 0x70, 0x68, 0xda, 0xa6, 0x99, 0x5d, 0x0a, 0xb1, 0x68, 0x91, 0x59, 0x6d,
0xb2, 0xb3, 0xa3, 0x2f, 0xcf, 0x6e, 0x32, 0x99, 0x96, 0xd7, 0xa3, 0x58, 0x2c, 0x4f, 0xc6, 0xe3,
0x98, 0x9f, 0xc4, 0x29, 0xcb, 0xf8, 0x78, 0x7c, 0x42, 0x2b, 0x78, 0xbd, 0x45, 0xff, 0x33, 0xe3,
0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xb7, 0xe6, 0x2e, 0x76, 0x88, 0x06, 0x00, 0x00,
}
......@@ -151,6 +151,8 @@ const (
EventGetProperFee = 140
EventReplyProperFee = 141
EventReExecBlock = 142
//exec
EventBlockChainQuery = 212
EventConsensusQuery = 213
......
......@@ -30,6 +30,7 @@ message Int64 {
message ReqHash {
bytes hash = 1;
bool upgrade = 2;
}
message ReplyHash {
......
......@@ -71,6 +71,7 @@ message StoreDel {
message StoreSetWithSync {
StoreSet storeset = 1;
bool sync = 2;
bool upgrade = 3;
}
message StoreGet {
......
......@@ -152,15 +152,15 @@ func RunChain33(name string) {
exec := executor.New(cfg.Exec, sub.Exec)
exec.SetQueueClient(q.Client())
log.Info("loading store module")
s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client())
log.Info("loading blockchain module")
chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client())
chain.UpgradeChain()
log.Info("loading store module")
s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client())
log.Info("loading consensus module")
cs := consensus.New(cfg.Consensus, sub.Consensus)
cs.SetQueueClient(q.Client())
......
......@@ -59,9 +59,9 @@ func ExecTx(client queue.Client, prevStateRoot []byte, block *types.Block) (*typ
}
//ExecKVMemSet : send kv values to memory store and set it in db
func ExecKVMemSet(client queue.Client, prevStateRoot []byte, height int64, kvset []*types.KeyValue, sync bool) ([]byte, error) {
func ExecKVMemSet(client queue.Client, prevStateRoot []byte, height int64, kvset []*types.KeyValue, sync bool, upgrade bool) ([]byte, error) {
set := &types.StoreSet{StateHash: prevStateRoot, KV: kvset, Height: height}
setwithsync := &types.StoreSetWithSync{Storeset: set, Sync: sync}
setwithsync := &types.StoreSetWithSync{Storeset: set, Sync: sync, Upgrade: upgrade}
msg := client.NewMessage("store", types.EventStoreMemSet, setwithsync)
err := client.Send(msg, true)
......@@ -77,8 +77,8 @@ func ExecKVMemSet(client queue.Client, prevStateRoot []byte, height int64, kvset
}
//ExecKVSetCommit : commit the data set opetation to db
func ExecKVSetCommit(client queue.Client, hash []byte) error {
req := &types.ReqHash{Hash: hash}
func ExecKVSetCommit(client queue.Client, hash []byte, upgrade bool) error {
req := &types.ReqHash{Hash: hash, Upgrade: upgrade}
msg := client.NewMessage("store", types.EventStoreCommit, req)
err := client.Send(msg, true)
if err != nil {
......
......@@ -312,7 +312,7 @@ func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, er
beg = types.Now()
block.TxHash = calcHash
var detail types.BlockDetail
calcHash, err = ExecKVMemSet(client, prevStateRoot, block.Height, kvset, sync)
calcHash, err = ExecKVMemSet(client, prevStateRoot, block.Height, kvset, sync, false)
if err != nil {
return nil, nil, err
}
......@@ -341,7 +341,7 @@ func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, er
}
ulog.Info("ExecBlock", "CheckBlock", types.Since(beg))
// 写数据库失败时需要及时返回错误,防止错误数据被写入localdb中CHAIN33-567
err = ExecKVSetCommit(client, block.StateHash)
err = ExecKVSetCommit(client, block.StateHash, false)
if err != nil {
return nil, nil, err
}
......@@ -350,6 +350,48 @@ func ExecBlock(client queue.Client, prevStateRoot []byte, block *types.Block, er
return &detail, deltx, nil
}
// ExecBlockUpgrade : just exec block
func ExecBlockUpgrade(client queue.Client, prevStateRoot []byte, block *types.Block, sync bool) error {
//发送执行交易给execs模块
//通过consensus module 再次检查
ulog.Debug("ExecBlockUpgrade", "height------->", block.Height, "ntx", len(block.Txs))
beg := types.Now()
beg1 := beg
defer func() {
ulog.Info("ExecBlockUpgrade", "height", block.Height, "ntx", len(block.Txs), "writebatchsync", sync, "cost", types.Since(beg1))
}()
//tx交易去重处理, 这个地方要查询数据库,需要一个更快的办法
cacheTxs := types.TxsToCache(block.Txs)
var err error
block.Txs = types.CacheToTxs(cacheTxs)
//println("1")
receipts, err := ExecTx(client, prevStateRoot, block)
if err != nil {
return err
}
ulog.Info("ExecBlockUpgrade", "ExecTx", types.Since(beg))
beg = types.Now()
var kvset []*types.KeyValue
for i := 0; i < len(receipts.Receipts); i++ {
receipt := receipts.Receipts[i]
kvset = append(kvset, receipt.KV...)
}
kvset = DelDupKey(kvset)
calcHash, err := ExecKVMemSet(client, prevStateRoot, block.Height, kvset, sync, true)
if err != nil {
return err
}
//println("2")
if !bytes.Equal(block.StateHash, calcHash) {
return types.ErrCheckStateHash
}
ulog.Info("ExecBlockUpgrade", "CheckBlock", types.Since(beg))
// 写数据库失败时需要及时返回错误,防止错误数据被写入localdb中CHAIN33-567
err = ExecKVSetCommit(client, calcHash, true)
return err
}
//CreateNewBlock : Create a New Block
func CreateNewBlock(parent *types.Block, txs []*types.Transaction) *types.Block {
newblock := &types.Block{}
......
......@@ -278,6 +278,18 @@ func TestExecBlock(t *testing.T) {
assert.NoError(t, err)
}
func TestExecBlockUpgrade(t *testing.T) {
client := &testClient{}
client.On("Send", mock.Anything, mock.Anything).Return(nil)
var txs []*types.Transaction
addr, priv := Genaddress()
tx := CreateCoinsTx(priv, addr, types.Coin)
tx.Sign(types.SECP256K1, priv)
txs = append(txs, tx)
err := ExecBlockUpgrade(client, nil, &types.Block{Txs: txs}, false)
assert.NoError(t, err)
}
func TestExecAndCheckBlock(t *testing.T) {
client := &testClient{}
client.On("Send", mock.Anything, mock.Anything).Return(nil)
......
......@@ -143,12 +143,12 @@ func SaveAccountTomavl(client queue.Client, prevStateRoot []byte, accs []*types.
kvs := accountdb.GetKVSet(acc)
kvset = append(kvset, kvs...)
}
hash, err := util.ExecKVMemSet(client, prevStateRoot, 0, kvset, true)
hash, err := util.ExecKVMemSet(client, prevStateRoot, 0, kvset, true, false)
if err != nil {
panic(err)
}
Statehash = hash
util.ExecKVSetCommit(client, Statehash)
util.ExecKVSetCommit(client, Statehash, false)
return hash
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment