Unverified Commit 1d82de89 authored by vipwzw's avatar vipwzw Committed by GitHub

Merge pull request #311 from lyh169/kvmvcc_mavl

Kvmvcc mavl
parents bb1e9ab9 aad61297
package init package init
import ( import (
_ "github.com/33cn/plugin/plugin/store/kvdb" //auto gen _ "github.com/33cn/plugin/plugin/store/kvdb" //auto gen
_ "github.com/33cn/plugin/plugin/store/kvmvcc" //auto gen _ "github.com/33cn/plugin/plugin/store/kvmvcc" //auto gen
_ "github.com/33cn/plugin/plugin/store/mpt" //auto gen _ "github.com/33cn/plugin/plugin/store/kvmvccmavl" //auto gen
_ "github.com/33cn/plugin/plugin/store/mpt" //auto gen
) )
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package kvmvccmavl kvmvcc+mavl接口
package kvmvccmavl
import (
"bytes"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
dbm "github.com/33cn/chain33/common/db"
clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/store"
"github.com/33cn/chain33/types"
"github.com/hashicorp/golang-lru"
)
var (
kmlog = log.New("module", "kvmvccMavl")
// ErrStateHashLost ...
ErrStateHashLost = errors.New("ErrStateHashLost")
kvmvccMavlFork int64 = 200 * 10000
isDelMavlData = false
delMavlDataHeight = kvmvccMavlFork + 10000
delMavlDataState int32
wg sync.WaitGroup
quit bool
)
const (
cacheSize = 2048 //可以缓存2048个roothash, height对
batchDataSize = 1024 * 1024 * 1
delMavlStateStart = 1
delMavlStateEnd = 0
)
// SetLogLevel set log level
func SetLogLevel(level string) {
clog.SetLogLevel(level)
}
// DisableLog disable log output
func DisableLog() {
kmlog.SetHandler(log.DiscardHandler())
}
func init() {
drivers.Reg("kvmvccmavl", New)
}
// KVmMavlStore provide kvmvcc and mavl store interface implementation
type KVmMavlStore struct {
*drivers.BaseStore
*KVMVCCStore
*MavlStore
cache *lru.Cache
}
type subKVMVCCConfig struct {
EnableMVCCIter bool `json:"enableMVCCIter"`
EnableMavlPrune bool `json:"enableMavlPrune"`
PruneHeight int32 `json:"pruneHeight"`
}
type subMavlConfig struct {
EnableMavlPrefix bool `json:"enableMavlPrefix"`
EnableMVCC bool `json:"enableMVCC"`
EnableMavlPrune bool `json:"enableMavlPrune"`
PruneHeight int32 `json:"pruneHeight"`
}
type subConfig struct {
EnableMVCCIter bool `json:"enableMVCCIter"`
EnableMavlPrefix bool `json:"enableMavlPrefix"`
EnableMVCC bool `json:"enableMVCC"`
EnableMavlPrune bool `json:"enableMavlPrune"`
PruneHeight int32 `json:"pruneHeight"`
}
// New construct KVMVCCStore module
func New(cfg *types.Store, sub []byte) queue.Module {
bs := drivers.NewBaseStore(cfg)
var kvms *KVmMavlStore
var subcfg subConfig
var subKVMVCCcfg subKVMVCCConfig
var subMavlcfg subMavlConfig
if sub != nil {
types.MustDecode(sub, &subcfg)
subKVMVCCcfg.EnableMVCCIter = subcfg.EnableMVCCIter
subKVMVCCcfg.EnableMavlPrune = subcfg.EnableMavlPrune
subKVMVCCcfg.PruneHeight = subcfg.PruneHeight
subMavlcfg.EnableMavlPrefix = subcfg.EnableMavlPrefix
subMavlcfg.EnableMVCC = subcfg.EnableMVCC
subMavlcfg.EnableMavlPrune = subcfg.EnableMavlPrune
subMavlcfg.PruneHeight = subcfg.PruneHeight
}
cache, err := lru.New(cacheSize)
if err != nil {
panic("new KVmMavlStore fail")
}
kvms = &KVmMavlStore{bs, NewKVMVCC(&subKVMVCCcfg, bs.GetDB()),
NewMavl(&subMavlcfg, bs.GetDB()), cache}
// 查询是否已经删除mavl
_, err = bs.GetDB().Get(genDelMavlKey(mvccPrefix))
if err == nil {
isDelMavlData = true
}
bs.SetChild(kvms)
return kvms
}
// Close the KVmMavlStore module
func (kvmMavls *KVmMavlStore) Close() {
quit = true
wg.Wait()
kvmMavls.KVMVCCStore.Close()
kvmMavls.MavlStore.Close()
kvmMavls.BaseStore.Close()
kmlog.Info("store kvmMavls closed")
}
// Set kvs with statehash to KVmMavlStore
func (kvmMavls *KVmMavlStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
if datas.Height < kvmvccMavlFork {
hash, err := kvmMavls.MavlStore.Set(datas, sync)
if err != nil {
return hash, err
}
_, err = kvmMavls.KVMVCCStore.Set(datas, hash, sync)
if err != nil {
return hash, err
}
if err == nil {
kvmMavls.cache.Add(string(hash), datas.Height)
}
return hash, err
}
// 仅仅做kvmvcc
hash, err := kvmMavls.KVMVCCStore.Set(datas, nil, sync)
if err == nil {
kvmMavls.cache.Add(string(hash), datas.Height)
}
// 删除Mavl数据
if datas.Height > delMavlDataHeight && !isDelMavlData && !isDelMavling() {
wg.Add(1)
go DelMavl(kvmMavls.GetDB())
}
return hash, err
}
// Get kvs with statehash from KVmMavlStore
func (kvmMavls *KVmMavlStore) Get(datas *types.StoreGet) [][]byte {
return kvmMavls.KVMVCCStore.Get(datas)
}
// MemSet set kvs to the mem of KVmMavlStore module and return the StateHash
func (kvmMavls *KVmMavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
if datas.Height < kvmvccMavlFork {
hash, err := kvmMavls.MavlStore.MemSet(datas, sync)
if err != nil {
return hash, err
}
_, err = kvmMavls.KVMVCCStore.MemSet(datas, hash, sync)
if err != nil {
return hash, err
}
if err == nil {
kvmMavls.cache.Add(string(hash), datas.Height)
}
return hash, err
}
// 仅仅做kvmvcc
hash, err := kvmMavls.KVMVCCStore.MemSet(datas, nil, sync)
if err == nil {
kvmMavls.cache.Add(string(hash), datas.Height)
}
// 删除Mavl数据
if datas.Height > delMavlDataHeight && !isDelMavlData && !isDelMavling() {
wg.Add(1)
go DelMavl(kvmMavls.GetDB())
}
return hash, err
}
// Commit kvs in the mem of KVmMavlStore module to state db and return the StateHash
func (kvmMavls *KVmMavlStore) Commit(req *types.ReqHash) ([]byte, error) {
if value, ok := kvmMavls.cache.Get(string(req.Hash)); ok {
if value.(int64) < kvmvccMavlFork {
hash, err := kvmMavls.MavlStore.Commit(req)
if err != nil {
return hash, err
}
_, err = kvmMavls.KVMVCCStore.Commit(req)
return hash, err
}
return kvmMavls.KVMVCCStore.Commit(req)
}
return kvmMavls.KVMVCCStore.Commit(req)
}
// Rollback kvs in the mem of KVmMavlStore module and return the StateHash
func (kvmMavls *KVmMavlStore) Rollback(req *types.ReqHash) ([]byte, error) {
if value, ok := kvmMavls.cache.Get(string(req.Hash)); ok {
if value.(int64) < kvmvccMavlFork {
hash, err := kvmMavls.MavlStore.Rollback(req)
if err != nil {
return hash, err
}
_, err = kvmMavls.KVMVCCStore.Rollback(req)
return hash, err
}
return kvmMavls.KVMVCCStore.Rollback(req)
}
return kvmMavls.KVMVCCStore.Rollback(req)
}
// IterateRangeByStateHash travel with Prefix by StateHash to get the latest version kvs.
func (kvmMavls *KVmMavlStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
if value, ok := kvmMavls.cache.Get(string(statehash)); ok {
if value.(int64) < kvmvccMavlFork {
kvmMavls.MavlStore.IterateRangeByStateHash(statehash, start, end, ascending, fn)
return
}
kvmMavls.KVMVCCStore.IterateRangeByStateHash(statehash, start, end, ascending, fn)
return
}
kvmMavls.KVMVCCStore.IterateRangeByStateHash(statehash, start, end, ascending, fn)
}
// ProcEvent handles supported events
func (kvmMavls *KVmMavlStore) ProcEvent(msg *queue.Message) {
msg.ReplyErr("KVmMavlStore", types.ErrActionNotSupport)
}
// Del set kvs to nil with StateHash
func (kvmMavls *KVmMavlStore) Del(req *types.StoreDel) ([]byte, error) {
if req.Height < kvmvccMavlFork {
hash, err := kvmMavls.MavlStore.Del(req)
if err != nil {
return hash, err
}
_, err = kvmMavls.KVMVCCStore.Del(req)
if err != nil {
return hash, err
}
if err == nil {
kvmMavls.cache.Remove(string(req.StateHash))
}
return hash, err
}
// 仅仅做kvmvcc
hash, err := kvmMavls.KVMVCCStore.Del(req)
if err == nil {
kvmMavls.cache.Remove(string(req.StateHash))
}
return hash, err
}
// DelMavl 数据库中mavl数据清除
// 达到kvmvccMavlFork + 100000 后触发清除
func DelMavl(db dbm.DB) {
defer wg.Done()
setDelMavl(delMavlStateStart)
defer setDelMavl(delMavlStateEnd)
isDel := delMavlData(db)
if isDel {
isDelMavlData = true
kmlog.Info("DelMavl success")
}
}
func delMavlData(db dbm.DB) bool {
it := db.Iterator(nil, nil, true)
defer it.Close()
batch := db.NewBatch(true)
for it.Rewind(); it.Valid(); it.Next() {
if quit {
return false
}
if !bytes.HasPrefix(it.Key(), mvccPrefix) { // 将非mvcc的mavl数据全部删除
batch.Delete(it.Key())
if batch.ValueSize() > batchDataSize {
batch.Write()
batch.Reset()
time.Sleep(time.Millisecond * 100)
}
}
}
batch.Set(genDelMavlKey(mvccPrefix), []byte(""))
batch.Write()
return true
}
func genDelMavlKey(prefix []byte) []byte {
delMavl := "--delMavlData--"
return []byte(fmt.Sprintf("%s%s", string(prefix), delMavl))
}
func isDelMavling() bool {
return atomic.LoadInt32(&delMavlDataState) == 1
}
func setDelMavl(state int32) {
atomic.StoreInt32(&delMavlDataState, state)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kvmvccmavl
import (
"encoding/json"
"io/ioutil"
"os"
"testing"
"time"
"fmt"
"bytes"
"github.com/33cn/chain33/account"
"github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db"
drivers "github.com/33cn/chain33/system/store"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const MaxKeylenth int = 64
func newStoreCfg(dir string) *types.Store {
return &types.Store{Name: "kvmvccMavl_test", Driver: "leveldb", DbPath: dir, DbCache: 100}
}
func newStoreCfgIter(dir string) (*types.Store, []byte) {
return &types.Store{Name: "kvmvccMavl_test", Driver: "leveldb", DbPath: dir, DbCache: 100}, enableConfig()
}
func TestKvmvccMavlNewClose(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
store.Close()
}
func TestKvmvccMavlSetGet(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
kvmvccMavlFork = 50
defer func() {
kvmvccMavlFork = 200 * 10000
}()
hash := drivers.EmptyRoot[:]
for i := 0; i < 100; i++ {
var kvs []*types.KeyValue
kvs = append(kvs, &types.KeyValue{Key: []byte(fmt.Sprintf("k%d", i)), Value: []byte(fmt.Sprintf("v%d", i))})
kvs = append(kvs, &types.KeyValue{Key: []byte(fmt.Sprintf("key%d", i)), Value: []byte(fmt.Sprintf("value%d", i))})
datas := &types.StoreSet{
StateHash: hash,
KV: kvs,
Height: int64(i)}
hash, err = store.Set(datas, true)
assert.Nil(t, err)
keys := [][]byte{[]byte(fmt.Sprintf("k%d", i)), []byte(fmt.Sprintf("key%d", i))}
get := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get)
assert.Len(t, values, 2)
assert.Equal(t, []byte(fmt.Sprintf("v%d", i)), values[0])
assert.Equal(t, []byte(fmt.Sprintf("value%d", i)), values[1])
}
}
func TestKvmvccMavlMemSet(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
kvmvccMavlFork = 50
defer func() {
kvmvccMavlFork = 200 * 10000
}()
hash := drivers.EmptyRoot[:]
for i := 0; i < 100; i++ {
var kvs []*types.KeyValue
kvs = append(kvs, &types.KeyValue{Key: []byte(fmt.Sprintf("k%d", i)), Value: []byte(fmt.Sprintf("v%d", i))})
kvs = append(kvs, &types.KeyValue{Key: []byte(fmt.Sprintf("key%d", i)), Value: []byte(fmt.Sprintf("value%d", i))})
datas := &types.StoreSet{
StateHash: hash,
KV: kvs,
Height: int64(i)}
hash, err = store.MemSet(datas, true)
assert.Nil(t, err)
actHash, _ := store.Commit(&types.ReqHash{Hash: hash})
assert.Equal(t, hash, actHash)
keys := [][]byte{[]byte(fmt.Sprintf("k%d", i)), []byte(fmt.Sprintf("key%d", i))}
get := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get)
assert.Len(t, values, 2)
assert.Equal(t, []byte(fmt.Sprintf("v%d", i)), values[0])
assert.Equal(t, []byte(fmt.Sprintf("value%d", i)), values[1])
}
notExistHash, _ := store.Commit(&types.ReqHash{Hash: drivers.EmptyRoot[:]})
assert.Nil(t, notExistHash)
}
func TestKvmvccMavlCommit(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < 30; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
// 设置分叉高度
forkHeight := 100
kvmvccMavlFork = int64(forkHeight)
defer func() {
kvmvccMavlFork = 200 * 10000
}()
frontHash := make([]byte, 0, 32)
var hash []byte
for i := 0; i < 200; i++ {
datas.Height = int64(i)
hash, err = store.MemSet(datas, true)
assert.Nil(t, err)
req := &types.ReqHash{
Hash: hash,
}
if i+1 == forkHeight {
frontHash = append(frontHash, hash...)
}
_, err = store.Commit(req)
assert.NoError(t, err, "NoError")
datas.StateHash = hash
}
if len(frontHash) > 0 {
get := &types.StoreGet{StateHash: frontHash, Keys: keys}
values := store.Get(get)
require.Equal(t, len(values), len(keys))
for i := range keys {
require.Equal(t, kv[i].Value, values[i])
}
}
if len(hash) > 0 {
get := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get)
require.Equal(t, len(values), len(keys))
for i := range keys {
require.Equal(t, kv[i].Value, values[i])
}
}
}
func TestKvmvccMavlRollback(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v2")})
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
hash, err := store.MemSet(datas, true)
assert.Nil(t, err)
keys := [][]byte{[]byte("mk1"), []byte("mk2")}
get := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get)
assert.Len(t, values, 2)
actHash, _ := store.Rollback(&types.ReqHash{Hash: hash})
assert.Equal(t, hash, actHash)
notExistHash, err := store.Rollback(&types.ReqHash{Hash: drivers.EmptyRoot[:]})
assert.Nil(t, notExistHash)
assert.Equal(t, types.ErrHashNotFound.Error(), err.Error())
// 分叉之后
kvmvccMavlFork = 1
defer func() {
kvmvccMavlFork = 200 * 10000
}()
hash, err = store.MemSet(datas, true)
assert.Nil(t, err)
actHash, _ = store.Commit(&types.ReqHash{Hash: hash})
assert.Equal(t, hash, actHash)
var kv1 []*types.KeyValue
kv1 = append(kv1, &types.KeyValue{Key: []byte("mk3"), Value: []byte("v3")})
kv1 = append(kv1, &types.KeyValue{Key: []byte("mk4"), Value: []byte("v4")})
datas1 := &types.StoreSet{
StateHash: hash,
KV: kv1,
Height: 1}
hash1, err := store.MemSet(datas1, true)
assert.Nil(t, err)
keys1 := [][]byte{[]byte("mk3"), []byte("mk4")}
get1 := &types.StoreGet{StateHash: hash1, Keys: keys1}
values1 := store.Get(get1)
assert.Len(t, values1, 2)
actHash, _ = store.Rollback(&types.ReqHash{Hash: hash1})
assert.Equal(t, hash1, actHash)
notExistHash, err = store.Rollback(&types.ReqHash{Hash: drivers.EmptyRoot[:]})
assert.Nil(t, notExistHash)
assert.Equal(t, types.ErrHashNotFound.Error(), err.Error())
}
func TestKvmvccdbRollbackBatch(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v1")})
kv = append(kv, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v2")})
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
hash, err := store.MemSet(datas, true)
assert.Nil(t, err)
var kvset []*types.KeyValue
req := &types.ReqHash{Hash: hash}
hash1 := make([]byte, len(hash))
copy(hash1, hash)
store.Commit(req)
// 设置分叉高度
kvmvccMavlFork = 50
defer func() {
kvmvccMavlFork = 200 * 10000
}()
for i := 1; i <= 202; i++ {
kvset = nil
datas1 := &types.StoreSet{StateHash: hash1, KV: datas.KV, Height: datas.Height + int64(i)}
s1 := fmt.Sprintf("v1-%03d", datas.Height+int64(i))
s2 := fmt.Sprintf("v2-%03d", datas.Height+int64(i))
datas.KV[0].Value = []byte(s1)
datas.KV[1].Value = []byte(s2)
hash1 = calcHash(datas1)
//zzh
//kmlog.Debug("KVMVCCStore MemSet AddMVCC", "prestatehash", common.ToHex(datas.StateHash), "hash", common.ToHex(hash), "height", datas.Height)
kmlog.Info("KVMVCCStore MemSet AddMVCC for 202", "prestatehash", common.ToHex(datas1.StateHash), "hash", common.ToHex(hash1), "height", datas1.Height)
kvlist, err := store.mvcc.AddMVCC(datas1.KV, hash1, datas1.StateHash, datas1.Height)
if err != nil {
kmlog.Info("KVMVCCStore MemSet AddMVCC failed for 202, continue")
continue
}
if len(kvlist) > 0 {
kvset = append(kvset, kvlist...)
}
store.kvsetmap[string(hash1)] = kvset
req := &types.ReqHash{Hash: hash1}
store.Commit(req)
}
maxVersion, err := store.mvcc.GetMaxVersion()
assert.Equal(t, err, nil)
assert.Equal(t, int64(202), maxVersion)
keys := [][]byte{[]byte("mk1"), []byte("mk2")}
get1 := &types.StoreGet{StateHash: hash, Keys: keys}
values := store.Get(get1)
assert.Len(t, values, 2)
assert.Equal(t, []byte("v1"), values[0])
assert.Equal(t, []byte("v2"), values[1])
var kv2 []*types.KeyValue
kv2 = append(kv2, &types.KeyValue{Key: []byte("mk1"), Value: []byte("v11")})
kv2 = append(kv2, &types.KeyValue{Key: []byte("mk2"), Value: []byte("v22")})
//触发批量回滚
datas2 := &types.StoreSet{StateHash: hash, KV: kv2, Height: 1}
hash, err = store.MemSet(datas2, true)
assert.Nil(t, err)
req = &types.ReqHash{Hash: hash}
store.Commit(req)
maxVersion, err = store.mvcc.GetMaxVersion()
assert.Equal(t, nil, err)
assert.Equal(t, int64(3), maxVersion)
get2 := &types.StoreGet{StateHash: hash, Keys: keys}
values2 := store.Get(get2)
assert.Len(t, values, 2)
assert.Equal(t, values2[0], kv2[0].Value)
assert.Equal(t, values2[1], kv2[1].Value)
datas3 := &types.StoreSet{StateHash: hash, KV: kv2, Height: 2}
hash, err = store.MemSet(datas3, true)
assert.Nil(t, err)
req = &types.ReqHash{Hash: hash}
store.Commit(req)
maxVersion, err = store.mvcc.GetMaxVersion()
assert.Equal(t, nil, err)
assert.Equal(t, int64(2), maxVersion)
}
func enableConfig() []byte {
data, _ := json.Marshal(&subConfig{EnableMVCCIter: true})
return data
}
func TestIterateRangeByStateHash(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg, sub := newStoreCfgIter(dir)
store := New(storeCfg, sub).(*KVmMavlStore)
assert.NotNil(t, store)
execaddr := "0111vcBNSEA7fZhAdLJphDwQRQJa111"
addr := "06htvcBNSEA7fZhAdLJphDwQRQJaHpy"
addr1 := "16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"
addr2 := "26htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"
addr3 := "36htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"
addr4 := "46htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"
accCoin := account.NewCoinsAccount()
account1 := &types.Account{
Balance: 1000 * 1e8,
Addr: addr1,
}
account2 := &types.Account{
Balance: 900 * 1e8,
Addr: addr2,
}
account3 := &types.Account{
Balance: 800 * 1e8,
Addr: addr3,
}
account4 := &types.Account{
Balance: 700 * 1e8,
Addr: addr4,
}
set1 := accCoin.GetKVSet(account1)
set2 := accCoin.GetKVSet(account2)
set3 := accCoin.GetKVSet(account3)
set4 := accCoin.GetKVSet(account4)
set5 := accCoin.GetExecKVSet(execaddr, account4)
fmt.Println("---test case1-1 ---")
var kv []*types.KeyValue
kv = append(kv, &types.KeyValue{Key: set4[0].GetKey(), Value: set4[0].GetValue()})
kv = append(kv, &types.KeyValue{Key: set3[0].GetKey(), Value: set3[0].GetValue()})
kv = append(kv, &types.KeyValue{Key: set1[0].GetKey(), Value: set1[0].GetValue()})
kv = append(kv, &types.KeyValue{Key: set2[0].GetKey(), Value: set2[0].GetValue()})
kv = append(kv, &types.KeyValue{Key: set5[0].GetKey(), Value: set5[0].GetValue()})
for i := 0; i < len(kv); i++ {
fmt.Println("key:", string(kv[i].Key), "value:", string(kv[i].Value))
}
datas := &types.StoreSet{StateHash: drivers.EmptyRoot[:], KV: kv, Height: 0}
hash, err := store.MemSet(datas, true)
assert.Nil(t, err)
var kvset []*types.KeyValue
req := &types.ReqHash{Hash: hash}
hash1 := make([]byte, len(hash))
copy(hash1, hash)
store.Commit(req)
resp := &types.ReplyGetTotalCoins{}
resp.Count = 100000
store.IterateRangeByStateHash(hash, []byte("mavl-coins-bty-"), []byte("mavl-coins-bty-exec"), true, resp.IterateRangeByStateHash)
fmt.Println("resp.Num=", resp.Num)
fmt.Println("resp.Amount=", resp.Amount)
assert.Equal(t, int64(4), resp.Num)
assert.Equal(t, int64(340000000000), resp.Amount)
// 设置分叉高度
kvmvccMavlFork = 5
defer func() {
kvmvccMavlFork = 200 * 10000
}()
fmt.Println("---test case1-2 ---")
firstForkHash := drivers.EmptyRoot[:]
for i := 1; i <= 10; i++ {
kvset = nil
s1 := fmt.Sprintf("%03d", 11-i)
addrx := addr + s1
account := &types.Account{
Balance: ((1000 + int64(i)) * 1e8),
Addr: addrx,
}
set := accCoin.GetKVSet(account)
fmt.Println("key:", string(set[0].GetKey()), "value:", set[0].GetValue())
kvset = append(kvset, &types.KeyValue{Key: set[0].GetKey(), Value: set[0].GetValue()})
datas1 := &types.StoreSet{StateHash: hash1, KV: kvset, Height: datas.Height + int64(i)}
hash1, err = store.MemSet(datas1, true)
assert.Nil(t, err)
req := &types.ReqHash{Hash: hash1}
store.Commit(req)
if int(kvmvccMavlFork) == i {
firstForkHash = hash1
}
}
resp = &types.ReplyGetTotalCoins{}
resp.Count = 100000
store.IterateRangeByStateHash(hash1, []byte("mavl-coins-bty-"), []byte("mavl-coins-bty-exec"), true, resp.IterateRangeByStateHash)
fmt.Println("resp.Num=", resp.Num)
fmt.Println("resp.Amount=", resp.Amount)
assert.Equal(t, int64(14), resp.Num)
assert.Equal(t, int64(1345500000000), resp.Amount)
fmt.Println("---test case1-3 ---")
resp = &types.ReplyGetTotalCoins{}
resp.Count = 100000
store.IterateRangeByStateHash(hash1, []byte("mavl-coins-bty-06htvcBNSEA7fZhAdLJphDwQRQJaHpy003"), []byte("mavl-coins-bty-exec"), true, resp.IterateRangeByStateHash)
fmt.Println("resp.Num=", resp.Num)
fmt.Println("resp.Amount=", resp.Amount)
assert.Equal(t, int64(12), resp.Num)
assert.Equal(t, int64(1143600000000), resp.Amount)
fmt.Println("---test case1-4 ---")
resp = &types.ReplyGetTotalCoins{}
resp.Count = 2
store.IterateRangeByStateHash(hash1, []byte("mavl-coins-bty-06htvcBNSEA7fZhAdLJphDwQRQJaHpy003"), []byte("mavl-coins-bty-exec"), true, resp.IterateRangeByStateHash)
fmt.Println("resp.Num=", resp.Num)
fmt.Println("resp.Amount=", resp.Amount)
assert.Equal(t, int64(2), resp.Num)
assert.Equal(t, int64(201500000000), resp.Amount)
fmt.Println("---test case1-5 ---")
resp = &types.ReplyGetTotalCoins{}
resp.Count = 2
store.IterateRangeByStateHash(hash1, []byte("mavl-coins-bty-"), []byte("mavl-coins-bty-exec"), true, resp.IterateRangeByStateHash)
fmt.Println("resp.Num=", resp.Num)
fmt.Println("resp.Amount=", resp.Amount)
assert.Equal(t, int64(2), resp.Num)
assert.Equal(t, int64(201900000000), resp.Amount)
fmt.Println("---test case1-6 ---")
resp = &types.ReplyGetTotalCoins{}
resp.Count = 10000
store.IterateRangeByStateHash(firstForkHash, []byte("mavl-coins-bty-"), []byte("mavl-coins-bty-exec"), true, resp.IterateRangeByStateHash)
fmt.Println("resp.Num=", resp.Num)
fmt.Println("resp.Amount=", resp.Amount)
assert.Equal(t, int64(0), resp.Num)
assert.Equal(t, int64(0), resp.Amount)
}
func GetRandomString(length int) string {
return common.GetRandPrintString(20, length)
}
func TestDelMavlData(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg := newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
db := store.GetDB()
db.Set([]byte(mvccPrefix), []byte("value1"))
db.Set([]byte(fmt.Sprintf("%s123", mvccPrefix)), []byte("value2"))
db.Set([]byte(fmt.Sprintf("%s546", mvccPrefix)), []byte("value3"))
db.Set([]byte(fmt.Sprintf("123%s", mvccPrefix)), []byte("value4"))
db.Set([]byte("key11"), []byte("value11"))
db.Set([]byte("key22"), []byte("value22"))
quit = false
delMavlData(db)
v, err := db.Get([]byte(mvccPrefix))
require.NoError(t, err)
require.Equal(t, []byte("value1"), v)
v, err = db.Get([]byte(fmt.Sprintf("%s123", mvccPrefix)))
require.NoError(t, err)
require.Equal(t, []byte("value2"), v)
v, err = db.Get([]byte(fmt.Sprintf("%s546", mvccPrefix)))
require.NoError(t, err)
require.Equal(t, []byte("value3"), v)
_, err = db.Get([]byte(fmt.Sprintf("123%s", mvccPrefix)))
require.Error(t, err)
_, err = db.Get([]byte("key11"))
require.Error(t, err)
_, err = db.Get([]byte("key22"))
require.Error(t, err)
_, err = db.Get(genDelMavlKey(mvccPrefix))
require.NoError(t, err)
}
func TestPruning(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg := newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
kvmvccStore := NewKVMVCC(&subKVMVCCConfig{}, store.GetDB())
SetPruneHeight(10)
defer SetPruneHeight(0)
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < 30; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
var hashes [][]byte
for i := 0; i < 100; i++ {
datas.Height = int64(i)
value = fmt.Sprintf("vv%d", i)
for j := 0; j < 30; j++ {
datas.KV[j].Value = []byte(value)
}
hash, err := kvmvccStore.MemSet(datas, nil, true)
require.NoError(t, err)
req := &types.ReqHash{
Hash: hash,
}
_, err = kvmvccStore.Commit(req)
require.NoError(t, err)
datas.StateHash = hash
hashes = append(hashes, hash)
}
pruningMVCC(store.GetDB(), 99)
//check
getDatas := &types.StoreGet{
StateHash: drivers.EmptyRoot[:],
Keys: keys,
}
for i := 0; i < len(hashes); i++ {
getDatas.StateHash = hashes[i]
values := store.Get(getDatas)
value = fmt.Sprintf("vv%d", i)
if i < 80 {
for _, v := range values {
require.Equal(t, []byte(nil), v)
}
}
if i > 90 {
for _, v := range values {
require.Equal(t, []byte(value), v)
}
}
}
}
func TestGetKeyVersion(t *testing.T) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(t, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg := newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(t, store)
mvcc := dbm.NewMVCC(store.GetDB())
kvs := []*types.KeyValue{
{Key: []byte("5"), Value: []byte("11")},
{Key: []byte("123"), Value: []byte("111")},
{Key: []byte(""), Value: []byte("1111")},
}
hash := []byte("12345678901234567890123456789012")
vsnkv, err := mvcc.AddMVCC(kvs, hash, nil, 0)
require.NoError(t, err)
for _, kv := range vsnkv {
if bytes.Contains(kv.Key, mvccData) && bytes.Contains(kv.Key, kvs[0].Key) {
k, h, err := getKeyVersion(kv.Key)
require.NoError(t, err)
require.Equal(t, k, kvs[0].Key)
require.Equal(t, h, int64(0))
continue
}
if bytes.Contains(kv.Key, mvccData) && bytes.Contains(kv.Key, kvs[1].Key) {
k, h, err := getKeyVersion(kv.Key)
require.NoError(t, err)
require.Equal(t, k, kvs[1].Key)
require.Equal(t, h, int64(0))
continue
}
if bytes.Contains(kv.Key, mvccData) {
k, h, err := getKeyVersion(kv.Key)
require.NoError(t, err)
require.Equal(t, k, kvs[2].Key)
require.Equal(t, h, int64(0))
}
}
}
func BenchmarkGetkmvccMavl(b *testing.B) { benchmarkGet(b, false) }
func BenchmarkGetkmvcc(b *testing.B) { benchmarkGet(b, true) }
func benchmarkGet(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var keys [][]byte
var hash = drivers.EmptyRoot[:]
for i := 0; i < b.N; i++ {
key := GetRandomString(MaxKeylenth)
value := fmt.Sprintf("%s%d", key, i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
if i%10000 == 0 {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
hash, err = store.Set(datas, true)
assert.Nil(b, err)
kv = nil
}
}
if kv != nil {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
hash, err = store.Set(datas, true)
assert.Nil(b, err)
//kv = nil
}
assert.Nil(b, err)
start := time.Now()
b.ResetTimer()
for _, key := range keys {
getData := &types.StoreGet{
StateHash: hash,
Keys: [][]byte{key}}
store.Get(getData)
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkGet cost time is", end.Sub(start), "num is", b.N)
}
func BenchmarkStoreGetKvs4NkmvccMavl(b *testing.B) { benchmarkStoreGetKvs4N(b, false) }
func BenchmarkStoreGetKvs4Nkmvcc(b *testing.B) { benchmarkStoreGetKvs4N(b, true) }
func benchmarkStoreGetKvs4N(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
kvnum := 30
for i := 0; i < kvnum; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
hash, err := store.Set(datas, true)
assert.Nil(b, err)
getData := &types.StoreGet{
StateHash: hash,
Keys: keys}
start := time.Now()
b.ResetTimer()
for i := 0; i < b.N; i++ {
values := store.Get(getData)
assert.Len(b, values, kvnum)
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkStoreGetKvs4N cost time is", end.Sub(start), "num is", b.N)
b.StopTimer()
}
func BenchmarkStoreGetKvsForNNkmvccMavl(b *testing.B) { benchmarkStoreGetKvsForNN(b, false) }
func BenchmarkStoreGetKvsForNNkmvcc(b *testing.B) { benchmarkStoreGetKvsForNN(b, true) }
func benchmarkStoreGetKvsForNN(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < 30; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
var hashes [][]byte
for i := 0; i < b.N; i++ {
datas.Height = int64(i)
value = fmt.Sprintf("vv%d", i)
for j := 0; j < 30; j++ {
datas.KV[j].Value = []byte(value)
}
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
req := &types.ReqHash{
Hash: hash,
}
_, err = store.Commit(req)
assert.NoError(b, err, "NoError")
datas.StateHash = hash
hashes = append(hashes, hash)
}
start := time.Now()
b.ResetTimer()
getData := &types.StoreGet{
StateHash: hashes[0],
Keys: keys}
for i := 0; i < b.N; i++ {
getData.StateHash = hashes[i]
store.Get(getData)
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkStoreGetKvsForNN cost time is", end.Sub(start), "num is", b.N)
b.StopTimer()
}
func BenchmarkStoreGetKvsFor10000kmvccMavl(b *testing.B) { benchmarkStoreGetKvsFor10000(b, false) }
func BenchmarkStoreGetKvsFor10000kmvcc(b *testing.B) { benchmarkStoreGetKvsFor10000(b, true) }
func benchmarkStoreGetKvsFor10000(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < 30; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
var hashes [][]byte
blocks := 10000
times := 10000
start1 := time.Now()
for i := 0; i < blocks; i++ {
datas.Height = int64(i)
value = fmt.Sprintf("vv%d", i)
for j := 0; j < 30; j++ {
datas.KV[j].Value = []byte(value)
}
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
req := &types.ReqHash{
Hash: hash,
}
_, err = store.Commit(req)
assert.NoError(b, err, "NoError")
datas.StateHash = hash
hashes = append(hashes, hash)
}
end1 := time.Now()
start := time.Now()
b.ResetTimer()
getData := &types.StoreGet{
StateHash: hashes[0],
Keys: keys}
for i := 0; i < times; i++ {
getData.StateHash = hashes[i]
store.Get(getData)
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkStoreGetKvsFor10000 MemSet&Commit cost time is ", end1.Sub(start1), "blocks is", blocks)
fmt.Println("kvmvcc BenchmarkStoreGetKvsFor10000 Get cost time is", end.Sub(start), "num is ", times, ",blocks is ", blocks)
b.StopTimer()
}
func BenchmarkGetIterkmvccMavl(b *testing.B) { benchmarkGetIter(b, false) }
func BenchmarkGetIterkmvcc(b *testing.B) { benchmarkGetIter(b, true) }
func benchmarkGetIter(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg, sub := newStoreCfgIter(dir)
store := New(storeCfg, sub).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var keys [][]byte
var hash = drivers.EmptyRoot[:]
for i := 0; i < b.N; i++ {
key := GetRandomString(MaxKeylenth)
value := fmt.Sprintf("%s%d", key, i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
if i%10000 == 0 {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
hash, err = store.Set(datas, true)
assert.Nil(b, err)
kv = nil
}
}
if kv != nil {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
hash, err = store.Set(datas, true)
assert.Nil(b, err)
//kv = nil
}
assert.Nil(b, err)
start := time.Now()
b.ResetTimer()
for _, key := range keys {
getData := &types.StoreGet{
StateHash: hash,
Keys: [][]byte{key}}
store.Get(getData)
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkGet cost time is", end.Sub(start), "num is", b.N)
}
func BenchmarkSetkmvccMavl(b *testing.B) { benchmarkSet(b, false) }
func BenchmarkSetkmvcc(b *testing.B) { benchmarkSet(b, true) }
func benchmarkSet(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
b.Log(dir)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var keys [][]byte
var hash = drivers.EmptyRoot[:]
start := time.Now()
for i := 0; i < b.N; i++ {
key := GetRandomString(MaxKeylenth)
value := fmt.Sprintf("%s%d", key, i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
if i%10000 == 0 {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
hash, err = store.Set(datas, true)
assert.Nil(b, err)
kv = nil
}
}
if kv != nil {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
_, err = store.Set(datas, true)
assert.Nil(b, err)
//kv = nil
}
end := time.Now()
fmt.Println("mpt BenchmarkSet cost time is", end.Sub(start), "num is", b.N)
}
//上一个用例,一次性插入多对kv;本用例每次插入30对kv,分多次插入,测试性能表现。
func BenchmarkStoreSetkmvccMavl(b *testing.B) { benchmarkStoreSet(b, false) }
func BenchmarkStoreSetkmvcc(b *testing.B) { benchmarkStoreSet(b, true) }
func benchmarkStoreSet(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < 30; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
start := time.Now()
b.ResetTimer()
for i := 0; i < b.N; i++ {
hash, err := store.Set(datas, true)
assert.Nil(b, err)
assert.NotNil(b, hash)
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkSet cost time is", end.Sub(start), "num is", b.N)
}
func BenchmarkSetIterkmvccMavl(b *testing.B) { benchmarkSetIter(b, false) }
func BenchmarkSetIterkmvcc(b *testing.B) { benchmarkSetIter(b, true) }
func benchmarkSetIter(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg, sub := newStoreCfgIter(dir)
store := New(storeCfg, sub).(*KVmMavlStore)
assert.NotNil(b, store)
b.Log(dir)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var keys [][]byte
var hash = drivers.EmptyRoot[:]
start := time.Now()
for i := 0; i < b.N; i++ {
key := GetRandomString(MaxKeylenth)
value := fmt.Sprintf("%s%d", key, i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
if i%10000 == 0 {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
hash, err = store.Set(datas, true)
assert.Nil(b, err)
kv = nil
}
}
if kv != nil {
datas := &types.StoreSet{StateHash: hash, KV: kv, Height: 0}
_, err = store.Set(datas, true)
assert.Nil(b, err)
//kv = nil
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkSet cost time is", end.Sub(start), "num is", b.N)
}
//一次设定多对kv,测试一次的时间/多少对kv,来算平均一对kv的耗时。
func BenchmarkMemSetkmvccMavl(b *testing.B) { benchmarkMemSet(b, false) }
func BenchmarkMemSetkmvcc(b *testing.B) { benchmarkMemSet(b, true) }
func benchmarkMemSet(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < b.N; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
start := time.Now()
b.ResetTimer()
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
assert.NotNil(b, hash)
end := time.Now()
fmt.Println("kvmvcc BenchmarkMemSet cost time is", end.Sub(start), "num is", b.N)
}
//一次设定30对kv,设定N次,计算每次设定30对kv的耗时。
func BenchmarkStoreMemSetkmvccMavl(b *testing.B) { benchmarkStoreMemSet(b, false) }
func BenchmarkStoreMemSetkmvcc(b *testing.B) { benchmarkStoreMemSet(b, true) }
func benchmarkStoreMemSet(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < 30; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
start := time.Now()
b.ResetTimer()
for i := 0; i < b.N; i++ {
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
assert.NotNil(b, hash)
req := &types.ReqHash{
Hash: hash}
store.Rollback(req)
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkStoreMemSet cost time is", end.Sub(start), "num is", b.N)
}
func BenchmarkCommitkmvccMavl(b *testing.B) { benchmarkCommit(b, false) }
func BenchmarkCommitkmvcc(b *testing.B) { benchmarkCommit(b, true) }
func benchmarkCommit(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < b.N; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
start := time.Now()
b.ResetTimer()
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
req := &types.ReqHash{
Hash: hash,
}
_, err = store.Commit(req)
assert.NoError(b, err, "NoError")
end := time.Now()
fmt.Println("kvmvcc BenchmarkCommit cost time is", end.Sub(start), "num is", b.N)
b.StopTimer()
}
func BenchmarkStoreCommitkmvccMavl(b *testing.B) { benchmarkStoreCommit(b, false) }
func BenchmarkStoreCommitkmvcc(b *testing.B) { benchmarkStoreCommit(b, true) }
func benchmarkStoreCommit(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
var storeCfg = newStoreCfg(dir)
store := New(storeCfg, nil).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < 30; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
start := time.Now()
b.ResetTimer()
for i := 0; i < b.N; i++ {
datas.Height = int64(i)
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
req := &types.ReqHash{
Hash: hash,
}
_, err = store.Commit(req)
assert.NoError(b, err, "NoError")
datas.StateHash = hash
}
end := time.Now()
fmt.Println("kvmvcc BenchmarkStoreCommit cost time is", end.Sub(start), "num is", b.N)
b.StopTimer()
}
func BenchmarkIterMemSetkmvccMavl(b *testing.B) { benchmarkIterMemSet(b, false) }
func BenchmarkIterMemSetkmvcc(b *testing.B) { benchmarkIterMemSet(b, true) }
//一次设定多对kv,测试一次的时间/多少对kv,来算平均一对kv的耗时。
func benchmarkIterMemSet(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg, sub := newStoreCfgIter(dir)
store := New(storeCfg, sub).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < b.N; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
start := time.Now()
b.ResetTimer()
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
assert.NotNil(b, hash)
end := time.Now()
fmt.Println("kvmvcc BenchmarkMemSet cost time is", end.Sub(start), "num is", b.N)
}
func BenchmarkIterCommitkmvccMavl(b *testing.B) { benchmarkIterCommit(b, false) }
func BenchmarkIterCommitkmvcc(b *testing.B) { benchmarkIterCommit(b, true) }
func benchmarkIterCommit(b *testing.B, isResetForkHeight bool) {
dir, err := ioutil.TempDir("", "example")
assert.Nil(b, err)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
storeCfg, sub := newStoreCfgIter(dir)
store := New(storeCfg, sub).(*KVmMavlStore)
assert.NotNil(b, store)
if isResetForkHeight {
kvmvccMavlFork = 0
defer func() {
kvmvccMavlFork = 200 * 10000
}()
}
var kv []*types.KeyValue
var key string
var value string
var keys [][]byte
for i := 0; i < b.N; i++ {
key = GetRandomString(MaxKeylenth)
value = fmt.Sprintf("v%d", i)
keys = append(keys, []byte(string(key)))
kv = append(kv, &types.KeyValue{Key: []byte(string(key)), Value: []byte(string(value))})
}
datas := &types.StoreSet{
StateHash: drivers.EmptyRoot[:],
KV: kv,
Height: 0}
start := time.Now()
b.ResetTimer()
hash, err := store.MemSet(datas, true)
assert.Nil(b, err)
req := &types.ReqHash{
Hash: hash,
}
_, err = store.Commit(req)
assert.NoError(b, err, "NoError")
end := time.Now()
fmt.Println("kvmvcc BenchmarkCommit cost time is", end.Sub(start), "num is", b.N)
b.StopTimer()
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kvmvccmavl
import (
"bytes"
"fmt"
"strconv"
"sync/atomic"
"github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto"
)
const (
pruningStateStart = 1
pruningStateEnd = 0
onceScanCount = 10000 // 单次扫描数目
onceCount = 1000 // 容器长度
levelPruningHeight = 100 * 10000
)
var (
maxRollbackNum = 200
// 是否开启裁剪
enablePrune bool
// 每个10000裁剪一次
pruneHeight = 10000
pruningState int32
)
var (
//同common/db中的mvcc相关的定义保持一致
mvccPrefix = []byte(".-mvcc-.")
//mvccMeta = append(mvccPrefix, []byte("m.")...)
mvccData = append(mvccPrefix, []byte("d.")...)
//mvccLast = append(mvccPrefix, []byte("l.")...)
//mvccMetaVersion = append(mvccMeta, []byte("version.")...)
//mvccMetaVersionKeyList = append(mvccMeta, []byte("versionkl.")...)
)
// KVMVCCStore provide kvmvcc store interface implementation
type KVMVCCStore struct {
db dbm.DB
mvcc dbm.MVCC
kvsetmap map[string][]*types.KeyValue
enableMVCCIter bool
enableMavlPrune bool
pruneHeight int32
}
// NewKVMVCC construct KVMVCCStore module
func NewKVMVCC(sub *subKVMVCCConfig, db dbm.DB) *KVMVCCStore {
var kvs *KVMVCCStore
enable := false
if sub != nil {
enable = sub.EnableMVCCIter
}
if enable {
kvs = &KVMVCCStore{db, dbm.NewMVCCIter(db), make(map[string][]*types.KeyValue),
true, sub.EnableMavlPrune, sub.PruneHeight}
} else {
kvs = &KVMVCCStore{db, dbm.NewMVCC(db), make(map[string][]*types.KeyValue),
false, sub.EnableMavlPrune, sub.PruneHeight}
}
EnablePrune(sub.EnableMavlPrune)
SetPruneHeight(int(sub.PruneHeight))
return kvs
}
// Close the KVMVCCStore module
func (mvccs *KVMVCCStore) Close() {
kmlog.Info("store kvdb closed")
}
// Set kvs with statehash to KVMVCCStore
func (mvccs *KVMVCCStore) Set(datas *types.StoreSet, hash []byte, sync bool) ([]byte, error) {
if hash == nil {
hash = calcHash(datas)
}
kvlist, err := mvccs.mvcc.AddMVCC(datas.KV, hash, datas.StateHash, datas.Height)
if err != nil {
return nil, err
}
mvccs.saveKVSets(kvlist)
return hash, nil
}
// Get kvs with statehash from KVMVCCStore
func (mvccs *KVMVCCStore) Get(datas *types.StoreGet) [][]byte {
values := make([][]byte, len(datas.Keys))
version, err := mvccs.mvcc.GetVersion(datas.StateHash)
if err != nil {
kmlog.Error("Get version by hash failed.", "hash", common.ToHex(datas.StateHash))
return values
}
for i := 0; i < len(datas.Keys); i++ {
value, err := mvccs.mvcc.GetV(datas.Keys[i], version)
if err != nil {
//kmlog.Error("GetV by Keys failed.", "Key", string(datas.Keys[i]), "version", version)
} else if value != nil {
values[i] = value
}
}
return values
}
// MemSet set kvs to the mem of KVMVCCStore module and return the StateHash
func (mvccs *KVMVCCStore) MemSet(datas *types.StoreSet, hash []byte, sync bool) ([]byte, error) {
kvset, err := mvccs.checkVersion(datas.Height)
if err != nil {
return nil, err
}
if hash == nil {
hash = calcHash(datas)
}
//kmlog.Debug("KVMVCCStore MemSet AddMVCC", "prestatehash", common.ToHex(datas.StateHash), "hash", common.ToHex(hash), "height", datas.Height)
kvlist, err := mvccs.mvcc.AddMVCC(datas.KV, hash, datas.StateHash, datas.Height)
if err != nil {
return nil, err
}
if len(kvlist) > 0 {
kvset = append(kvset, kvlist...)
}
mvccs.kvsetmap[string(hash)] = kvset
// 进行裁剪
if enablePrune && !isPruning() &&
pruneHeight != 0 &&
datas.Height%int64(pruneHeight) == 0 &&
datas.Height/int64(pruneHeight) > 1 {
wg.Add(1)
go pruning(mvccs.db, datas.Height)
}
return hash, nil
}
// Commit kvs in the mem of KVMVCCStore module to state db and return the StateHash
func (mvccs *KVMVCCStore) Commit(req *types.ReqHash) ([]byte, error) {
_, ok := mvccs.kvsetmap[string(req.Hash)]
if !ok {
kmlog.Error("store kvmvcc commit", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound
}
//kmlog.Debug("KVMVCCStore Commit saveKVSets", "hash", common.ToHex(req.Hash))
mvccs.saveKVSets(mvccs.kvsetmap[string(req.Hash)])
delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil
}
// Rollback kvs in the mem of KVMVCCStore module and return the StateHash
func (mvccs *KVMVCCStore) Rollback(req *types.ReqHash) ([]byte, error) {
_, ok := mvccs.kvsetmap[string(req.Hash)]
if !ok {
kmlog.Error("store kvmvcc rollback", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound
}
//kmlog.Debug("KVMVCCStore Rollback", "hash", common.ToHex(req.Hash))
delete(mvccs.kvsetmap, string(req.Hash))
return req.Hash, nil
}
// IterateRangeByStateHash travel with Prefix by StateHash to get the latest version kvs.
func (mvccs *KVMVCCStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
if !mvccs.enableMVCCIter {
panic("call IterateRangeByStateHash when disable mvcc iter")
}
//按照kv最新值来进行遍历处理,要求statehash必须是最新区块的statehash,否则不支持该接口
maxVersion, err := mvccs.mvcc.GetMaxVersion()
if err != nil {
kmlog.Error("KVMVCCStore IterateRangeByStateHash can't get max version, ignore the call.", "err", err)
return
}
version, err := mvccs.mvcc.GetVersion(statehash)
if err != nil {
kmlog.Error("KVMVCCStore IterateRangeByStateHash can't get version, ignore the call.", "stateHash", common.ToHex(statehash), "err", err)
return
}
if version != maxVersion {
kmlog.Error("KVMVCCStore IterateRangeByStateHash call failed for maxVersion does not match version.", "maxVersion", maxVersion, "version", version, "stateHash", common.ToHex(statehash))
return
}
//kmlog.Info("KVMVCCStore do the IterateRangeByStateHash")
listhelper := dbm.NewListHelper(mvccs.mvcc.(*dbm.MVCCIter))
listhelper.IteratorCallback(start, end, 0, 1, fn)
}
// ProcEvent handles supported events
func (mvccs *KVMVCCStore) ProcEvent(msg queue.Message) {
msg.ReplyErr("KVStore", types.ErrActionNotSupport)
}
// Del set kvs to nil with StateHash
func (mvccs *KVMVCCStore) Del(req *types.StoreDel) ([]byte, error) {
kvset, err := mvccs.mvcc.DelMVCC(req.StateHash, req.Height, true)
if err != nil {
kmlog.Error("store kvmvcc del", "err", err)
return nil, err
}
kmlog.Info("KVMVCCStore Del", "hash", common.ToHex(req.StateHash), "height", req.Height)
mvccs.saveKVSets(kvset)
return req.StateHash, nil
}
func (mvccs *KVMVCCStore) saveKVSets(kvset []*types.KeyValue) {
if len(kvset) == 0 {
return
}
storeBatch := mvccs.db.NewBatch(true)
for i := 0; i < len(kvset); i++ {
if kvset[i].Value == nil {
storeBatch.Delete(kvset[i].Key)
} else {
storeBatch.Set(kvset[i].Key, kvset[i].Value)
}
}
storeBatch.Write()
}
func (mvccs *KVMVCCStore) checkVersion(height int64) ([]*types.KeyValue, error) {
//检查新加入区块的height和现有的version的关系,来判断是否要回滚数据
maxVersion, err := mvccs.mvcc.GetMaxVersion()
if err != nil {
if err != types.ErrNotFound {
kmlog.Error("store kvmvcc checkVersion GetMaxVersion failed", "err", err)
panic(err)
} else {
maxVersion = -1
}
}
//kmlog.Debug("store kvmvcc checkVersion ", "maxVersion", maxVersion, "currentVersion", height)
var kvset []*types.KeyValue
if maxVersion < height-1 {
kmlog.Error("store kvmvcc checkVersion found statehash lost", "maxVersion", maxVersion, "height", height)
return nil, ErrStateHashLost
} else if maxVersion == height-1 {
return nil, nil
} else {
count := 1
for i := maxVersion; i >= height; i-- {
hash, err := mvccs.mvcc.GetVersionHash(i)
if err != nil {
kmlog.Warn("store kvmvcc checkVersion GetVersionHash failed", "height", i, "maxVersion", maxVersion)
continue
}
kvlist, err := mvccs.mvcc.DelMVCC(hash, i, false)
if err != nil {
kmlog.Warn("store kvmvcc checkVersion DelMVCC failed", "height", i, "err", err)
continue
}
kvset = append(kvset, kvlist...)
kmlog.Debug("store kvmvcc checkVersion DelMVCC4Height", "height", i, "maxVersion", maxVersion)
//为避免高度差过大时出现异常,做一个保护,一次最多回滚200个区块
count++
if count >= maxRollbackNum {
break
}
}
}
return kvset, nil
}
func calcHash(datas proto.Message) []byte {
b := types.Encode(datas)
return common.Sha256(b)
}
/*裁剪-------------------------------------------*/
// EnablePrune 使能裁剪
func EnablePrune(enable bool) {
enablePrune = enable
}
// SetPruneHeight 设置每次裁剪高度
func SetPruneHeight(height int) {
pruneHeight = height
}
func pruning(db dbm.DB, height int64) {
defer wg.Done()
pruningMVCC(db, height)
}
func pruningMVCC(db dbm.DB, height int64) {
setPruning(pruningStateStart)
defer setPruning(pruningStateEnd)
pruningFirst(db, height)
}
func pruningFirst(db dbm.DB, curHeight int64) {
it := db.Iterator(mvccData, nil, true)
defer it.Close()
var mp map[string][]int64
count := 0
batch := db.NewBatch(true)
for it.Rewind(); it.Valid(); it.Next() {
if quit {
//该处退出
return
}
if mp == nil {
mp = make(map[string][]int64, onceCount)
}
key, height, err := getKeyVersion(it.Key())
if err != nil {
continue
}
if curHeight < height+levelPruningHeight &&
curHeight >= height+int64(pruneHeight) {
mp[string(key)] = append(mp[string(key)], height)
count++
}
if len(mp) >= onceCount-1 || count > onceScanCount {
deleteOldKV(mp, curHeight, batch)
mp = nil
count = 0
}
}
if len(mp) > 0 {
deleteOldKV(mp, curHeight, batch)
mp = nil
_ = mp
}
}
func deleteOldKV(mp map[string][]int64, curHeight int64, batch dbm.Batch) {
if len(mp) == 0 {
return
}
batch.Reset()
for key, vals := range mp {
if len(vals) > 1 && vals[1] != vals[0] { //防止相同高度时候出现的误删除
for _, val := range vals[1:] { //从第二个开始判断
if curHeight >= val+int64(pruneHeight) {
batch.Delete(genKeyVersion([]byte(key), val)) // 删除老版本key
if batch.ValueSize() > batchDataSize {
batch.Write()
batch.Reset()
}
}
}
}
delete(mp, key)
}
batch.Write()
}
func genKeyVersion(key []byte, height int64) []byte {
b := append([]byte{}, mvccData...)
newkey := append(b, key...)
newkey = append(newkey, []byte(".")...)
newkey = append(newkey, pad(height)...)
return newkey
}
func getKeyVersion(vsnKey []byte) ([]byte, int64, error) {
if !bytes.Contains(vsnKey, mvccData) {
return nil, 0, types.ErrSize
}
if len(vsnKey) < len(mvccData)+1+20 {
return nil, 0, types.ErrSize
}
sLen := vsnKey[len(vsnKey)-20:]
iLen, err := strconv.Atoi(string(sLen))
if err != nil {
return nil, 0, types.ErrSize
}
k := bytes.TrimPrefix(vsnKey, mvccData)
key := k[:len(k)-1-20]
return key, int64(iLen), nil
}
func pad(version int64) []byte {
s := fmt.Sprintf("%020d", version)
return []byte(s)
}
func isPruning() bool {
return atomic.LoadInt32(&pruningState) == 1
}
func setPruning(state int32) {
atomic.StoreInt32(&pruningState, state)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package kvmvccmavl
import (
"sync"
"github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/system/store/mavl/db"
"github.com/33cn/chain33/types"
)
// MavlStore mavl store struct
type MavlStore struct {
db dbm.DB
trees *sync.Map
enableMavlPrefix bool
enableMVCC bool
enableMavlPrune bool
pruneHeight int32
}
// NewMavl new mavl store module
func NewMavl(sub *subMavlConfig, db dbm.DB) *MavlStore {
var subcfg subMavlConfig
if sub != nil {
subcfg.EnableMavlPrefix = sub.EnableMavlPrefix
subcfg.EnableMVCC = sub.EnableMVCC
subcfg.EnableMavlPrune = sub.EnableMavlPrune
subcfg.PruneHeight = sub.PruneHeight
}
mavls := &MavlStore{db, &sync.Map{}, subcfg.EnableMavlPrefix, subcfg.EnableMVCC, subcfg.EnableMavlPrune, subcfg.PruneHeight}
mavl.EnableMavlPrefix(subcfg.EnableMavlPrefix)
mavl.EnableMVCC(subcfg.EnableMVCC)
mavl.EnablePrune(subcfg.EnableMavlPrune)
mavl.SetPruneHeight(int(subcfg.PruneHeight))
return mavls
}
// Close close mavl store
func (mavls *MavlStore) Close() {
mavl.ClosePrune()
kmlog.Info("store mavl closed")
}
// Set set k v to mavl store db; sync is true represent write sync
func (mavls *MavlStore) Set(datas *types.StoreSet, sync bool) ([]byte, error) {
return mavl.SetKVPair(mavls.db, datas, sync)
}
// Get get values by keys
func (mavls *MavlStore) Get(datas *types.StoreGet) [][]byte {
var tree *mavl.Tree
var err error
values := make([][]byte, len(datas.Keys))
search := string(datas.StateHash)
if data, ok := mavls.trees.Load(search); ok {
tree = data.(*mavl.Tree)
} else {
tree = mavl.NewTree(mavls.db, true)
//get接口也应该传入高度
//tree.SetBlockHeight(datas.Height)
err = tree.Load(datas.StateHash)
kmlog.Debug("store mavl get tree", "err", err, "StateHash", common.ToHex(datas.StateHash))
}
if err == nil {
for i := 0; i < len(datas.Keys); i++ {
_, value, exit := tree.Get(datas.Keys[i])
if exit {
values[i] = value
}
}
}
return values
}
// MemSet set keys values to memcory mavl, return root hash and error
func (mavls *MavlStore) MemSet(datas *types.StoreSet, sync bool) ([]byte, error) {
beg := types.Now()
defer func() {
kmlog.Info("MemSet", "cost", types.Since(beg))
}()
if len(datas.KV) == 0 {
kmlog.Info("store mavl memset,use preStateHash as stateHash for kvset is null")
mavls.trees.Store(string(datas.StateHash), nil)
return datas.StateHash, nil
}
tree := mavl.NewTree(mavls.db, sync)
tree.SetBlockHeight(datas.Height)
err := tree.Load(datas.StateHash)
if err != nil {
return nil, err
}
for i := 0; i < len(datas.KV); i++ {
tree.Set(datas.KV[i].Key, datas.KV[i].Value)
}
hash := tree.Hash()
mavls.trees.Store(string(hash), tree)
return hash, nil
}
// Commit convert memcory mavl to storage db
func (mavls *MavlStore) Commit(req *types.ReqHash) ([]byte, error) {
beg := types.Now()
defer func() {
kmlog.Info("Commit", "cost", types.Since(beg))
}()
tree, ok := mavls.trees.Load(string(req.Hash))
if !ok {
kmlog.Error("store mavl commit", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound
}
if tree == nil {
kmlog.Info("store mavl commit,do nothing for kvset is null")
mavls.trees.Delete(string(req.Hash))
return req.Hash, nil
}
hash := tree.(*mavl.Tree).Save()
if hash == nil {
kmlog.Error("store mavl commit", "err", types.ErrHashNotFound)
return nil, types.ErrDataBaseDamage
}
mavls.trees.Delete(string(req.Hash))
return req.Hash, nil
}
// Rollback 回退将缓存的mavl树删除掉
func (mavls *MavlStore) Rollback(req *types.ReqHash) ([]byte, error) {
beg := types.Now()
defer func() {
kmlog.Info("Rollback", "cost", types.Since(beg))
}()
_, ok := mavls.trees.Load(string(req.Hash))
if !ok {
kmlog.Error("store mavl rollback", "err", types.ErrHashNotFound)
return nil, types.ErrHashNotFound
}
mavls.trees.Delete(string(req.Hash))
return req.Hash, nil
}
// IterateRangeByStateHash 迭代实现功能; statehash:当前状态hash, start:开始查找的key, end: 结束的key, ascending:升序,降序, fn 迭代回调函数
func (mavls *MavlStore) IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool) {
mavl.IterateRangeByStateHash(mavls.db, statehash, start, end, ascending, fn)
}
// ProcEvent not support message
func (mavls *MavlStore) ProcEvent(msg queue.Message) {
msg.ReplyErr("Store", types.ErrActionNotSupport)
}
// Del ...
func (mavls *MavlStore) Del(req *types.StoreDel) ([]byte, error) {
//not support
return nil, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment