Unverified Commit cc23b645 authored by vipwzw's avatar vipwzw Committed by GitHub

Merge pull request #383 from lyh169/kvmvccmavl_exec

Kvmvccmavl exec
parents 61e31a93 26b186f3
......@@ -135,6 +135,8 @@ name="mavl"
driver="leveldb"
dbPath="datadir/mavltree"
dbCache=128
# store数据库版本
storedbVersion="1.0.0"
[store.sub.mavl]
enableMavlPrefix=false
......@@ -155,8 +157,6 @@ pruneHeight=10000
enableMemTree=true
# 是否使能mavl叶子节点数据载入内存
enableMemVal=true
# 是否使能升级kvmvcc(在mavl数据上)
enableUpdateKvmvcc=false
[wallet]
minFee=100000
......
......@@ -33,11 +33,6 @@ var (
delMavlDataState int32
wg sync.WaitGroup
quit bool
// 用来阻塞查看当前是否需要升级数据库
done chan struct{}
// 使能mavl在当前区块基础上升级kvmvcc
enableUpdateKvmvcc bool
)
const (
......@@ -59,7 +54,6 @@ func DisableLog() {
func init() {
drivers.Reg("kvmvccmavl", New)
done = make(chan struct{}, 1)
}
// KVmMavlStore provide kvmvcc and mavl store interface implementation
......@@ -96,8 +90,7 @@ type subConfig struct {
// 是否使能内存树
EnableMemTree bool `json:"enableMemTree"`
// 是否使能内存树中叶子节点
EnableMemVal bool `json:"enableMemVal"`
EnableUpdateKvmvcc bool `json:"enableUpdateKvmvcc"`
EnableMemVal bool `json:"enableMemVal"`
}
// New construct KVMVCCStore module
......@@ -120,10 +113,6 @@ func New(cfg *types.Store, sub []byte) queue.Module {
subMavlcfg.EnableMemVal = subcfg.EnableMemVal
}
if subcfg.EnableUpdateKvmvcc {
enableUpdateKvmvcc = true
}
bs := drivers.NewBaseStore(cfg)
cache, err := lru.New(cacheSize)
if err != nil {
......@@ -261,41 +250,10 @@ func (kvmMavls *KVmMavlStore) IterateRangeByStateHash(statehash []byte, start []
// ProcEvent handles supported events
func (kvmMavls *KVmMavlStore) ProcEvent(msg *queue.Message) {
//msg.ReplyErr("KVmMavlStore", types.ErrActionNotSupport)
client := kvmMavls.GetQueueClient()
if msg != nil && msg.Ty == types.EventReExecBlock {
reData := msg.GetData().(*types.ReplyString)
if reData.Data == "over" {
kmlog.Info("ProcEvent update store over")
msg.ReplyErr("KVmMavlStore", nil)
done <- struct{}{}
return
}
} else if msg == nil {
if !enableUpdateKvmvcc {
return
}
height, err := kvmMavls.KVMVCCStore.GetMaxVersion()
if err != nil {
height = 0
} else {
height++
}
msg1 := client.NewMessage("blockchain", types.EventReExecBlock, &types.ReqInt{Height: height})
err = client.Send(msg1, true)
if err != nil {
return
}
resp, err := client.Wait(msg1)
if err != nil {
return
}
data := resp.GetData().(*types.ReplyString)
if data.Data == "need" {
//进程阻塞
<-done
}
if msg == nil {
return
}
msg.ReplyErr("KVmMavlStore", types.ErrActionNotSupport)
}
// MemSetUpgrade set kvs to the mem of KVmMavlStore module not cache the tree and return the StateHash
......
......@@ -19,11 +19,9 @@ import (
"github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue"
qmocks "github.com/33cn/chain33/queue/mocks"
drivers "github.com/33cn/chain33/system/store"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
......@@ -539,10 +537,6 @@ func TestIterateRangeByStateHash(t *testing.T) {
assert.Equal(t, int64(0), resp.Amount)
}
type testClient struct {
qmocks.Client
}
func TestProcEvent(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
......@@ -552,19 +546,6 @@ func TestProcEvent(t *testing.T) {
store := New(storeCfg, sub).(*KVmMavlStore)
assert.NotNil(t, store)
client := &qmocks.Client{}
client.On("Send", mock.Anything, mock.Anything).Return(nil)
client.On("Sub", mock.Anything, mock.Anything).Return(nil)
client.On("Recv", mock.Anything, mock.Anything).Return(nil)
client.On("NewMessage", mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
client.On("Wait", mock.Anything).Return(&queue.Message{Data: &types.ReplyString{Data: "other"}}, nil).Once()
store.SetQueueClient(client)
store.ProcEvent(nil)
enableUpdateKvmvcc = true
defer func() {
enableUpdateKvmvcc = false
}()
store.ProcEvent(nil)
store.ProcEvent(&queue.Message{})
}
......
......@@ -73,7 +73,7 @@ func (chain *BlockChain) ReExecBlock(startHeight, curHeight int64) {
}
prevStateHash = block.StateHash
//更新高度
err = chain.upgradeMeta(startHeight)
err = chain.upgradeMeta(i)
if err != nil {
panic(err)
}
......
......@@ -136,7 +136,7 @@ func RunChain33(name string) {
version.SetLocalDBVersion(cfg.Store.LocalDBVersion)
version.SetStoreDBVersion(cfg.Store.StoreDBVersion)
version.SetAppVersion(cfg.Version)
log.Info(cfg.Title + "-app:" + version.GetAppVersion() + " chain33:" + version.GetVersion() + " localdb:" + version.GetLocalDBVersion())
log.Info(cfg.Title + "-app:" + version.GetAppVersion() + " chain33:" + version.GetVersion() + " localdb:" + version.GetLocalDBVersion() + " storedb:" + version.GetStoreDBVersion())
log.Info("loading queue")
q := queue.New("channel")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment