Commit 8f5842a5 authored by vipwzw's avatar vipwzw Committed by 33cn

update chain33 0227

parent 37db92bc
package blockchain
import (
"sync"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue"
......@@ -83,7 +81,7 @@ func (chain *BlockChain) localSet(msg *queue.Message) {
//创建 localdb transaction
func (chain *BlockChain) localNew(msg *queue.Message) {
tx := NewLocalDB(chain.blockStore.db)
tx := db.NewLocalDB(chain.blockStore.db)
id := common.StorePointer(tx)
msg.Reply(chain.client.NewMessage("", types.EventLocalNew, &types.Int64{Data: id}))
}
......@@ -155,160 +153,3 @@ func (chain *BlockChain) localPrefixCount(msg *queue.Message) {
counts := db.NewListHelper(chain.blockStore.db).PrefixCount(Prefix.Key)
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, &types.Int64{Data: counts}))
}
// LocalDB local db for store key value in local
type LocalDB struct {
txcache db.DB
cache db.DB
maindb db.DB
intx bool
mu sync.RWMutex
}
func newMemDB() db.DB {
memdb, err := db.NewGoMemDB("", "", 0)
if err != nil {
panic(err)
}
return memdb
}
// NewLocalDB new local db
func NewLocalDB(maindb db.DB) db.KVDB {
return &LocalDB{
cache: newMemDB(),
txcache: newMemDB(),
maindb: maindb,
}
}
// Get get value from local db
func (l *LocalDB) Get(key []byte) ([]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()
value, err := l.get(key)
return value, err
}
func (l *LocalDB) get(key []byte) ([]byte, error) {
if l.intx && l.txcache != nil {
if value, err := l.txcache.Get(key); err == nil {
return value, nil
}
}
if value, err := l.cache.Get(key); err == nil {
return value, nil
}
value, err := l.maindb.Get(key)
if err != nil {
return nil, err
}
err = l.cache.Set(key, value)
if err != nil {
panic(err)
}
return value, nil
}
// Set set key value to local db
func (l *LocalDB) Set(key []byte, value []byte) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.intx {
if l.txcache == nil {
l.txcache = newMemDB()
}
setdb(l.txcache, key, value)
} else {
setdb(l.cache, key, value)
}
return nil
}
// List 从数据库中查询数据列表,set 中的cache 更新不会影响这个list
func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()
dblist := make([]db.IteratorDB, 0)
if l.txcache != nil {
dblist = append(dblist, l.txcache)
}
if l.cache != nil {
dblist = append(dblist, l.cache)
}
if l.maindb != nil {
dblist = append(dblist, l.maindb)
}
mergedb := db.NewMergedIteratorDB(dblist)
it := db.NewListHelper(mergedb)
return it.List(prefix, key, count, direction), nil
}
// PrefixCount 从数据库中查询指定前缀的key的数量
func (l *LocalDB) PrefixCount(prefix []byte) (count int64) {
l.mu.RLock()
defer l.mu.RUnlock()
dblist := make([]db.IteratorDB, 0)
if l.txcache != nil {
dblist = append(dblist, l.txcache)
}
if l.cache != nil {
dblist = append(dblist, l.cache)
}
if l.maindb != nil {
dblist = append(dblist, l.maindb)
}
mergedb := db.NewMergedIteratorDB(dblist)
it := db.NewListHelper(mergedb)
return it.PrefixCount(prefix)
}
//Begin 开启内存事务处理
func (l *LocalDB) Begin() {
l.mu.Lock()
defer l.mu.Unlock()
l.intx = true
l.txcache = nil
}
// Rollback reset tx
func (l *LocalDB) Rollback() {
l.mu.Lock()
defer l.mu.Unlock()
l.resetTx()
}
// Commit canche tx
func (l *LocalDB) Commit() error {
l.mu.Lock()
defer l.mu.Unlock()
if l.txcache == nil {
l.resetTx()
return nil
}
it := l.txcache.Iterator(nil, nil, false)
for it.Next() {
l.cache.Set(it.Key(), it.Value())
}
l.resetTx()
return nil
}
func (l *LocalDB) resetTx() {
l.intx = false
l.txcache = nil
}
func setdb(d db.DB, key []byte, value []byte) {
if value == nil {
err := d.Delete(key)
if err != nil {
return
}
} else {
err := d.Set(key, value)
if err != nil {
panic(err)
}
}
}
......@@ -59,9 +59,9 @@ func (db *ListHelper) List(prefix, key []byte, count, direction int32) (values [
if count == 1 && direction == ListSeek {
it := db.db.Iterator(prefix, nil, true)
defer it.Close()
it.Seek(key)
flag := it.Seek(key)
//判断是否相等
if !bytes.Equal(key, it.Key()) {
if !flag || !bytes.Equal(key, it.Key()) {
it.Next()
if !it.Valid() {
return nil
......@@ -117,7 +117,7 @@ func (db *ListHelper) IteratorScanFromFirst(prefix []byte, count int32) (values
values = nil
return
}
//println(string(it.Key()))
//println(string(it.Key()), string(value))
values = append(values, value)
i++
if i == count {
......
package db
import (
"sync"
)
// LocalDB local db for store key value in local
type LocalDB struct {
txcache DB
cache DB
maindb DB
intx bool
mu sync.RWMutex
}
func newMemDB() DB {
memdb, err := NewGoMemDB("", "", 0)
if err != nil {
panic(err)
}
return memdb
}
// NewLocalDB new local db
func NewLocalDB(maindb DB) KVDB {
return &LocalDB{
cache: newMemDB(),
txcache: newMemDB(),
maindb: maindb,
}
}
// Get get value from local db
func (l *LocalDB) Get(key []byte) ([]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()
value, err := l.get(key)
return value, err
}
func (l *LocalDB) get(key []byte) ([]byte, error) {
if l.intx && l.txcache != nil {
if value, err := l.txcache.Get(key); err == nil {
return value, nil
}
}
if value, err := l.cache.Get(key); err == nil {
return value, nil
}
value, err := l.maindb.Get(key)
if err != nil {
return nil, err
}
err = l.cache.Set(key, value)
if err != nil {
panic(err)
}
return value, nil
}
// Set set key value to local db
func (l *LocalDB) Set(key []byte, value []byte) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.intx {
if l.txcache == nil {
l.txcache = newMemDB()
}
setdb(l.txcache, key, value)
} else {
setdb(l.cache, key, value)
}
return nil
}
// List 从数据库中查询数据列表,set 中的cache 更新不会影响这个list
func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()
dblist := make([]IteratorDB, 0)
if l.txcache != nil {
dblist = append(dblist, l.txcache)
}
if l.cache != nil {
dblist = append(dblist, l.cache)
}
if l.maindb != nil {
dblist = append(dblist, l.maindb)
}
mergedb := NewMergedIteratorDB(dblist)
it := NewListHelper(mergedb)
return it.List(prefix, key, count, direction), nil
}
// PrefixCount 从数据库中查询指定前缀的key的数量
func (l *LocalDB) PrefixCount(prefix []byte) (count int64) {
l.mu.RLock()
defer l.mu.RUnlock()
dblist := make([]IteratorDB, 0)
if l.txcache != nil {
dblist = append(dblist, l.txcache)
}
if l.cache != nil {
dblist = append(dblist, l.cache)
}
if l.maindb != nil {
dblist = append(dblist, l.maindb)
}
mergedb := NewMergedIteratorDB(dblist)
it := NewListHelper(mergedb)
return it.PrefixCount(prefix)
}
//Begin 开启内存事务处理
func (l *LocalDB) Begin() {
l.mu.Lock()
defer l.mu.Unlock()
l.intx = true
l.txcache = nil
}
// Rollback reset tx
func (l *LocalDB) Rollback() {
l.mu.Lock()
defer l.mu.Unlock()
l.resetTx()
}
// Commit canche tx
func (l *LocalDB) Commit() error {
l.mu.Lock()
defer l.mu.Unlock()
if l.txcache == nil {
l.resetTx()
return nil
}
it := l.txcache.Iterator(nil, nil, false)
for it.Next() {
l.cache.Set(it.Key(), it.Value())
}
l.resetTx()
return nil
}
func (l *LocalDB) resetTx() {
l.intx = false
l.txcache = nil
}
func setdb(d DB, key []byte, value []byte) {
if value == nil {
err := d.Delete(key)
if err != nil {
return
}
} else {
err := d.Set(key, value)
if err != nil {
panic(err)
}
}
}
......@@ -15,6 +15,7 @@ const (
dirSOI
dirEOI
dirForward
dirSeek
)
//合并错误列表
......@@ -75,7 +76,7 @@ func (i *mergedIterator) Rewind() bool {
}
}
i.dir = dirSOI
return i.next()
return i.next(false)
}
func (i *mergedIterator) Seek(key []byte) bool {
......@@ -85,7 +86,6 @@ func (i *mergedIterator) Seek(key []byte) bool {
i.err = ErrIterReleased
return false
}
for x, iter := range i.iters {
switch {
case iter.Seek(key):
......@@ -97,10 +97,18 @@ func (i *mergedIterator) Seek(key []byte) bool {
}
}
i.dir = dirSOI
return i.next()
if i.next(true) {
i.dir = dirSeek
return true
}
i.dir = dirSOI
return false
}
func (i *mergedIterator) compare(tkey []byte, key []byte) int {
func (i *mergedIterator) compare(tkey []byte, key []byte, ignoreReverse bool) int {
if ignoreReverse {
return i.cmp.Compare(tkey, key)
}
if tkey == nil && key != nil {
return 1
}
......@@ -114,10 +122,10 @@ func (i *mergedIterator) compare(tkey []byte, key []byte) int {
return result
}
func (i *mergedIterator) next() bool {
func (i *mergedIterator) next(ignoreReverse bool) bool {
var key []byte
for x, tkey := range i.keys {
if tkey != nil && (key == nil || i.compare(tkey, key) < 0) {
if tkey != nil && (key == nil || i.compare(tkey, key, ignoreReverse) < 0) {
key = tkey
i.index = x
}
......@@ -134,8 +142,15 @@ func (i *mergedIterator) next() bool {
}
func (i *mergedIterator) Next() bool {
for i.nextInternal() {
if i.compare(i.Key(), i.prevKey) != 0 {
for {
ok, isrewind := i.nextInternal()
if !ok {
break
}
if isrewind {
return true
}
if i.compare(i.Key(), i.prevKey, true) != 0 {
i.prevKey = cloneByte(i.Key())
return true
}
......@@ -143,30 +158,47 @@ func (i *mergedIterator) Next() bool {
return false
}
func (i *mergedIterator) nextInternal() bool {
func (i *mergedIterator) nextInternal() (bool, bool) {
if i.dir == dirEOI || i.err != nil {
return false
return false, false
} else if i.dir == dirReleased {
i.err = ErrIterReleased
return false
return false, false
}
switch i.dir {
case dirSOI:
return i.Rewind()
return i.Rewind(), true
case dirSeek:
if !i.reverse {
break
}
key := append([]byte{}, i.keys[i.index]...)
for x, iter := range i.iters {
if x == i.index {
continue
}
seek := iter.Seek(key)
switch {
case seek && iter.Next(), !seek && iter.Rewind():
i.keys[x] = assertKey(iter.Key())
case i.iterErr(iter):
return false, false
default:
i.keys[x] = nil
}
}
}
x := i.index
iter := i.iters[x]
switch {
case iter.Next():
i.keys[x] = assertKey(iter.Key())
case i.iterErr(iter):
return false
return false, false
default:
i.keys[x] = nil
}
return i.next()
return i.next(false), false
}
func (i *mergedIterator) Key() []byte {
......
......@@ -2,6 +2,7 @@ package db
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/assert"
......@@ -29,11 +30,76 @@ func TestMergeIter(t *testing.T) {
assert.Equal(t, 2, len(list0))
assert.Equal(t, "2", string(list0[0]))
assert.Equal(t, "1", string(list0[1]))
/*
list0 = it0.List(nil, nil, 100, 1)
assert.Equal(t, 2, len(list0))
assert.Equal(t, "1", string(list0[0]))
assert.Equal(t, "2", string(list0[1]))
*/
}
list0 = it0.List(nil, nil, 100, 1)
func newGoLevelDB(t *testing.T) (DB, string) {
dir, err := ioutil.TempDir("", "goleveldb")
assert.Nil(t, err)
db, err := NewGoLevelDB("test", dir, 16)
assert.Nil(t, err)
return db, dir
}
func TestMergeIterSeek1(t *testing.T) {
db1 := newGoMemDB(t)
db1.Set([]byte("1"), []byte("1"))
it0 := NewListHelper(db1)
list0 := it0.List(nil, []byte("2"), 1, ListSeek)
assert.Equal(t, 2, len(list0))
assert.Equal(t, "1", string(list0[0]))
assert.Equal(t, "2", string(list0[1]))
}
func TestMergeIterSeek(t *testing.T) {
db1 := newGoMemDB(t)
db2 := newGoMemDB(t)
db3, dir := newGoLevelDB(t)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
db1.Set([]byte("1"), []byte("1"))
db2.Set([]byte("3"), []byte("3"))
db3.Set([]byte("5"), []byte("5"))
//合并以后:
db := NewMergedIteratorDB([]IteratorDB{db1, db2, db3})
it0 := NewListHelper(db)
list0 := it0.List(nil, []byte("2"), 1, ListSeek)
assert.Equal(t, 2, len(list0))
assert.Equal(t, "1", string(list0[1]))
list0 = it0.List(nil, []byte("3"), 1, ListSeek)
assert.Equal(t, 2, len(list0))
assert.Equal(t, "3", string(list0[1]))
}
func TestMergeIterSeekPrefix(t *testing.T) {
db1 := newGoMemDB(t)
db2 := newGoMemDB(t)
db3, dir := newGoLevelDB(t)
defer os.RemoveAll(dir) // clean up
os.RemoveAll(dir) //删除已存在目录
db1.Set([]byte("key1"), []byte("1"))
db2.Set([]byte("key3"), []byte("3"))
db3.Set([]byte("key5"), []byte("5"))
//合并以后:
db := NewMergedIteratorDB([]IteratorDB{db1, db2, db3})
it0 := NewListHelper(db)
list0 := it0.List([]byte("key"), []byte("key2"), 1, ListSeek)
assert.Equal(t, 2, len(list0))
assert.Equal(t, "1", string(list0[1]))
list0 = it0.List([]byte("key"), []byte("key3"), 1, ListSeek)
assert.Equal(t, 2, len(list0))
assert.Equal(t, "3", string(list0[1]))
list0 = it0.List([]byte("key"), []byte("key6"), 1, ListSeek)
assert.Equal(t, 2, len(list0))
assert.Equal(t, "5", string(list0[1]))
}
func TestMergeIterDup1(t *testing.T) {
......
......@@ -117,12 +117,12 @@ func TestGetAllCoinsMVCCIter(t *testing.T) {
}
fmt.Println("---case 1-2----")
var match_values [][]byte
var matchValues [][]byte
listhelper.IteratorCallback([]byte("mavl-coins-bty-"), nil, 0, 1, func(key, value []byte) bool {
match_values = append(match_values, value)
matchValues = append(matchValues, value)
return false
})
values = match_values
values = matchValues
assert.Equal(t, "3", string(values[0]))
assert.Equal(t, "2", string(values[1]))
assert.Equal(t, "4", string(values[2]))
......@@ -150,12 +150,12 @@ func TestGetAllCoinsMVCCIter(t *testing.T) {
}
//m.PrintAll()
fmt.Println("---case 2-2----")
match_values = nil
matchValues = nil
listhelper.IteratorCallback(([]byte("mavl-coins-bty-")), []byte("mavl-coins-bty-exec-"), 0, 1, func(key, value []byte) bool {
match_values = append(match_values, value)
matchValues = append(matchValues, value)
return false
})
values = match_values
values = matchValues
for i := 0; i < len(values); i++ {
fmt.Println(string(values[i]))
......@@ -166,12 +166,12 @@ func TestGetAllCoinsMVCCIter(t *testing.T) {
assert.Equal(t, "4", string(values[2]))
fmt.Println("---case 2-3----")
match_values = nil
matchValues = nil
listhelper.IteratorCallback(([]byte("mavl-coins-bty-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTq")), []byte("mavl-coins-bty-exec-"), 0, 1, func(key, value []byte) bool {
match_values = append(match_values, value)
matchValues = append(matchValues, value)
return false
})
values = match_values
values = matchValues
for i := 0; i < len(values); i++ {
fmt.Println(string(values[i]))
......@@ -181,3 +181,57 @@ func TestGetAllCoinsMVCCIter(t *testing.T) {
assert.Equal(t, "2", string(values[0]))
assert.Equal(t, "4", string(values[1]))
}
func TestSimpleMVCCLocalDB(t *testing.T) {
//use leveldb
//use localdb
db3, dir := newGoLevelDB(t)
defer os.RemoveAll(dir) // clean up
kvdb := NewLocalDB(db3)
m := NewSimpleMVCC(kvdb)
kvlist, err := m.AddMVCC(KeyValueList([2]string{"mavl-coins-bty-exec-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp", "1"}, [2]string{"mavl-coins-bty-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTq", "2"}), hashN(0), nil, 0)
assert.Nil(t, err)
setKVList(kvdb, kvlist)
kvlist, err = m.AddMVCC(KeyValueList([2]string{"mavl-coins-bty-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp", "3"}, [2]string{"mavl-coins-bty-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTs", "4"}), hashN(1), hashN(0), 1)
assert.Nil(t, err)
setKVList(kvdb, kvlist)
kvlist, err = m.AddMVCC(KeyValueList([2]string{"mavl-coins-bty-exec-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp", "5"}, [2]string{"mavl-coins-bty-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTt", "6"}), hashN(2), hashN(1), 2)
assert.Nil(t, err)
setKVList(kvdb, kvlist)
values, err := kvdb.List([]byte(".-mvcc-.d.mavl-coins-bty-"), nil, 100, 1)
assert.Nil(t, err)
assert.Equal(t, 6, len(values))
assert.Equal(t, "3", string(values[0]))
assert.Equal(t, "2", string(values[1]))
assert.Equal(t, "4", string(values[2]))
assert.Equal(t, "6", string(values[3]))
assert.Equal(t, "1", string(values[4]))
assert.Equal(t, "5", string(values[5]))
v, err := m.GetV([]byte("mavl-coins-bty-exec-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"), 0)
assert.Nil(t, err)
assert.Equal(t, "1", string(v))
v, err = m.GetV([]byte("mavl-coins-bty-exec-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"), 1)
assert.Nil(t, err)
assert.Equal(t, "1", string(v))
v, err = m.GetV([]byte("mavl-coins-bty-exec-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"), 2)
assert.Nil(t, err)
assert.Equal(t, "5", string(v))
v, err = m.GetV([]byte("mavl-coins-bty-exec-16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"), 3)
assert.Nil(t, err)
assert.Equal(t, "5", string(v))
}
func setKVList(db KVDB, kvlist []*types.KeyValue) {
for _, v := range kvlist {
db.Set(v.Key, v.Value)
}
}
......@@ -271,7 +271,7 @@ func BenchmarkExecBlock(b *testing.B) {
block0 := mock33.GetBlock(0)
account := mock33.GetAccount(block0.StateHash, mock33.GetGenesisAddress())
assert.Equal(b, int64(10000000000000000), account.Balance)
b.StartTimer()
b.ResetTimer()
for i := 0; i < b.N; i++ {
util.ExecBlock(mock33.GetClient(), block0.StateHash, block, false, true)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment