Commit e1b404bd authored by vipwzw's avatar vipwzw Committed by 33cn

update chain33

parent 4b31f0bb
...@@ -412,7 +412,7 @@ func (bs *BlockStore) Get(keys *types.LocalDBGet) *types.LocalReplyValue { ...@@ -412,7 +412,7 @@ func (bs *BlockStore) Get(keys *types.LocalDBGet) *types.LocalReplyValue {
for i := 0; i < len(keys.Keys); i++ { for i := 0; i < len(keys.Keys); i++ {
key := keys.Keys[i] key := keys.Keys[i]
value, err := bs.db.Get(key) value, err := bs.db.Get(key)
if err != nil { if err != nil && err != types.ErrNotFound {
storeLog.Error("Get", "error", err) storeLog.Error("Get", "error", err)
} }
reply.Values = append(reply.Values, value) reply.Values = append(reply.Values, value)
......
package blockchain package blockchain
import ( import (
"sync"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/db" "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
) )
var intransaction bool func (chain *BlockChain) procLocalDB(msgtype int64, msg queue.Message, reqnum chan struct{}) bool {
func (chain *BlockChain) procLocalDB(msgtype int64, msg queue.Message, reqnum chan struct{}) {
switch msgtype { switch msgtype {
case types.EventLocalGet: case types.EventLocalGet:
go chain.processMsg(msg, reqnum, chain.localGet) go chain.processMsg(msg, reqnum, chain.localGet)
case types.EventLocalSet: case types.EventLocalSet:
go chain.processMsg(msg, reqnum, chain.localSet) go chain.processMsg(msg, reqnum, chain.localSet)
case types.EventLocalBegin: case types.EventLocalBegin:
if intransaction {
go chain.processMsg(msg, reqnum, func(msg queue.Message) {
msg.Reply(chain.client.NewMessage("", types.EventLocalBegin, types.ErrLocalDBTxDupOpen))
})
return
}
intransaction = true
go chain.processMsg(msg, reqnum, chain.localBegin) go chain.processMsg(msg, reqnum, chain.localBegin)
case types.EventLocalCommit: case types.EventLocalCommit:
intransaction = false
go chain.processMsg(msg, reqnum, chain.localCommit) go chain.processMsg(msg, reqnum, chain.localCommit)
case types.EventLocalRollback: case types.EventLocalRollback:
intransaction = false
go chain.processMsg(msg, reqnum, chain.localRollback) go chain.processMsg(msg, reqnum, chain.localRollback)
case types.EventLocalList: case types.EventLocalList:
go chain.processMsg(msg, reqnum, chain.localList) go chain.processMsg(msg, reqnum, chain.localList)
case types.EventLocalPrefixCount: case types.EventLocalPrefixCount:
go chain.processMsg(msg, reqnum, chain.localPrefixCount) go chain.processMsg(msg, reqnum, chain.localPrefixCount)
case types.EventLocalNew:
go chain.processMsg(msg, reqnum, chain.localNew)
case types.EventLocalClose:
go chain.processMsg(msg, reqnum, chain.localClose)
default:
return false
} }
return true
} }
func (chain *BlockChain) localGet(msg queue.Message) { func (chain *BlockChain) localGet(msg queue.Message) {
...@@ -47,11 +45,12 @@ func (chain *BlockChain) localGet(msg queue.Message) { ...@@ -47,11 +45,12 @@ func (chain *BlockChain) localGet(msg queue.Message) {
tx, err := common.GetPointer(keys.Txid) tx, err := common.GetPointer(keys.Txid)
if err != nil { if err != nil {
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, err)) msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, err))
return
} }
var reply types.LocalReplyValue var reply types.LocalReplyValue
for i := 0; i < len(keys.Keys); i++ { for i := 0; i < len(keys.Keys); i++ {
key := keys.Keys[i] key := keys.Keys[i]
value, err := tx.(db.TxKV).Get(key) value, err := tx.(db.KVDB).Get(key)
if err != nil { if err != nil {
chainlog.Debug("localGet", "i", i, "key", string(key), "err", err) chainlog.Debug("localGet", "i", i, "key", string(key), "err", err)
} }
...@@ -72,7 +71,7 @@ func (chain *BlockChain) localSet(msg queue.Message) { ...@@ -72,7 +71,7 @@ func (chain *BlockChain) localSet(msg queue.Message) {
msg.Reply(chain.client.NewMessage("", types.EventLocalSet, err)) msg.Reply(chain.client.NewMessage("", types.EventLocalSet, err))
return return
} }
tx := txp.(db.TxKV) tx := txp.(db.KVDB)
for i := 0; i < len(kvs.KV); i++ { for i := 0; i < len(kvs.KV); i++ {
err := tx.Set(kvs.KV[i].Key, kvs.KV[i].Value) err := tx.Set(kvs.KV[i].Key, kvs.KV[i].Value)
if err != nil { if err != nil {
...@@ -82,55 +81,71 @@ func (chain *BlockChain) localSet(msg queue.Message) { ...@@ -82,55 +81,71 @@ func (chain *BlockChain) localSet(msg queue.Message) {
msg.Reply(chain.client.NewMessage("", types.EventLocalSet, nil)) msg.Reply(chain.client.NewMessage("", types.EventLocalSet, nil))
} }
//创建 localdb transaction
func (chain *BlockChain) localNew(msg queue.Message) {
tx := NewLocalDB(chain.blockStore.db)
id := common.StorePointer(tx)
msg.Reply(chain.client.NewMessage("", types.EventLocalNew, &types.Int64{Data: id}))
}
//关闭 localdb transaction
func (chain *BlockChain) localClose(msg queue.Message) {
id := (msg.Data).(*types.Int64).Data
_, err := common.GetPointer(id)
common.RemovePointer(id)
msg.Reply(chain.client.NewMessage("", types.EventLocalClose, err))
}
func (chain *BlockChain) localBegin(msg queue.Message) { func (chain *BlockChain) localBegin(msg queue.Message) {
tx, err := chain.blockStore.db.BeginTx() id := (msg.Data).(*types.Int64).Data
tx, err := common.GetPointer(id)
if err != nil { if err != nil {
msg.Reply(chain.client.NewMessage("", types.EventLocalBegin, err)) msg.Reply(chain.client.NewMessage("", types.EventLocalBegin, err))
return return
} }
id := common.StorePointer(tx) tx.(db.KVDB).Begin()
msg.Reply(chain.client.NewMessage("", types.EventLocalBegin, &types.Int64{Data: id})) msg.Reply(chain.client.NewMessage("", types.EventLocalBegin, nil))
} }
func (chain *BlockChain) localCommit(msg queue.Message) { func (chain *BlockChain) localCommit(msg queue.Message) {
id := (msg.Data).(*types.Int64).Data id := (msg.Data).(*types.Int64).Data
tx, err := common.GetPointer(id) tx, err := common.GetPointer(id)
common.RemovePointer(id)
if err != nil { if err != nil {
msg.Reply(chain.client.NewMessage("", types.EventLocalCommit, err)) msg.Reply(chain.client.NewMessage("", types.EventLocalCommit, err))
return return
} }
err = tx.(db.TxKV).Commit() err = tx.(db.KVDB).Commit()
if err != nil {
msg.Reply(chain.client.NewMessage("", types.EventLocalCommit, err)) msg.Reply(chain.client.NewMessage("", types.EventLocalCommit, err))
return
}
msg.Reply(chain.client.NewMessage("", types.EventLocalCommit, nil))
} }
func (chain *BlockChain) localRollback(msg queue.Message) { func (chain *BlockChain) localRollback(msg queue.Message) {
id := (msg.Data).(*types.Int64).Data id := (msg.Data).(*types.Int64).Data
tx, err := common.GetPointer(id) tx, err := common.GetPointer(id)
common.RemovePointer(id)
if err != nil { if err != nil {
msg.Reply(chain.client.NewMessage("", types.EventLocalRollback, err)) msg.Reply(chain.client.NewMessage("", types.EventLocalRollback, err))
return return
} }
tx.(db.TxKV).Rollback() tx.(db.KVDB).Rollback()
msg.Reply(chain.client.NewMessage("", types.EventLocalRollback, nil)) msg.Reply(chain.client.NewMessage("", types.EventLocalRollback, nil))
} }
func (chain *BlockChain) localList(msg queue.Message) { func (chain *BlockChain) localList(msg queue.Message) {
q := (msg.Data).(*types.LocalDBList) q := (msg.Data).(*types.LocalDBList)
var itdb db.IteratorDB = chain.blockStore.db var values [][]byte
if q.Txid > 0 { if q.Txid > 0 {
tx, err := common.GetPointer(q.Txid) tx, err := common.GetPointer(q.Txid)
if err != nil { if err != nil {
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, err)) msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, err))
return
}
values, err = tx.(db.KVDB).List(q.Prefix, q.Key, q.Count, q.Direction)
if err != nil {
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, err))
return
} }
itdb = tx.(db.IteratorDB) } else {
values = db.NewListHelper(chain.blockStore.db).List(q.Prefix, q.Key, q.Count, q.Direction)
} }
values := db.NewListHelper(itdb).List(q.Prefix, q.Key, q.Count, q.Direction)
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, &types.LocalReplyValue{Values: values})) msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, &types.LocalReplyValue{Values: values}))
} }
...@@ -140,3 +155,160 @@ func (chain *BlockChain) localPrefixCount(msg queue.Message) { ...@@ -140,3 +155,160 @@ func (chain *BlockChain) localPrefixCount(msg queue.Message) {
counts := db.NewListHelper(chain.blockStore.db).PrefixCount(Prefix.Key) counts := db.NewListHelper(chain.blockStore.db).PrefixCount(Prefix.Key)
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, &types.Int64{Data: counts})) msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, &types.Int64{Data: counts}))
} }
// LocalDB local db for store key value in local
type LocalDB struct {
txcache db.DB
cache db.DB
maindb db.DB
intx bool
mu sync.RWMutex
}
func newMemDB() db.DB {
memdb, err := db.NewGoMemDB("", "", 0)
if err != nil {
panic(err)
}
return memdb
}
// NewLocalDB new local db
func NewLocalDB(maindb db.DB) db.KVDB {
return &LocalDB{
cache: newMemDB(),
txcache: newMemDB(),
maindb: maindb,
}
}
// Get get value from local db
func (l *LocalDB) Get(key []byte) ([]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()
value, err := l.get(key)
return value, err
}
func (l *LocalDB) get(key []byte) ([]byte, error) {
if l.intx && l.txcache != nil {
if value, err := l.txcache.Get(key); err == nil {
return value, nil
}
}
if value, err := l.cache.Get(key); err == nil {
return value, nil
}
value, err := l.maindb.Get(key)
if err != nil {
return nil, err
}
err = l.cache.Set(key, value)
if err != nil {
panic(err)
}
return value, nil
}
// Set set key value to local db
func (l *LocalDB) Set(key []byte, value []byte) error {
l.mu.Lock()
defer l.mu.Unlock()
if l.intx {
if l.txcache == nil {
l.txcache = newMemDB()
}
setdb(l.txcache, key, value)
} else {
setdb(l.cache, key, value)
}
return nil
}
// List 从数据库中查询数据列表,set 中的cache 更新不会影响这个list
func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, error) {
l.mu.RLock()
defer l.mu.RUnlock()
dblist := make([]db.IteratorDB, 0)
if l.txcache != nil {
dblist = append(dblist, l.txcache)
}
if l.cache != nil {
dblist = append(dblist, l.cache)
}
if l.maindb != nil {
dblist = append(dblist, l.maindb)
}
mergedb := db.NewMergedIteratorDB(dblist)
it := db.NewListHelper(mergedb)
return it.List(prefix, key, count, direction), nil
}
// PrefixCount 从数据库中查询指定前缀的key的数量
func (l *LocalDB) PrefixCount(prefix []byte) (count int64) {
l.mu.RLock()
defer l.mu.RUnlock()
dblist := make([]db.IteratorDB, 0)
if l.txcache != nil {
dblist = append(dblist, l.txcache)
}
if l.cache != nil {
dblist = append(dblist, l.cache)
}
if l.maindb != nil {
dblist = append(dblist, l.maindb)
}
mergedb := db.NewMergedIteratorDB(dblist)
it := db.NewListHelper(mergedb)
return it.PrefixCount(prefix)
}
//Begin 开启内存事务处理
func (l *LocalDB) Begin() {
l.mu.Lock()
defer l.mu.Unlock()
l.intx = true
l.txcache = nil
}
// Rollback reset tx
func (l *LocalDB) Rollback() {
l.mu.Lock()
defer l.mu.Unlock()
l.resetTx()
}
// Commit canche tx
func (l *LocalDB) Commit() error {
l.mu.Lock()
defer l.mu.Unlock()
if l.txcache == nil {
l.resetTx()
return nil
}
it := l.txcache.Iterator(nil, nil, false)
for it.Next() {
l.cache.Set(it.Key(), it.Value())
}
l.resetTx()
return nil
}
func (l *LocalDB) resetTx() {
l.intx = false
l.txcache = nil
}
func setdb(d db.DB, key []byte, value []byte) {
if value == nil {
err := d.Delete(key)
if err != nil {
return
}
} else {
err := d.Set(key, value)
if err != nil {
panic(err)
}
}
}
...@@ -12,7 +12,6 @@ import ( ...@@ -12,7 +12,6 @@ import (
func TestLocalDBRollback(t *testing.T) { func TestLocalDBRollback(t *testing.T) {
mock33 := testnode.New("", nil) mock33 := testnode.New("", nil)
defer mock33.Close() defer mock33.Close()
//测试localdb //测试localdb
api := mock33.GetAPI() api := mock33.GetAPI()
param := &types.LocalDBGet{} param := &types.LocalDBGet{}
...@@ -28,12 +27,19 @@ func TestLocalDBRollback(t *testing.T) { ...@@ -28,12 +27,19 @@ func TestLocalDBRollback(t *testing.T) {
assert.Equal(t, err, types.ErrNotSetInTransaction) assert.Equal(t, err, types.ErrNotSetInTransaction)
//set in transaction //set in transaction
id, err := api.LocalBegin(nil) id, err := api.LocalNew(nil)
assert.Nil(t, err)
assert.True(t, id.Data > 0)
err = api.LocalClose(id)
assert.Nil(t, err)
id, err = api.LocalNew(nil)
assert.Nil(t, err) assert.Nil(t, err)
assert.True(t, id.Data > 0) assert.True(t, id.Data > 0)
_, err = api.LocalBegin(nil) err = api.LocalBegin(id)
assert.Equal(t, err, types.ErrLocalDBTxDupOpen) assert.Nil(t, err)
param2 = &types.LocalDBSet{Txid: id.Data} param2 = &types.LocalDBSet{Txid: id.Data}
param2.KV = append(param2.KV, &types.KeyValue{Key: []byte("hello"), Value: []byte("world")}) param2.KV = append(param2.KV, &types.KeyValue{Key: []byte("hello"), Value: []byte("world")})
...@@ -87,7 +93,7 @@ func TestLocalDBRollback(t *testing.T) { ...@@ -87,7 +93,7 @@ func TestLocalDBRollback(t *testing.T) {
Direction: 1, Direction: 1,
} }
_, err = api.LocalList(list) _, err = api.LocalList(list)
assert.Equal(t, err, common.ErrPointerNotFound) assert.Nil(t, err)
list = &types.LocalDBList{ list = &types.LocalDBList{
Prefix: []byte("hello"), Prefix: []byte("hello"),
...@@ -117,12 +123,12 @@ func TestLocalDBCommit(t *testing.T) { ...@@ -117,12 +123,12 @@ func TestLocalDBCommit(t *testing.T) {
assert.Equal(t, err, types.ErrNotSetInTransaction) assert.Equal(t, err, types.ErrNotSetInTransaction)
//set in transaction //set in transaction
id, err := api.LocalBegin(nil) id, err := api.LocalNew(nil)
assert.Nil(t, err) assert.Nil(t, err)
assert.True(t, id.Data > 0) assert.True(t, id.Data > 0)
_, err = api.LocalBegin(nil) err = api.LocalBegin(id)
assert.Equal(t, err, types.ErrLocalDBTxDupOpen) assert.Nil(t, err)
param2 = &types.LocalDBSet{Txid: id.Data} param2 = &types.LocalDBSet{Txid: id.Data}
param2.KV = append(param2.KV, &types.KeyValue{Key: []byte("hello"), Value: []byte("world")}) param2.KV = append(param2.KV, &types.KeyValue{Key: []byte("hello"), Value: []byte("world")})
...@@ -167,6 +173,9 @@ func TestLocalDBCommit(t *testing.T) { ...@@ -167,6 +173,9 @@ func TestLocalDBCommit(t *testing.T) {
err = api.LocalCommit(id) err = api.LocalCommit(id)
assert.Nil(t, err) assert.Nil(t, err)
err = api.LocalClose(id)
assert.Nil(t, err)
list = &types.LocalDBList{ list = &types.LocalDBList{
Txid: id.Data, Txid: id.Data,
Prefix: []byte("hello"), Prefix: []byte("hello"),
...@@ -175,13 +184,12 @@ func TestLocalDBCommit(t *testing.T) { ...@@ -175,13 +184,12 @@ func TestLocalDBCommit(t *testing.T) {
_, err = api.LocalList(list) _, err = api.LocalList(list)
assert.Equal(t, err, common.ErrPointerNotFound) assert.Equal(t, err, common.ErrPointerNotFound)
//系统只读,无法写入数据
list = &types.LocalDBList{ list = &types.LocalDBList{
Prefix: []byte("hello"), Prefix: []byte("hello"),
Direction: 1, Direction: 1,
} }
values, err = api.LocalList(list) values, err = api.LocalList(list)
assert.Equal(t, err, nil) assert.Equal(t, err, nil)
assert.Equal(t, 2, len(values.Values)) assert.Equal(t, 0, len(values.Values))
assert.Equal(t, []byte("world"), values.Values[0])
assert.Equal(t, []byte("world2"), values.Values[1])
} }
...@@ -22,7 +22,9 @@ func (chain *BlockChain) ProcRecvMsg() { ...@@ -22,7 +22,9 @@ func (chain *BlockChain) ProcRecvMsg() {
msgtype := msg.Ty msgtype := msg.Ty
reqnum <- struct{}{} reqnum <- struct{}{}
atomic.AddInt32(&chain.runcount, 1) atomic.AddInt32(&chain.runcount, 1)
chain.procLocalDB(msgtype, msg, reqnum) if chain.procLocalDB(msgtype, msg, reqnum) {
continue
}
switch msgtype { switch msgtype {
case types.EventQueryTx: case types.EventQueryTx:
go chain.processMsg(msg, reqnum, chain.queryTx) go chain.processMsg(msg, reqnum, chain.queryTx)
......
...@@ -138,8 +138,16 @@ func (m *mockBlockChain) SetQueueClient(q queue.Queue) { ...@@ -138,8 +138,16 @@ func (m *mockBlockChain) SetQueueClient(q queue.Queue) {
} else { } else {
msg.ReplyErr("Do not support", types.ErrInvalidParam) msg.ReplyErr("Do not support", types.ErrInvalidParam)
} }
case types.EventLocalNew:
msg.Reply(client.NewMessage(blockchainKey, types.EventLocalNew, &types.Int64{Data: 9999}))
case types.EventLocalClose:
msg.Reply(client.NewMessage(blockchainKey, types.EventLocalClose, nil))
case types.EventLocalBegin: case types.EventLocalBegin:
msg.Reply(client.NewMessage(blockchainKey, types.EventLocalBegin, &types.Int64{Data: 9999})) if req, ok := msg.GetData().(*types.Int64); ok && req.Data == 9999 {
msg.Reply(client.NewMessage(blockchainKey, types.EventLocalBegin, nil))
} else {
msg.ReplyErr("transaction id must 9999", types.ErrInvalidParam)
}
case types.EventLocalCommit: case types.EventLocalCommit:
if req, ok := msg.GetData().(*types.Int64); ok && req.Data == 9999 { if req, ok := msg.GetData().(*types.Int64); ok && req.Data == 9999 {
msg.Reply(client.NewMessage(blockchainKey, types.EventLocalCommit, nil)) msg.Reply(client.NewMessage(blockchainKey, types.EventLocalCommit, nil))
......
...@@ -707,26 +707,31 @@ func (_m *QueueProtocolAPI) ListSeqCallBack() (*types.BlockSeqCBs, error) { ...@@ -707,26 +707,31 @@ func (_m *QueueProtocolAPI) ListSeqCallBack() (*types.BlockSeqCBs, error) {
} }
// LocalBegin provides a mock function with given fields: param // LocalBegin provides a mock function with given fields: param
func (_m *QueueProtocolAPI) LocalBegin(param *types.ReqNil) (*types.Int64, error) { func (_m *QueueProtocolAPI) LocalBegin(param *types.Int64) error {
ret := _m.Called(param) ret := _m.Called(param)
var r0 *types.Int64 var r0 error
if rf, ok := ret.Get(0).(func(*types.ReqNil) *types.Int64); ok { if rf, ok := ret.Get(0).(func(*types.Int64) error); ok {
r0 = rf(param) r0 = rf(param)
} else { } else {
if ret.Get(0) != nil { r0 = ret.Error(0)
r0 = ret.Get(0).(*types.Int64)
}
} }
var r1 error return r0
if rf, ok := ret.Get(1).(func(*types.ReqNil) error); ok { }
r1 = rf(param)
// LocalClose provides a mock function with given fields: param
func (_m *QueueProtocolAPI) LocalClose(param *types.Int64) error {
ret := _m.Called(param)
var r0 error
if rf, ok := ret.Get(0).(func(*types.Int64) error); ok {
r0 = rf(param)
} else { } else {
r1 = ret.Error(1) r0 = ret.Error(0)
} }
return r0, r1 return r0
} }
// LocalCommit provides a mock function with given fields: param // LocalCommit provides a mock function with given fields: param
...@@ -789,6 +794,29 @@ func (_m *QueueProtocolAPI) LocalList(param *types.LocalDBList) (*types.LocalRep ...@@ -789,6 +794,29 @@ func (_m *QueueProtocolAPI) LocalList(param *types.LocalDBList) (*types.LocalRep
return r0, r1 return r0, r1
} }
// LocalNew provides a mock function with given fields: param
func (_m *QueueProtocolAPI) LocalNew(param *types.ReqNil) (*types.Int64, error) {
ret := _m.Called(param)
var r0 *types.Int64
if rf, ok := ret.Get(0).(func(*types.ReqNil) *types.Int64); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.Int64)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqNil) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// LocalRollback provides a mock function with given fields: param // LocalRollback provides a mock function with given fields: param
func (_m *QueueProtocolAPI) LocalRollback(param *types.Int64) error { func (_m *QueueProtocolAPI) LocalRollback(param *types.Int64) error {
ret := _m.Called(param) ret := _m.Called(param)
......
...@@ -741,11 +741,11 @@ func (q *QueueProtocol) LocalSet(param *types.LocalDBSet) error { ...@@ -741,11 +741,11 @@ func (q *QueueProtocol) LocalSet(param *types.LocalDBSet) error {
return nil return nil
} }
//LocalBegin begin a transaction //LocalNew new a localdb object
func (q *QueueProtocol) LocalBegin(param *types.ReqNil) (*types.Int64, error) { func (q *QueueProtocol) LocalNew(param *types.ReqNil) (*types.Int64, error) {
msg, err := q.query(blockchainKey, types.EventLocalBegin, nil) msg, err := q.query(blockchainKey, types.EventLocalNew, nil)
if err != nil { if err != nil {
log.Error("LocalBegin", "Error", err.Error()) log.Error("LocalNew", "Error", err.Error())
return nil, err return nil, err
} }
if reply, ok := msg.GetData().(*types.Int64); ok { if reply, ok := msg.GetData().(*types.Int64); ok {
...@@ -754,6 +754,26 @@ func (q *QueueProtocol) LocalBegin(param *types.ReqNil) (*types.Int64, error) { ...@@ -754,6 +754,26 @@ func (q *QueueProtocol) LocalBegin(param *types.ReqNil) (*types.Int64, error) {
return nil, types.ErrTypeAsset return nil, types.ErrTypeAsset
} }
//LocalBegin begin a transaction
func (q *QueueProtocol) LocalBegin(param *types.Int64) error {
_, err := q.query(blockchainKey, types.EventLocalBegin, param)
if err != nil {
log.Error("LocalBegin", "Error", err.Error())
return err
}
return nil
}
//LocalClose begin a transaction
func (q *QueueProtocol) LocalClose(param *types.Int64) error {
_, err := q.query(blockchainKey, types.EventLocalClose, param)
if err != nil {
log.Error("LocalClose", "Error", err.Error())
return err
}
return nil
}
//LocalCommit commit a transaction //LocalCommit commit a transaction
func (q *QueueProtocol) LocalCommit(param *types.Int64) error { func (q *QueueProtocol) LocalCommit(param *types.Int64) error {
if param == nil { if param == nil {
......
...@@ -192,9 +192,11 @@ func testLocalList(t *testing.T, api client.QueueProtocolAPI) { ...@@ -192,9 +192,11 @@ func testLocalList(t *testing.T, api client.QueueProtocolAPI) {
} }
func testLocalTransaction(t *testing.T, api client.QueueProtocolAPI) { func testLocalTransaction(t *testing.T, api client.QueueProtocolAPI) {
txid, err := api.LocalBegin(nil) txid, err := api.LocalNew(nil)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, txid.Data, int64(9999)) assert.Equal(t, txid.Data, int64(9999))
err = api.LocalBegin(txid)
assert.Nil(t, err)
err = api.LocalCommit(txid) err = api.LocalCommit(txid)
assert.Nil(t, err) assert.Nil(t, err)
err = api.LocalRollback(txid) err = api.LocalRollback(txid)
...@@ -202,6 +204,8 @@ func testLocalTransaction(t *testing.T, api client.QueueProtocolAPI) { ...@@ -202,6 +204,8 @@ func testLocalTransaction(t *testing.T, api client.QueueProtocolAPI) {
param := &types.LocalDBSet{Txid: txid.Data} param := &types.LocalDBSet{Txid: txid.Data}
err = api.LocalSet(param) err = api.LocalSet(param)
assert.Nil(t, err) assert.Nil(t, err)
err = api.LocalClose(txid)
assert.Nil(t, err)
} }
func testIsNtpClockSync(t *testing.T, api client.QueueProtocolAPI) { func testIsNtpClockSync(t *testing.T, api client.QueueProtocolAPI) {
......
...@@ -43,8 +43,12 @@ type QueueProtocolAPI interface { ...@@ -43,8 +43,12 @@ type QueueProtocolAPI interface {
// +++++++++++++++ wallet interfaces begin // +++++++++++++++ wallet interfaces begin
// types.EventLocalGet // types.EventLocalGet
LocalGet(param *types.LocalDBGet) (*types.LocalReplyValue, error) LocalGet(param *types.LocalDBGet) (*types.LocalReplyValue, error)
// types.EventLocalNew
LocalNew(param *types.ReqNil) (*types.Int64, error)
// types.EventLocalClose
LocalClose(param *types.Int64) error
// types.EventLocalBeign // types.EventLocalBeign
LocalBegin(param *types.ReqNil) (*types.Int64, error) LocalBegin(param *types.Int64) error
// types.EventLocalCommit // types.EventLocalCommit
LocalCommit(param *types.Int64) error LocalCommit(param *types.Int64) error
// types.EventLocalRollback // types.EventLocalRollback
......
...@@ -253,7 +253,10 @@ func (im *importPackageStrategy) fetchPlugin(gitrepo, version string) error { ...@@ -253,7 +253,10 @@ func (im *importPackageStrategy) fetchPlugin(gitrepo, version string) error {
func (im *importPackageStrategy) fetchPluginPackage() error { func (im *importPackageStrategy) fetchPluginPackage() error {
mlog.Info("下载插件源码包") mlog.Info("下载插件源码包")
pwd := util.Pwd() pwd := util.Pwd()
os.Chdir(im.projRootPath) err := os.Chdir(im.projRootPath)
if err != nil {
return err
}
defer os.Chdir(pwd) defer os.Chdir(pwd)
for _, plugins := range im.items { for _, plugins := range im.items {
for _, plugin := range plugins { for _, plugin := range plugins {
......
...@@ -74,13 +74,19 @@ func main() { ...@@ -74,13 +74,19 @@ func main() {
func ioHeightAndIndex() error { func ioHeightAndIndex() error {
if _, err := os.Stat(heightFile); os.IsNotExist(err) { if _, err := os.Stat(heightFile); os.IsNotExist(err) {
f, _ := os.Create(heightFile) f, innerErr := os.Create(heightFile)
if innerErr != nil {
return innerErr
}
height := strconv.FormatInt(currentHeight, 10) height := strconv.FormatInt(currentHeight, 10)
index := strconv.FormatInt(currentIndex, 10) index := strconv.FormatInt(currentIndex, 10)
f.WriteString(height + " " + index) f.WriteString(height + " " + index)
f.Close() f.Close()
} }
f, _ := os.OpenFile(heightFile, os.O_RDWR, 0666) f, err := os.OpenFile(heightFile, os.O_RDWR, 0666)
if err != nil {
return err
}
defer f.Close() defer f.Close()
fileContent, err := ioutil.ReadFile(heightFile) fileContent, err := ioutil.ReadFile(heightFile)
if err != nil { if err != nil {
...@@ -201,7 +207,11 @@ func scanWrite() { ...@@ -201,7 +207,11 @@ func scanWrite() {
} }
var sent string var sent string
rpc.Call("Chain33.SendTransaction", paramsRaw, &sent) rpc.Call("Chain33.SendTransaction", paramsRaw, &sent)
f, _ := os.OpenFile(heightFile, os.O_RDWR, 0666) f, err := os.OpenFile(heightFile, os.O_RDWR, 0666)
if err != nil {
fmt.Fprintln(os.Stderr, err)
continue
}
height := strconv.FormatInt(currentHeight, 10) height := strconv.FormatInt(currentHeight, 10)
index := strconv.FormatInt(currentIndex, 10) index := strconv.FormatInt(currentIndex, 10)
f.WriteString(height + " " + index) f.WriteString(height + " " + index)
......
...@@ -22,6 +22,14 @@ var ErrPointerNotFound = errors.New("ErrPointerNotFound") ...@@ -22,6 +22,14 @@ var ErrPointerNotFound = errors.New("ErrPointerNotFound")
func init() { func init() {
random = rand.New(rand.NewSource(time.Now().UnixNano())) random = rand.New(rand.NewSource(time.Now().UnixNano()))
go func() {
for {
time.Sleep(60 * time.Second)
if len(globalPointerMap) > 10 {
println("<==>global pointer count = ", len(globalPointerMap))
}
}
}()
} }
//StorePointer 保存指针返回int64 //StorePointer 保存指针返回int64
......
...@@ -284,7 +284,10 @@ func (db *GoBadgerDB) NewBatch(sync bool) Batch { ...@@ -284,7 +284,10 @@ func (db *GoBadgerDB) NewBatch(sync bool) Batch {
//Set set //Set set
func (mBatch *GoBadgerDBBatch) Set(key, value []byte) { func (mBatch *GoBadgerDBBatch) Set(key, value []byte) {
mBatch.batch.Set(key, value) err := mBatch.batch.Set(key, value)
if err != nil {
blog.Error("Set", "error", err)
}
mBatch.size += len(value) mBatch.size += len(value)
mBatch.size += len(key) mBatch.size += len(key)
mBatch.len += len(value) mBatch.len += len(value)
...@@ -292,7 +295,10 @@ func (mBatch *GoBadgerDBBatch) Set(key, value []byte) { ...@@ -292,7 +295,10 @@ func (mBatch *GoBadgerDBBatch) Set(key, value []byte) {
//Delete 设置 //Delete 设置
func (mBatch *GoBadgerDBBatch) Delete(key []byte) { func (mBatch *GoBadgerDBBatch) Delete(key []byte) {
mBatch.batch.Delete(key) err := mBatch.batch.Delete(key)
if err != nil {
blog.Error("Delete", "error", err)
}
mBatch.size += len(key) mBatch.size += len(key)
mBatch.len++ mBatch.len++
} }
......
...@@ -38,26 +38,13 @@ func NewGoMemDB(name string, dir string, cache int) (*GoMemDB, error) { ...@@ -38,26 +38,13 @@ func NewGoMemDB(name string, dir string, cache int) (*GoMemDB, error) {
}, nil }, nil
} }
//CopyBytes 复制字节
func CopyBytes(b []byte) (copiedBytes []byte) {
/* 兼容leveldb
if b == nil {
return nil
}
*/
copiedBytes = make([]byte, len(b))
copy(copiedBytes, b)
return copiedBytes
}
//Get get //Get get
func (db *GoMemDB) Get(key []byte) ([]byte, error) { func (db *GoMemDB) Get(key []byte) ([]byte, error) {
v, err := db.db.Get(key) v, err := db.db.Get(key)
if err != nil { if err != nil {
return nil, ErrNotFoundInDb return nil, ErrNotFoundInDb
} }
return CopyBytes(v), nil return cloneByte(v), nil
} }
//Set set //Set set
...@@ -151,14 +138,14 @@ func (db *GoMemDB) NewBatch(sync bool) Batch { ...@@ -151,14 +138,14 @@ func (db *GoMemDB) NewBatch(sync bool) Batch {
} }
func (b *memBatch) Set(key, value []byte) { func (b *memBatch) Set(key, value []byte) {
b.writes = append(b.writes, kv{CopyBytes(key), CopyBytes(value)}) b.writes = append(b.writes, kv{cloneByte(key), cloneByte(value)})
b.size += len(value) b.size += len(value)
b.size += len(key) b.size += len(key)
b.len += len(value) b.len += len(value)
} }
func (b *memBatch) Delete(key []byte) { func (b *memBatch) Delete(key []byte) {
b.writes = append(b.writes, kv{CopyBytes(key), nil}) b.writes = append(b.writes, kv{cloneByte(key), nil})
b.size += len(key) b.size += len(key)
b.len++ b.len++
} }
......
...@@ -20,11 +20,6 @@ func TestStateDBGet(t *testing.T) { ...@@ -20,11 +20,6 @@ func TestStateDBGet(t *testing.T) {
testDBGet(t, db) testDBGet(t, db)
} }
func TestLocalDBGet(t *testing.T) {
db := NewLocalDB(nil)
testDBGet(t, db)
}
func testDBGet(t *testing.T, db dbm.KV) { func testDBGet(t *testing.T, db dbm.KV) {
err := db.Set([]byte("k1"), []byte("v1")) err := db.Set([]byte("k1"), []byte("v1"))
assert.Nil(t, err) assert.Nil(t, err)
...@@ -83,16 +78,6 @@ func TestStateDBTxGetOld(t *testing.T) { ...@@ -83,16 +78,6 @@ func TestStateDBTxGetOld(t *testing.T) {
db.Commit() db.Commit()
} }
func TestStateDBTxGet(t *testing.T) {
db := newStateDbForTest(types.GetFork("ForkExecRollback"))
testTxGet(t, db)
}
func TestLocalDBTxGet(t *testing.T) {
db := NewLocalDB(nil)
testTxGet(t, db)
}
func testTxGet(t *testing.T, db dbm.KV) { func testTxGet(t *testing.T, db dbm.KV) {
//新版本 //新版本
db.Begin() db.Begin()
...@@ -128,23 +113,7 @@ func testTxGet(t *testing.T, db dbm.KV) { ...@@ -128,23 +113,7 @@ func testTxGet(t *testing.T, db dbm.KV) {
assert.Equal(t, v, []byte("v11")) assert.Equal(t, v, []byte("v11"))
} }
func TestLocalDB(t *testing.T) { func TestStateDBTxGet(t *testing.T) {
db := NewLocalDB(nil) db := newStateDbForTest(types.GetFork("ForkExecRollback"))
err := db.Set([]byte("k1"), []byte("v1")) testTxGet(t, db)
assert.Nil(t, err)
v, err := db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v1"))
err = db.Set([]byte("k1"), []byte("v11"))
assert.Nil(t, err)
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v11"))
//beigin and rollback not imp
db.Begin()
db.Rollback()
db.Commit()
db.List([]byte("a"), []byte("b"), 1, 1)
} }
...@@ -44,11 +44,10 @@ type executorCtx struct { ...@@ -44,11 +44,10 @@ type executorCtx struct {
mainHeight int64 mainHeight int64
} }
func newExecutor(ctx *executorCtx, exec *Executor, txs []*types.Transaction, receipts []*types.ReceiptData) *executor { func newExecutor(ctx *executorCtx, exec *Executor, localdb dbm.KVDB, txs []*types.Transaction, receipts []*types.ReceiptData) *executor {
client := exec.client client := exec.client
enableMVCC := exec.pluginEnable["mvcc"] enableMVCC := exec.pluginEnable["mvcc"]
opt := &StateDBOption{EnableMVCC: enableMVCC, Height: ctx.height} opt := &StateDBOption{EnableMVCC: enableMVCC, Height: ctx.height}
localdb := NewLocalDB(client)
e := &executor{ e := &executor{
stateDB: NewStateDB(client, ctx.stateHash, localdb, opt), stateDB: NewStateDB(client, ctx.stateHash, localdb, opt),
localDB: localdb, localDB: localdb,
...@@ -225,7 +224,7 @@ func (e *executor) loadDriver(tx *types.Transaction, index int) (c drivers.Drive ...@@ -225,7 +224,7 @@ func (e *executor) loadDriver(tx *types.Transaction, index int) (c drivers.Drive
return exec return exec
} }
func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Receipt, error) { func (e *executor) execTxGroup(exec *Executor, txs []*types.Transaction, index int) ([]*types.Receipt, error) {
txgroup := &types.Transactions{Txs: txs} txgroup := &types.Transactions{Txs: txs}
err := e.checkTxGroup(txgroup, index) err := e.checkTxGroup(txgroup, index)
if err != nil { if err != nil {
...@@ -239,6 +238,7 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re ...@@ -239,6 +238,7 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re
//如果系统执行失败,回滚到这个状态 //如果系统执行失败,回滚到这个状态
rollbackLog := copyReceipt(feelog) rollbackLog := copyReceipt(feelog)
e.stateDB.Begin() e.stateDB.Begin()
e.localDB.Begin()
receipts := make([]*types.Receipt, len(txs)) receipts := make([]*types.Receipt, len(txs))
for i := 1; i < len(txs); i++ { for i := 1; i < len(txs); i++ {
receipts[i] = &types.Receipt{Ty: types.ExecPack} receipts[i] = &types.Receipt{Ty: types.ExecPack}
...@@ -253,8 +253,10 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re ...@@ -253,8 +253,10 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re
if types.IsFork(e.height, "ForkExecRollback") { if types.IsFork(e.height, "ForkExecRollback") {
e.stateDB.Rollback() e.stateDB.Rollback()
} }
e.localDB.Rollback()
return receipts, nil return receipts, nil
} }
exec.execLocalSameTime(e, txs[0], receipts[0], index)
for i := 1; i < len(txs); i++ { for i := 1; i < len(txs); i++ {
//如果有一笔执行失败了,那么全部回滚 //如果有一笔执行失败了,那么全部回滚
receipts[i], err = e.execTxOne(receipts[i], txs[i], index+i) receipts[i], err = e.execTxOne(receipts[i], txs[i], index+i)
...@@ -272,10 +274,13 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re ...@@ -272,10 +274,13 @@ func (e *executor) execTxGroup(txs []*types.Transaction, index int) ([]*types.Re
} }
//撤销所有的数据库更新 //撤销所有的数据库更新
e.stateDB.Rollback() e.stateDB.Rollback()
e.localDB.Rollback()
return receipts, nil return receipts, nil
} }
exec.execLocalSameTime(e, txs[i], receipts[i], index+i)
} }
e.stateDB.Commit() e.stateDB.Commit()
e.localDB.Commit()
return receipts, nil return receipts, nil
} }
......
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"github.com/33cn/chain33/account" "github.com/33cn/chain33/account"
"github.com/33cn/chain33/client/api" "github.com/33cn/chain33/client/api"
dbm "github.com/33cn/chain33/common/db"
clog "github.com/33cn/chain33/common/log" clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15" log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/pluginmgr" "github.com/33cn/chain33/pluginmgr"
...@@ -39,6 +40,7 @@ func DisableLog() { ...@@ -39,6 +40,7 @@ func DisableLog() {
// Executor executor struct // Executor executor struct
type Executor struct { type Executor struct {
disableLocal bool
client queue.Client client queue.Client
qclient client.QueueProtocolAPI qclient client.QueueProtocolAPI
grpccli types.Chain33Client grpccli types.Chain33Client
...@@ -142,8 +144,12 @@ func (exec *Executor) procExecQuery(msg queue.Message) { ...@@ -142,8 +144,12 @@ func (exec *Executor) procExecQuery(msg queue.Message) {
if data.StateHash == nil { if data.StateHash == nil {
data.StateHash = header.StateHash data.StateHash = header.StateHash
} }
localdb := NewLocalDB(exec.client) var localdb dbm.KVDB
if !exec.disableLocal {
localdb = NewLocalDB(exec.client)
defer localdb.(*LocalDB).Close()
driver.SetLocalDB(localdb) driver.SetLocalDB(localdb)
}
opt := &StateDBOption{EnableMVCC: exec.pluginEnable["mvcc"], Height: header.GetHeight()} opt := &StateDBOption{EnableMVCC: exec.pluginEnable["mvcc"], Height: header.GetHeight()}
db := NewStateDB(exec.client, data.StateHash, localdb, opt) db := NewStateDB(exec.client, data.StateHash, localdb, opt)
...@@ -173,7 +179,12 @@ func (exec *Executor) procExecCheckTx(msg queue.Message) { ...@@ -173,7 +179,12 @@ func (exec *Executor) procExecCheckTx(msg queue.Message) {
mainHeight: datas.MainHeight, mainHeight: datas.MainHeight,
parentHash: datas.ParentHash, parentHash: datas.ParentHash,
} }
execute := newExecutor(ctx, exec, datas.Txs, nil) var localdb dbm.KVDB
if !exec.disableLocal {
localdb = NewLocalDB(exec.client)
defer localdb.(*LocalDB).Close()
}
execute := newExecutor(ctx, exec, localdb, datas.Txs, nil)
execute.enableMVCC(nil) execute.enableMVCC(nil)
//返回一个列表表示成功还是失败 //返回一个列表表示成功还是失败
result := &types.ReceiptCheckTxList{} result := &types.ReceiptCheckTxList{}
...@@ -204,7 +215,12 @@ func (exec *Executor) procExecTxList(msg queue.Message) { ...@@ -204,7 +215,12 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
mainHeight: datas.MainHeight, mainHeight: datas.MainHeight,
parentHash: datas.ParentHash, parentHash: datas.ParentHash,
} }
execute := newExecutor(ctx, exec, datas.Txs, nil) var localdb dbm.KVDB
if !exec.disableLocal {
localdb = NewLocalDB(exec.client)
defer localdb.(*LocalDB).Close()
}
execute := newExecutor(ctx, exec, localdb, datas.Txs, nil)
execute.enableMVCC(nil) execute.enableMVCC(nil)
var receipts []*types.Receipt var receipts []*types.Receipt
index := 0 index := 0
...@@ -225,6 +241,8 @@ func (exec *Executor) procExecTxList(msg queue.Message) { ...@@ -225,6 +241,8 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
receipts = append(receipts, types.NewErrReceipt(err)) receipts = append(receipts, types.NewErrReceipt(err))
continue continue
} }
//update local
exec.execLocalSameTime(execute, tx, receipt, index)
receipts = append(receipts, receipt) receipts = append(receipts, receipt)
index++ index++
continue continue
...@@ -239,7 +257,7 @@ func (exec *Executor) procExecTxList(msg queue.Message) { ...@@ -239,7 +257,7 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
receipts = append(receipts, types.NewErrReceipt(types.ErrTxGroupCount)) receipts = append(receipts, types.NewErrReceipt(types.ErrTxGroupCount))
continue continue
} }
receiptlist, err := execute.execTxGroup(datas.Txs[i:i+int(tx.GroupCount)], index) receiptlist, err := execute.execTxGroup(exec, datas.Txs[i:i+int(tx.GroupCount)], index)
i = i + int(tx.GroupCount) - 1 i = i + int(tx.GroupCount) - 1
if len(receiptlist) > 0 && len(receiptlist) != int(tx.GroupCount) { if len(receiptlist) > 0 && len(receiptlist) != int(tx.GroupCount) {
panic("len(receiptlist) must be equal tx.GroupCount") panic("len(receiptlist) must be equal tx.GroupCount")
...@@ -261,6 +279,22 @@ func (exec *Executor) procExecTxList(msg queue.Message) { ...@@ -261,6 +279,22 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
&types.Receipts{Receipts: receipts})) &types.Receipts{Receipts: receipts}))
} }
func (exec *Executor) execLocalSameTime(execute *executor, tx *types.Transaction, receipt *types.Receipt, index int) {
e := execute.loadDriver(tx, index)
if e.ExecutorOrder() == drivers.ExecLocalSameTime {
var r = &types.ReceiptData{}
if receipt != nil {
r.Ty = receipt.Ty
r.Logs = receipt.Logs
}
_, err := exec.execLocalTx(execute, tx, r, index)
//ignore err, only print err
if err != nil {
elog.Debug("ExecLocal Same Time", "err", err)
}
}
}
func (exec *Executor) procExecAddBlock(msg queue.Message) { func (exec *Executor) procExecAddBlock(msg queue.Message) {
datas := msg.GetData().(*types.BlockDetail) datas := msg.GetData().(*types.BlockDetail)
b := datas.Block b := datas.Block
...@@ -273,7 +307,12 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) { ...@@ -273,7 +307,12 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) {
mainHeight: b.MainHeight, mainHeight: b.MainHeight,
parentHash: b.ParentHash, parentHash: b.ParentHash,
} }
execute := newExecutor(ctx, exec, b.Txs, datas.Receipts) var localdb dbm.KVDB
if !exec.disableLocal {
localdb = NewLocalDB(exec.client)
defer localdb.(*LocalDB).Close()
}
execute := newExecutor(ctx, exec, localdb, b.Txs, datas.Receipts)
//因为mvcc 还没有写入,所以目前的mvcc版本是前一个区块的版本 //因为mvcc 还没有写入,所以目前的mvcc版本是前一个区块的版本
execute.enableMVCC(datas.PrevStatusHash) execute.enableMVCC(datas.PrevStatusHash)
var kvset types.LocalDBSet var kvset types.LocalDBSet
...@@ -305,27 +344,36 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) { ...@@ -305,27 +344,36 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) {
} }
for i := 0; i < len(b.Txs); i++ { for i := 0; i < len(b.Txs); i++ {
tx := b.Txs[i] tx := b.Txs[i]
kv, err := execute.execLocal(tx, datas.Receipts[i], i) kv, err := exec.execLocalTx(execute, tx, datas.Receipts[i], i)
if err == types.ErrActionNotSupport {
continue
}
if err != nil { if err != nil {
msg.Reply(exec.client.NewMessage("", types.EventAddBlock, err)) msg.Reply(exec.client.NewMessage("", types.EventAddBlock, err))
return return
} }
if kv != nil && kv.KV != nil { if kv != nil && kv.KV != nil {
kvset.KV = append(kvset.KV, kv.KV...)
}
}
msg.Reply(exec.client.NewMessage("", types.EventAddBlock, &kvset))
}
func (exec *Executor) execLocalTx(execute *executor, tx *types.Transaction, r *types.ReceiptData, index int) (*types.LocalDBSet, error) {
kv, err := execute.execLocal(tx, r, index)
if err == types.ErrActionNotSupport {
return nil, nil
}
if err != nil {
return nil, err
}
if kv != nil && kv.KV != nil {
err := exec.checkPrefix(tx.Execer, kv.KV) err := exec.checkPrefix(tx.Execer, kv.KV)
if err != nil { if err != nil {
msg.Reply(exec.client.NewMessage("", types.EventAddBlock, err)) return nil, err
return
} }
kvset.KV = append(kvset.KV, kv.KV...)
for _, kv := range kv.KV { for _, kv := range kv.KV {
execute.localDB.Set(kv.Key, kv.Value) execute.localDB.Set(kv.Key, kv.Value)
} }
} }
} return kv, nil
msg.Reply(exec.client.NewMessage("", types.EventAddBlock, &kvset))
} }
func (exec *Executor) procExecDelBlock(msg queue.Message) { func (exec *Executor) procExecDelBlock(msg queue.Message) {
...@@ -340,7 +388,12 @@ func (exec *Executor) procExecDelBlock(msg queue.Message) { ...@@ -340,7 +388,12 @@ func (exec *Executor) procExecDelBlock(msg queue.Message) {
mainHeight: b.MainHeight, mainHeight: b.MainHeight,
parentHash: b.ParentHash, parentHash: b.ParentHash,
} }
execute := newExecutor(ctx, exec, b.Txs, nil) var localdb dbm.KVDB
if !exec.disableLocal {
localdb = NewLocalDB(exec.client)
defer localdb.(*LocalDB).Close()
}
execute := newExecutor(ctx, exec, localdb, b.Txs, nil)
execute.enableMVCC(nil) execute.enableMVCC(nil)
var kvset types.LocalDBSet var kvset types.LocalDBSet
for _, kv := range datas.KV { for _, kv := range datas.KV {
......
...@@ -6,13 +6,16 @@ package executor_test ...@@ -6,13 +6,16 @@ package executor_test
import ( import (
"errors" "errors"
"fmt"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"testing" "testing"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address" "github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/merkle" "github.com/33cn/chain33/common/merkle"
_ "github.com/33cn/chain33/system" _ "github.com/33cn/chain33/system"
drivers "github.com/33cn/chain33/system/dapp"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/33cn/chain33/util" "github.com/33cn/chain33/util"
"github.com/33cn/chain33/util/testnode" "github.com/33cn/chain33/util/testnode"
...@@ -20,6 +23,8 @@ import ( ...@@ -20,6 +23,8 @@ import (
) )
func init() { func init() {
drivers.Register("demo2", newdemoApp, 1)
types.AllowUserExec = append(types.AllowUserExec, []byte("demo2"))
go func() { go func() {
http.ListenAndServe("localhost:6060", nil) http.ListenAndServe("localhost:6060", nil)
}() }()
...@@ -271,3 +276,129 @@ func BenchmarkExecBlock(b *testing.B) { ...@@ -271,3 +276,129 @@ func BenchmarkExecBlock(b *testing.B) {
util.ExecBlock(mock33.GetClient(), block0.StateHash, block, false, true) util.ExecBlock(mock33.GetClient(), block0.StateHash, block, false, true)
} }
} }
/*
ExecLocalSameTime test
*/
type demoApp struct {
*drivers.DriverBase
}
func newdemoApp() drivers.Driver {
demo := &demoApp{DriverBase: &drivers.DriverBase{}}
demo.SetChild(demo)
return demo
}
func (demo *demoApp) GetDriverName() string {
return "demo2"
}
var orderflag = drivers.ExecLocalSameTime
func (demo *demoApp) ExecutorOrder() int64 {
return orderflag
}
func (demo *demoApp) Exec(tx *types.Transaction, index int) (receipt *types.Receipt, err error) {
addr := tx.From()
id := common.ToHex(tx.Hash())
values, err := demo.GetLocalDB().List(demoCalcLocalKey(addr, ""), nil, 0, 0)
if err != nil && err != types.ErrNotFound {
return nil, err
}
receipt = &types.Receipt{Ty: types.ExecOk}
receipt.KV = append(receipt.KV, &types.KeyValue{
Key: demoCalcStateKey(addr, id),
Value: []byte(fmt.Sprint(len(values))),
})
receipt.Logs = append(receipt.Logs, &types.ReceiptLog{Ty: int32(len(values))})
return receipt, nil
}
func (demo *demoApp) ExecLocal(tx *types.Transaction, receipt *types.ReceiptData, index int) (localkv *types.LocalDBSet, err error) {
localkv = &types.LocalDBSet{}
addr := tx.From()
id := common.ToHex(tx.Hash())
localkv.KV = append(localkv.KV, &types.KeyValue{
Key: demoCalcLocalKey(addr, id),
Value: tx.Hash(),
})
return localkv, nil
}
func demoCalcStateKey(addr string, id string) []byte {
key := append([]byte("mavl-demo2-"), []byte(addr)...)
key = append(key, []byte(":")...)
key = append(key, []byte(id)...)
return key
}
func demoCalcLocalKey(addr string, id string) []byte {
key := append([]byte("LODB-demo2-"), []byte(addr)...)
key = append(key, []byte(":")...)
if len(id) > 0 {
key = append(key, []byte(id)...)
}
return key
}
func TestExecLocalSameTime1(t *testing.T) {
mock33 := newMockNode()
defer mock33.Close()
orderflag = 1
genkey := mock33.GetGenesisKey()
genaddr := mock33.GetGenesisAddress()
mock33.WaitHeight(0)
block := mock33.GetBlock(0)
assert.Equal(t, mock33.GetAccount(block.StateHash, genaddr).Balance, 100000000*types.Coin)
var txs []*types.Transaction
addr1, priv1 := util.Genaddress()
txs = append(txs, util.CreateCoinsTx(genkey, addr1, 1e8))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
block2 := util.CreateNewBlock(block, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true)
if err != nil {
t.Error(err)
return
}
for i, receipt := range detail.Receipts {
assert.Equal(t, receipt.GetTy(), int32(2), fmt.Sprint(i))
if i >= 1 {
fmt.Println(receipt)
assert.Equal(t, len(receipt.Logs), 2)
assert.Equal(t, receipt.Logs[1].Ty, int32(i)-1)
}
}
}
func TestExecLocalSameTime0(t *testing.T) {
mock33 := newMockNode()
defer mock33.Close()
orderflag = 0
genkey := mock33.GetGenesisKey()
genaddr := mock33.GetGenesisAddress()
mock33.WaitHeight(0)
block := mock33.GetBlock(0)
assert.Equal(t, mock33.GetAccount(block.StateHash, genaddr).Balance, 100000000*types.Coin)
var txs []*types.Transaction
addr1, priv1 := util.Genaddress()
txs = append(txs, util.CreateCoinsTx(genkey, addr1, 1e8))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
txs = append(txs, util.CreateTxWithExecer(priv1, "demo2"))
block2 := util.CreateNewBlock(block, txs)
detail, _, err := util.ExecBlock(mock33.GetClient(), block.StateHash, block2, false, true)
if err != nil {
t.Error(err)
return
}
for i, receipt := range detail.Receipts {
assert.Equal(t, receipt.GetTy(), int32(2), fmt.Sprint(i))
if i >= 1 {
fmt.Println(receipt)
assert.Equal(t, len(receipt.Logs), 2)
assert.Equal(t, receipt.Logs[1].Ty, int32(0))
}
}
}
...@@ -58,7 +58,7 @@ func TestExecutorGetTxGroup(t *testing.T) { ...@@ -58,7 +58,7 @@ func TestExecutorGetTxGroup(t *testing.T) {
mainHash: nil, mainHash: nil,
parentHash: nil, parentHash: nil,
} }
execute := newExecutor(ctx, exec, txs, nil) execute := newExecutor(ctx, exec, nil, txs, nil)
e := execute.loadDriver(txs[0], 0) e := execute.loadDriver(txs[0], 0)
execute.setEnv(e) execute.setEnv(e)
txs2 := e.GetTxs() txs2 := e.GetTxs()
...@@ -73,7 +73,7 @@ func TestExecutorGetTxGroup(t *testing.T) { ...@@ -73,7 +73,7 @@ func TestExecutorGetTxGroup(t *testing.T) {
//err tx group list //err tx group list
txs[0].Header = nil txs[0].Header = nil
execute = newExecutor(ctx, exec, txs, nil) execute = newExecutor(ctx, exec, nil, txs, nil)
e = execute.loadDriver(txs[0], 0) e = execute.loadDriver(txs[0], 0)
execute.setEnv(e) execute.setEnv(e)
_, err = e.GetTxGroup(len(txs) - 1) _, err = e.GetTxGroup(len(txs) - 1)
...@@ -180,7 +180,7 @@ func TestExecutorErrAPIEnv(t *testing.T) { ...@@ -180,7 +180,7 @@ func TestExecutorErrAPIEnv(t *testing.T) {
types.SetMinFee(0) types.SetMinFee(0)
defer types.SetMinFee(minfee) defer types.SetMinFee(minfee)
q := queue.New("channel") q := queue.New("channel")
exec := &Executor{client: q.Client()} exec := &Executor{client: q.Client(), disableLocal: true}
execInit(nil) execInit(nil)
var txs []*types.Transaction var txs []*types.Transaction
genkey := util.TestPrivkeyList[0] genkey := util.TestPrivkeyList[0]
......
package executor package executor
import ( import (
"github.com/33cn/chain33/client"
"github.com/33cn/chain33/common/db" "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
) )
// LocalDB local db for store key value in local //LocalDB 本地数据库,类似localdb,不加入区块链的状态。
//数据库只读,不能落盘
//数据的get set 主要经过 cache
//如果需要进行list, 那么把get set 的内容加入到 后端数据库
type LocalDB struct { type LocalDB struct {
cache map[string][]byte cache map[string][]byte
txcache map[string][]byte txcache map[string][]byte
keys []string keys []string
intx bool intx bool
kvs []*types.KeyValue
txid *types.Int64
client queue.Client client queue.Client
api client.QueueProtocolAPI
} }
// NewLocalDB new local db //NewLocalDB 创建一个新的LocalDB
func NewLocalDB(client queue.Client) db.KVDB { func NewLocalDB(cli queue.Client) db.KVDB {
return &LocalDB{cache: make(map[string][]byte), client: client} api, err := client.New(cli, nil)
if err != nil {
panic(err)
}
txid, err := api.LocalNew(nil)
if err != nil {
panic(err)
}
return &LocalDB{
cache: make(map[string][]byte),
txid: txid,
client: cli,
api: api,
}
} }
// Get get value from local db func (l *LocalDB) resetTx() {
func (l *LocalDB) Get(key []byte) ([]byte, error) { l.intx = false
value, err := l.get(key) l.txcache = nil
debugAccount("==lget==", key, value) l.keys = nil
return value, err }
//Begin 开始一个事务
func (l *LocalDB) Begin() {
l.intx = true
l.keys = nil
l.txcache = nil
err := l.api.LocalBegin(l.txid)
if err != nil {
panic(err)
}
}
func (l *LocalDB) save() error {
if l.kvs != nil {
param := &types.LocalDBSet{Txid: l.txid.Data}
param.KV = l.kvs
err := l.api.LocalSet(param)
if err != nil {
return err
}
l.kvs = nil
}
return nil
}
//Commit 提交一个事务
func (l *LocalDB) Commit() error {
for k, v := range l.txcache {
l.cache[k] = v
}
err := l.save()
if err != nil {
return err
}
l.resetTx()
err = l.api.LocalCommit(l.txid)
return err
}
//Close 提交一个事务
func (l *LocalDB) Close() error {
l.cache = nil
l.resetTx()
err := l.api.LocalClose(l.txid)
return err
}
//Rollback 回滚修改
func (l *LocalDB) Rollback() {
l.resetTx()
err := l.api.LocalRollback(l.txid)
if err != nil {
panic(err)
}
} }
func (l *LocalDB) get(key []byte) ([]byte, error) { //Get 获取key
func (l *LocalDB) Get(key []byte) ([]byte, error) {
skey := string(key) skey := string(key)
if l.intx && l.txcache != nil { if l.intx && l.txcache != nil {
if value, ok := l.txcache[skey]; ok { if value, ok := l.txcache[skey]; ok {
...@@ -35,34 +110,31 @@ func (l *LocalDB) get(key []byte) ([]byte, error) { ...@@ -35,34 +110,31 @@ func (l *LocalDB) get(key []byte) ([]byte, error) {
} }
} }
if value, ok := l.cache[skey]; ok { if value, ok := l.cache[skey]; ok {
return value, nil if value == nil {
}
if l.client == nil {
return nil, types.ErrNotFound return nil, types.ErrNotFound
} }
query := &types.LocalDBGet{Keys: [][]byte{key}} return value, nil
msg := l.client.NewMessage("blockchain", types.EventLocalGet, query) }
l.client.Send(msg, true) query := &types.LocalDBGet{Txid: l.txid.Data, Keys: [][]byte{key}}
resp, err := l.client.Wait(msg) resp, err := l.api.LocalGet(query)
if err != nil { if err != nil {
panic(err) //no happen for ever panic(err) //no happen for ever
} }
if nil == resp.GetData().(*types.LocalReplyValue).Values { if nil == resp.Values {
l.cache[string(key)] = nil
return nil, types.ErrNotFound return nil, types.ErrNotFound
} }
value := resp.GetData().(*types.LocalReplyValue).Values[0] value := resp.Values[0]
if value == nil { if value == nil {
//panic(string(key)) l.cache[string(key)] = nil
return nil, types.ErrNotFound return nil, types.ErrNotFound
} }
l.cache[string(key)] = value l.cache[string(key)] = value
return value, nil return value, nil
} }
// Set set key value to local db //Set 获取key
func (l *LocalDB) Set(key []byte, value []byte) error { func (l *LocalDB) Set(key []byte, value []byte) error {
debugAccount("==lset==", key, value)
skey := string(key) skey := string(key)
if l.intx { if l.intx {
if l.txcache == nil { if l.txcache == nil {
...@@ -73,22 +145,22 @@ func (l *LocalDB) Set(key []byte, value []byte) error { ...@@ -73,22 +145,22 @@ func (l *LocalDB) Set(key []byte, value []byte) error {
} else { } else {
setmap(l.cache, skey, value) setmap(l.cache, skey, value)
} }
l.kvs = append(l.kvs, &types.KeyValue{Key: key, Value: value})
return nil return nil
} }
// List 从数据库中查询数据列表,set 中的cache 更新不会影响这个list // List 从数据库中查询数据列表
func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, error) { func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, error) {
if l.client == nil { err := l.save()
return nil, types.ErrNotFound if err != nil {
return nil, err
} }
query := &types.LocalDBList{Prefix: prefix, Key: key, Count: count, Direction: direction} query := &types.LocalDBList{Txid: l.txid.Data, Prefix: prefix, Key: key, Count: count, Direction: direction}
msg := l.client.NewMessage("blockchain", types.EventLocalList, query) resp, err := l.api.LocalList(query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil { if err != nil {
panic(err) //no happen for ever panic(err) //no happen for ever
} }
values := resp.GetData().(*types.LocalReplyValue).Values values := resp.Values
if values == nil { if values == nil {
//panic(string(key)) //panic(string(key))
return nil, types.ErrNotFound return nil, types.ErrNotFound
...@@ -98,43 +170,5 @@ func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, er ...@@ -98,43 +170,5 @@ func (l *LocalDB) List(prefix, key []byte, count, direction int32) ([][]byte, er
// PrefixCount 从数据库中查询指定前缀的key的数量 // PrefixCount 从数据库中查询指定前缀的key的数量
func (l *LocalDB) PrefixCount(prefix []byte) (count int64) { func (l *LocalDB) PrefixCount(prefix []byte) (count int64) {
if l.client == nil { panic("localdb not support PrefixCount")
return 0
}
query := &types.ReqKey{Key: prefix}
msg := l.client.NewMessage("blockchain", types.EventLocalPrefixCount, query)
l.client.Send(msg, true)
resp, err := l.client.Wait(msg)
if err != nil {
panic(err) //no happen for ever
}
count = resp.GetData().(*types.Int64).Data
return
}
//Begin 开启内存事务处理
func (l *LocalDB) Begin() {
l.intx = true
l.keys = nil
l.txcache = nil
}
// Rollback reset tx
func (l *LocalDB) Rollback() {
l.resetTx()
}
// Commit canche tx
func (l *LocalDB) Commit() error {
for k, v := range l.txcache {
l.cache[k] = v
}
l.resetTx()
return nil
}
func (l *LocalDB) resetTx() {
l.intx = false
l.txcache = nil
l.keys = nil
} }
package executor_test
import (
"testing"
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/executor"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util/testnode"
"github.com/stretchr/testify/assert"
)
func TestLocalDBGet(t *testing.T) {
mock33 := testnode.New("", nil)
defer mock33.Close()
db := executor.NewLocalDB(mock33.GetClient())
defer db.(*executor.LocalDB).Close()
testDBGet(t, db)
}
func BenchmarkLocalDBGet(b *testing.B) {
mock33 := testnode.New("", nil)
defer mock33.Close()
db := executor.NewLocalDB(mock33.GetClient())
defer db.(*executor.LocalDB).Close()
err := db.Set([]byte("k1"), []byte("v1"))
assert.Nil(b, err)
b.StartTimer()
for i := 0; i < b.N; i++ {
v, err := db.Get([]byte("k1"))
assert.Nil(b, err)
assert.Equal(b, v, []byte("v1"))
}
}
func TestLocalDBTxGet(t *testing.T) {
mock33 := testnode.New("", nil)
defer mock33.Close()
db := executor.NewLocalDB(mock33.GetClient())
testTxGet(t, db)
}
func testDBGet(t *testing.T, db dbm.KV) {
err := db.Set([]byte("k1"), []byte("v1"))
assert.Nil(t, err)
v, err := db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v1"))
err = db.Set([]byte("k1"), []byte("v11"))
assert.Nil(t, err)
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v11"))
}
func testTxGet(t *testing.T, db dbm.KV) {
//新版本
db.Begin()
err := db.Set([]byte("k1"), []byte("v1"))
assert.Nil(t, err)
v, err := db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v1"))
db.Commit()
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v1"))
//在非transaction中set,直接set成功,不能rollback
err = db.Set([]byte("k1"), []byte("v11"))
assert.Nil(t, err)
db.Begin()
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v11"))
err = db.Set([]byte("k1"), []byte("v12"))
assert.Nil(t, err)
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v12"))
db.Rollback()
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v11"))
}
func TestLocalDB(t *testing.T) {
mock33 := testnode.New("", nil)
defer mock33.Close()
db := executor.NewLocalDB(mock33.GetClient())
defer db.(*executor.LocalDB).Close()
err := db.Set([]byte("k1"), []byte("v1"))
assert.Nil(t, err)
v, err := db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v1"))
err = db.Set([]byte("k1"), []byte("v11"))
assert.Nil(t, err)
v, err = db.Get([]byte("k1"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v11"))
//beigin and rollback not imp
db.Begin()
err = db.Set([]byte("k2"), []byte("v2"))
assert.Nil(t, err)
db.Rollback()
_, err = db.Get([]byte("k2"))
assert.Equal(t, err, types.ErrNotFound)
err = db.Set([]byte("k2"), []byte("v2"))
assert.Nil(t, err)
//get
v, err = db.Get([]byte("k2"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v2"))
//list
values, err := db.List([]byte("k"), nil, 0, 0)
assert.Nil(t, err)
assert.Equal(t, len(values), 2)
assert.Equal(t, string(values[0]), "v2")
assert.Equal(t, string(values[1]), "v11")
err = db.Commit()
assert.Nil(t, err)
//get
v, err = db.Get([]byte("k2"))
assert.Nil(t, err)
assert.Equal(t, v, []byte("v2"))
//list
values, err = db.List([]byte("k"), nil, 0, 0)
assert.Nil(t, err)
assert.Equal(t, len(values), 2)
assert.Equal(t, string(values[0]), "v2")
assert.Equal(t, string(values[1]), "v11")
}
...@@ -258,7 +258,10 @@ func autoMine(cmd *cobra.Command, args []string) { ...@@ -258,7 +258,10 @@ func autoMine(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr") rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
flag, _ := cmd.Flags().GetInt32("flag") flag, _ := cmd.Flags().GetInt32("flag")
if flag != 0 && flag != 1 { if flag != 0 && flag != 1 {
cmd.UsageFunc()(cmd) err := cmd.UsageFunc()(cmd)
if err != nil {
fmt.Fprintln(os.Stderr, err)
}
return return
} }
params := struct { params := struct {
......
...@@ -29,6 +29,9 @@ const ( ...@@ -29,6 +29,9 @@ const (
TxIndexFrom = 1 TxIndexFrom = 1
// TxIndexTo transaction index to // TxIndexTo transaction index to
TxIndexTo = 2 TxIndexTo = 2
//ExecLocalSameTime Exec 的时候 同时执行 ExecLocal
ExecLocalSameTime = int64(1)
) )
// Driver defines some interface // Driver defines some interface
...@@ -69,6 +72,7 @@ type Driver interface { ...@@ -69,6 +72,7 @@ type Driver interface {
GetFuncMap() map[string]reflect.Method GetFuncMap() map[string]reflect.Method
GetExecutorType() types.ExecutorType GetExecutorType() types.ExecutorType
CheckReceiptExecOk() bool CheckReceiptExecOk() bool
ExecutorOrder() int64
} }
// DriverBase defines driverbase type // DriverBase defines driverbase type
...@@ -106,6 +110,12 @@ func (d *DriverBase) GetExecutorType() types.ExecutorType { ...@@ -106,6 +110,12 @@ func (d *DriverBase) GetExecutorType() types.ExecutorType {
return d.ety return d.ety
} }
//ExecutorOrder 执行顺序, 如果要使用 ExecLocalSameTime
//那么会同时执行 ExecLocal
func (d *DriverBase) ExecutorOrder() int64 {
return 0
}
//GetLastHash 获取最后区块的hash,主链和平行链不同 //GetLastHash 获取最后区块的hash,主链和平行链不同
func (d *DriverBase) GetLastHash() []byte { func (d *DriverBase) GetLastHash() []byte {
if types.IsPara() { if types.IsPara() {
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package executor_test package executor_test
import ( import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
"testing"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/limits"
"github.com/33cn/chain33/common/log"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/executor"
"github.com/33cn/chain33/queue"
_ "github.com/33cn/chain33/system" _ "github.com/33cn/chain33/system"
pty "github.com/33cn/chain33/system/dapp/manage/types"
"github.com/33cn/chain33/types"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
) )
var (
isManageTest = false
execNameMa = "user.p.guodun.manage"
feeForToken int64 = 1e6
ErrTest = errors.New("ErrTest")
addr1 string
mainNetgrpcAddr = "localhost:8802"
ParaNetgrpcAddr = "localhost:8902"
mainClient types.Chain33Client
paraClient types.Chain33Client
random *rand.Rand
zeroHash [32]byte
cfg *types.Config
addr string
genkey crypto.PrivKey
privGenesis crypto.PrivKey
privkeySuper crypto.PrivKey
)
func init() {
go func() {
http.ListenAndServe("localhost:6060", nil)
}()
types.Init("local", nil)
err := limits.SetLimits()
if err != nil {
panic(err)
}
conn, err := grpc.Dial(mainNetgrpcAddr, grpc.WithInsecure())
if err != nil {
panic(err)
}
mainClient = types.NewChain33Client(conn)
conn, err = grpc.Dial(ParaNetgrpcAddr, grpc.WithInsecure())
if err != nil {
panic(err)
}
paraClient = types.NewChain33Client(conn)
random = rand.New(rand.NewSource(types.Now().UnixNano()))
genkey = getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944")
log.SetLogLevel("error")
privGenesis = getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944")
privkeySuper = getprivkey("4a92f3700920dc422c8ba993020d26b54711ef9b3d74deab7c3df055218ded42")
}
func getprivkey(key string) crypto.PrivKey {
cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
if err != nil {
panic(err)
}
bkey, err := common.FromHex(key)
if err != nil {
panic(err)
}
priv, err := cr.PrivKeyFromBytes(bkey)
if err != nil {
panic(err)
}
return priv
}
func initUnitEnv() (queue.Queue, *executor.Executor) {
var q = queue.New("channel")
cfg, sub := types.InitCfg("../../../../cmd/chain33/chain33.test.toml")
exec := executor.New(cfg.Exec, sub.Exec)
exec.SetQueueClient(q.Client())
return q, exec
}
func genaddress() (string, crypto.PrivKey) {
cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
if err != nil {
panic(err)
}
privto, err := cr.GenKey()
if err != nil {
panic(err)
}
addrto := address.PubKeyToAddress(privto.PubKey().Bytes())
return addrto.String(), privto
}
func createTxEx(priv crypto.PrivKey, to string, amount int64, ty int32, execer string) *types.Transaction {
var tx *types.Transaction
switch execer {
case "manage":
v := &types.ModifyConfig{}
modify := &pty.ManageAction{
Ty: ty,
Value: &pty.ManageAction_Modify{Modify: v},
}
tx = &types.Transaction{Execer: []byte("manage"), Payload: types.Encode(modify)}
default:
return nil
}
tx.Nonce = random.Int63()
//tx.To = address.ExecAddress(execer).String()
tx.Sign(types.SECP256K1, priv)
return tx
}
func genTxsEx(n int64, ty int32, execer string) (txs []*types.Transaction) {
_, priv := genaddress()
to, _ := genaddress()
for i := 0; i < int(n); i++ {
txs = append(txs, createTxEx(priv, to, types.Coin*(n+1), ty, execer))
}
return txs
}
func createBlockEx(n int64, ty int32, execer string) *types.Block {
newblock := &types.Block{}
newblock.Height = -1
newblock.BlockTime = types.Now().Unix()
newblock.ParentHash = zeroHash[:]
newblock.Txs = genTxsEx(n, ty, execer)
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
return newblock
}
func constructionBlockDetail(block *types.Block, height int64, txcount int) *types.BlockDetail {
var blockdetail types.BlockDetail
blockdetail.Receipts = make([]*types.ReceiptData, txcount)
for j := 0; j < txcount; j++ {
ReceiptData := types.ReceiptData{Ty: 2}
blockdetail.Receipts[j] = &ReceiptData
}
blockdetail.Block = block
return &blockdetail
}
func genExecTxListMsg(client queue.Client, block *types.Block) queue.Message {
list := &types.ExecTxList{
StateHash: zeroHash[:],
Txs: block.Txs,
BlockTime: block.BlockTime,
Height: block.Height,
}
msg := client.NewMessage("execs", types.EventExecTxList, list)
return msg
}
func genExecCheckTxMsg(client queue.Client, block *types.Block) queue.Message {
list := &types.ExecTxList{
StateHash: zeroHash[:],
Txs: block.Txs,
BlockTime: block.BlockTime,
Height: block.Height,
}
msg := client.NewMessage("execs", types.EventCheckTx, list)
return msg
}
func genEventAddBlockMsg(client queue.Client, block *types.Block) queue.Message {
blockDetail := constructionBlockDetail(block, 1, 1)
msg := client.NewMessage("execs", types.EventAddBlock, blockDetail)
return msg
}
func genEventDelBlockMsg(client queue.Client, block *types.Block) queue.Message {
blockDetail := constructionBlockDetail(block, 1, 1)
msg := client.NewMessage("execs", types.EventDelBlock, blockDetail)
return msg
}
//"coins", "GetTxsByAddr",
func genEventBlockChainQueryMsg(client queue.Client, param []byte, strDriver string, strFunName string) queue.Message {
blockChainQue := &types.ChainExecutor{
Driver: strDriver,
FuncName: strFunName,
StateHash: zeroHash[:],
Param: param,
}
msg := client.NewMessage("execs", types.EventBlockChainQuery, blockChainQue)
return msg
}
func storeProcess(q queue.Queue) {
go func() {
client := q.Client()
client.Sub("store")
for msg := range client.Recv() {
switch msg.Ty {
case types.EventStoreGet:
datas := msg.GetData().(*types.StoreGet)
//fmt.Println("EventStoreGet Keys[0] = %s", string(datas.Keys[0]))
var value []byte
if strings.Contains(string(datas.Keys[0]), "this type ***") { //这种type结构体走该分支
item := &types.ConfigItem{
Key: "111111111111111",
Addr: "2222222222222",
Value: &types.ConfigItem_Arr{
Arr: &types.ArrayConfig{}},
}
value = types.Encode(item)
} else {
account := &types.Account{
Balance: 1000 * 1e8,
Addr: addr1,
}
value = types.Encode(account)
}
values := make([][]byte, 2)
values = append(values[:0], value)
msg.Reply(client.NewMessage("", types.EventStoreGetReply, &types.StoreReplyValue{Values: values}))
case types.EventStoreGetTotalCoins:
req := msg.GetData().(*types.IterateRangeByStateHash)
resp := &types.ReplyGetTotalCoins{}
resp.Count = req.Count
msg.Reply(client.NewMessage("", types.EventGetTotalCoinsReply, resp))
default:
msg.ReplyErr("Do not support", types.ErrNotSupport)
}
}
}()
}
func blockchainProcess(q queue.Queue) {
go func() {
client := q.Client()
client.Sub("blockchain")
for msg := range client.Recv() {
switch msg.Ty {
case types.EventGetLastHeader:
header := &types.Header{StateHash: []byte("111111111111111111111"), Height: 1}
msg.Reply(client.NewMessage("", types.EventHeader, header))
case types.EventLocalGet:
//fmt.Println("EventLocalGet rsp")
msg.Reply(client.NewMessage("", types.EventLocalReplyValue, &types.LocalReplyValue{}))
case types.EventLocalList:
var values [][]byte
msg.Reply(client.NewMessage("", types.EventLocalReplyValue, &types.LocalReplyValue{Values: values}))
case types.EventLocalPrefixCount:
msg.Reply(client.NewMessage("", types.EventLocalReplyValue, &types.Int64{Data: 0}))
default:
msg.ReplyErr("Do not support", types.ErrNotSupport)
}
}
}()
}
func createBlockChainQueryRq(execer string, funcName string) proto.Message {
switch execer {
case "manage":
{
if funcName == "GetConfigItem" {
in := &types.ReqString{Data: "this type ***"}
return in
}
}
default:
return nil
}
return nil
}
func TestQueueClient(t *testing.T) {
q, _ := initUnitEnv()
storeProcess(q)
blockchainProcess(q)
var txNum int64 = 1
var msgs []queue.Message
var msg queue.Message
var block *types.Block
execTy := [][]string{
{"manage", strconv.Itoa(pty.ManageActionModifyConfig)},
}
for _, str := range execTy {
ty, _ := strconv.Atoi(str[1])
block = createBlockEx(txNum, int32(ty), str[0])
addr1 = block.Txs[0].From() //将获取随机生成交易地址
// 1、测试 EventExecTxList 消息
msg = genExecTxListMsg(q.Client(), block)
msgs = append(msgs, msg)
//2、测试 EventAddBlock 消息
msg = genEventAddBlockMsg(q.Client(), block)
msgs = append(msgs, msg)
// 3、测试 EventDelBlock 消息
msg = genEventDelBlockMsg(q.Client(), block)
msgs = append(msgs, msg)
// 4、测试 EventCheckTx 消息
msg = genExecCheckTxMsg(q.Client(), block)
msgs = append(msgs, msg)
}
// 5、测试 EventBlockChainQuery 消息
var reqAddr types.ReqAddr
reqAddr.Addr, _ = genaddress()
reqAddr.Flag = 0
reqAddr.Count = 10
reqAddr.Direction = 0
reqAddr.Height = -1
reqAddr.Index = 0
execFunName := [][]string{
{"manage", "GetConfigItem"},
}
for _, str := range execFunName {
t := createBlockChainQueryRq(str[0], str[1])
if nil == t {
continue
}
msg = genEventBlockChainQueryMsg(q.Client(), types.Encode(t), str[0], str[1]) //生成消息
msgs = append(msgs, msg)
}
go func() {
for _, msga := range msgs {
q.Client().Send(msga, true)
_, err := q.Client().Wait(msga)
if err == nil || err == types.ErrNotFound || err == types.ErrEmpty {
t.Logf("%v,%v", msga, err)
} else {
t.Error(err)
}
}
q.Close()
}()
q.Start()
}
func waitTx(hash []byte) bool {
i := 0
for {
i++
if i%100 == 0 {
fmt.Println("wait transaction timeout")
return false
}
var reqHash types.ReqHash
reqHash.Hash = hash
res, err := mainClient.QueryTransaction(context.Background(), &reqHash)
if err != nil {
time.Sleep(time.Second)
}
if res != nil {
return true
}
}
}
func TestManageForTokenBlackList(t *testing.T) {
if !isManageTest {
return
}
fmt.Println("TestManageForTokenBlackList start")
defer fmt.Println("TestManageForTokenBlackList end")
v := &types.ModifyConfig{Key: "token-blacklist", Op: "add", Value: "GDT", Addr: ""}
modify := &pty.ManageAction{
Ty: pty.ManageActionModifyConfig,
Value: &pty.ManageAction_Modify{Modify: v},
}
tx := &types.Transaction{
Execer: []byte(execNameMa),
Payload: types.Encode(modify),
Fee: feeForToken,
Nonce: random.Int63(),
To: address.ExecAddress(execNameMa),
}
tx.Sign(types.SECP256K1, privkeySuper)
reply, err := mainClient.SendTransaction(context.Background(), tx)
if err != nil {
fmt.Println("err", err)
t.Error(err)
return
}
if !reply.IsOk {
fmt.Println("err = ", reply.GetMsg())
t.Error(ErrTest)
return
}
if !waitTx(tx.Hash()) {
t.Error(ErrTest)
return
}
time.Sleep(5 * time.Second)
}
func TestManageForTokenFinisher(t *testing.T) {
if !isManageTest {
return
}
fmt.Println("TestManageForTokenFinisher start")
defer fmt.Println("TestManageForTokenFinisher end")
v := &types.ModifyConfig{Key: "token-finisher", Op: "add", Value: addr, Addr: ""}
modify := &pty.ManageAction{
Ty: pty.ManageActionModifyConfig,
Value: &pty.ManageAction_Modify{Modify: v},
}
tx := &types.Transaction{
Execer: []byte(execNameMa),
Payload: types.Encode(modify),
Fee: feeForToken,
Nonce: random.Int63(),
To: address.ExecAddress(execNameMa),
}
tx.Sign(types.SECP256K1, privkeySuper)
reply, err := mainClient.SendTransaction(context.Background(), tx)
if err != nil {
fmt.Println("err", err)
t.Error(err)
return
}
if !reply.IsOk {
fmt.Println("err = ", reply.GetMsg())
t.Error(ErrTest)
return
}
if !waitTx(tx.Hash()) {
t.Error(ErrTest)
return
}
time.Sleep(5 * time.Second)
}
...@@ -144,6 +144,8 @@ const ( ...@@ -144,6 +144,8 @@ const (
EventLocalBegin = 135 EventLocalBegin = 135
EventLocalCommit = 136 EventLocalCommit = 136
EventLocalRollback = 137 EventLocalRollback = 137
EventLocalNew = 138
EventLocalClose = 139
//exec //exec
EventBlockChainQuery = 212 EventBlockChainQuery = 212
EventConsensusQuery = 213 EventConsensusQuery = 213
...@@ -281,6 +283,11 @@ var eventName = map[int]string{ ...@@ -281,6 +283,11 @@ var eventName = map[int]string{
// Token // Token
EventBlockChainQuery: "EventBlockChainQuery", EventBlockChainQuery: "EventBlockChainQuery",
EventConsensusQuery: "EventConsensusQuery", EventConsensusQuery: "EventConsensusQuery",
EventGetBlockBySeq: "EventGetBlockBySeq", EventGetBlockBySeq: "EventGetBlockBySeq",
EventLocalBegin: "EventLocalBegin",
EventLocalCommit: "EventLocalCommit",
EventLocalRollback: "EventLocalRollback",
EventLocalNew: "EventLocalNew",
EventLocalClose: "EventLocalClose",
} }
...@@ -150,9 +150,13 @@ func (f *Forks) CloneZero(from, to string) error { ...@@ -150,9 +150,13 @@ func (f *Forks) CloneZero(from, to string) error {
} }
// CloneMaxHeight fork信息拷贝并设置所有fork高度MaxHeight // CloneMaxHeight fork信息拷贝并设置所有fork高度MaxHeight
func (f *Forks) CloneMaxHeight(from, to string) { func (f *Forks) CloneMaxHeight(from, to string) error {
f.Clone(from, to) err := f.Clone(from, to)
if err != nil {
return err
}
f.SetAllFork(to, MaxHeight) f.SetAllFork(to, MaxHeight)
return nil
} }
// SetAllFork 设置所有fork的高度 // SetAllFork 设置所有fork的高度
...@@ -219,7 +223,10 @@ func setLocalFork() { ...@@ -219,7 +223,10 @@ func setLocalFork() {
//paraName not used currently //paraName not used currently
func setForkForPara(paraName string) { func setForkForPara(paraName string) {
systemFork.CloneZero("chain33", paraName) err := systemFork.CloneZero("chain33", paraName)
if err != nil {
tlog.Error("setForkForPara", "error", err)
}
systemFork.ReplaceFork(paraName, "ForkBlockHash", 1) systemFork.ReplaceFork(paraName, "ForkBlockHash", 1)
} }
......
...@@ -30,12 +30,21 @@ func CreateTxGroup(txs []*Transaction) (*Transactions, error) { ...@@ -30,12 +30,21 @@ func CreateTxGroup(txs []*Transaction) (*Transactions, error) {
} }
txgroup := &Transactions{} txgroup := &Transactions{}
txgroup.Txs = txs txgroup.Txs = txs
var header []byte
totalfee := int64(0) totalfee := int64(0)
minfee := int64(0) minfee := int64(0)
header := txs[0].Hash()
for i := len(txs) - 1; i >= 0; i-- { for i := len(txs) - 1; i >= 0; i-- {
txs[i].GroupCount = int32(len(txs)) txs[i].GroupCount = int32(len(txs))
totalfee += txs[i].GetFee() totalfee += txs[i].GetFee()
// Header和Fee设置是为了GetRealFee里面Size的计算,Fee是否为0和不同大小,size也是有差别的,header是否为空差别是common.Sha256Len+2
// 这里直接设置Header兼容性更好, Next不需要,已经设置过了,唯一不同的是,txs[0].fee会跟实际计算有差别,这里设置一个超大值只做计算
txs[i].Header = header
if i == 0 {
//对txs[0].fee设置一个超大值,大于后面实际计算出的fee,也就>=check时候计算出的fee, 对size影响10个字节,在1000临界值时候有差别
txs[i].Fee = 1 << 62
} else {
txs[i].Fee = 0
}
realfee, err := txs[i].GetRealFee(GInt("MinFee")) realfee, err := txs[i].GetRealFee(GInt("MinFee"))
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -46,7 +55,7 @@ func CreateTxGroup(txs []*Transaction) (*Transactions, error) { ...@@ -46,7 +55,7 @@ func CreateTxGroup(txs []*Transaction) (*Transactions, error) {
totalfee = minfee totalfee = minfee
} }
txs[0].Fee = totalfee txs[0].Fee = totalfee
header = txs[i].Hash() header = txs[0].Hash()
} else { } else {
txs[i].Fee = 0 txs[i].Fee = 0
txs[i-1].Next = txs[i].Hash() txs[i-1].Next = txs[i].Hash()
......
...@@ -47,6 +47,52 @@ func TestCreateGroupTx(t *testing.T) { ...@@ -47,6 +47,52 @@ func TestCreateGroupTx(t *testing.T) {
t.Log(grouptx) t.Log(grouptx)
} }
func TestCreateGroupTxWithSize(t *testing.T) {
tx1 := "0a05636f696e73120e18010a0a1080c2d72f1a036f746520a08d0630f1cdebc8f7efa5e9283a22313271796f6361794e46374c7636433971573461767873324537553431664b536676"
tx2 := "0a05636f696e73120e18010a0a1080c2d72f1a036f746520a08d0630de92c3828ad194b26d3a22313271796f6361794e46374c7636433971573461767873324537553431664b536676"
tx3 := "0a05636f696e73120e18010a0a1080c2d72f1a036f746520a08d0630b0d6c895c4d28efe5d3a22313271796f6361794e46374c7636433971573461767873324537553431664b536676"
tx11, _ := hex.DecodeString(tx1)
tx21, _ := hex.DecodeString(tx2)
tx31, _ := hex.DecodeString(tx3)
len150str := "0a05636f696e73120e18010a0a1080c2d72f1a036f746520a08d0630f1cdebc8f7efa5e9283a22313271796f6361794e46374c7636433971573461767873324537553431664b5366761122"
len130str := "0a05636f696e73120e18010a0a1080c2d72f1a036f746520a08d0630f1cdebc8f7efa5e9283a22313271796f6361794e46374c76364339715734617678733245375"
len105str := "0a05636f696e73120e18010a0a1080c2d72f1a036f746520a08d0630f1cdebc8f7efa5e9283a22313271796f6361794e46374c7633"
var tx12 Transaction
Decode(tx11, &tx12)
tx12.Fee = 1
//构造临界size, fee=1,构建时候计算出size是998, 构建之前fee是100000,构建之后tx[0].fee=200000,原代码会出错
extSize := []byte(len150str + len150str + len150str + len105str)
tx12.Payload = append(tx12.Payload, extSize...)
var tx22 Transaction
Decode(tx21, &tx22)
//构造临界size, 有没有header的场景
extSize = []byte(len150str + len150str + len150str + len130str)
tx22.Payload = append(tx22.Payload, extSize...)
var tx32 Transaction
Decode(tx31, &tx32)
group, err := CreateTxGroup([]*Transaction{&tx12, &tx22, &tx32})
if err != nil {
t.Error(err)
return
}
err = group.Check(0, GInt("MinFee"))
if err != nil {
for i := 0; i < len(group.Txs); i++ {
t.Log(group.Txs[i].JSON())
}
t.Error(err)
return
}
newtx := group.Tx()
grouptx := hex.EncodeToString(Encode(newtx))
t.Log(grouptx)
}
func TestDecodeTx(t *testing.T) { func TestDecodeTx(t *testing.T) {
signtx := "0a05636f696e73120e18010a0a1080c2d72f1a036f74651a6d0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a46304402200e566613679e8fe645990adb8ed6aa8c46060d944f5bab358e2c78443c3eed53022049d671e596d48f091dae3558b6fd811250412101765ba0dec5cc4188a180088720e0a71230f1cdebc8f7efa5e9283a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034ada050afe010a05636f696e73120e18010a0a1080c2d72f1a036f74651a6d0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a46304402200e566613679e8fe645990adb8ed6aa8c46060d944f5bab358e2c78443c3eed53022049d671e596d48f091dae3558b6fd811250412101765ba0dec5cc4188a180088720e0a71230f1cdebc8f7efa5e9283a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034a20a9ef5454033a9ab080360470291e9b1f63881ff9a03c3c09a06e7200688d019852209a56c5dbff8b246e32d3ad0534f42dbbae88cf4b0bed24fe6420e06d59187c690afb010a05636f696e73120e18010a0a1080c2d72f1a036f74651a6e0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a473045022100ac7acba851854179f0d574428e8c5a4c69d4431604e8626fd7ace87a8abe1f6c022039eb3f7ec190030b2c7e32457972482b3d074521856ea0d09820071b39ffb4b930de92c3828ad194b26d3a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034a20a9ef5454033a9ab080360470291e9b1f63881ff9a03c3c09a06e7200688d0198522036bb9ca17aeef20b6e9afcd5ba52d89f5109db5b5d2aee200723b2bb0c7e9aa30ad8010a05636f696e73120e18010a0a1080c2d72f1a036f74651a6d0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a4630440220094e12621f235ea46e99d21f30e8be510a52e6d92410b35e307936ce61aafe9602207fcdeb51825af222159c82b74ab2386d01263e4903d4a0cf96426c1b48bd083130b0d6c895c4d28efe5d3a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034a20a9ef5454033a9ab080360470291e9b1f63881ff9a03c3c09a06e7200688d019852209a56c5dbff8b246e32d3ad0534f42dbbae88cf4b0bed24fe6420e06d59187c69" signtx := "0a05636f696e73120e18010a0a1080c2d72f1a036f74651a6d0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a46304402200e566613679e8fe645990adb8ed6aa8c46060d944f5bab358e2c78443c3eed53022049d671e596d48f091dae3558b6fd811250412101765ba0dec5cc4188a180088720e0a71230f1cdebc8f7efa5e9283a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034ada050afe010a05636f696e73120e18010a0a1080c2d72f1a036f74651a6d0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a46304402200e566613679e8fe645990adb8ed6aa8c46060d944f5bab358e2c78443c3eed53022049d671e596d48f091dae3558b6fd811250412101765ba0dec5cc4188a180088720e0a71230f1cdebc8f7efa5e9283a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034a20a9ef5454033a9ab080360470291e9b1f63881ff9a03c3c09a06e7200688d019852209a56c5dbff8b246e32d3ad0534f42dbbae88cf4b0bed24fe6420e06d59187c690afb010a05636f696e73120e18010a0a1080c2d72f1a036f74651a6e0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a473045022100ac7acba851854179f0d574428e8c5a4c69d4431604e8626fd7ace87a8abe1f6c022039eb3f7ec190030b2c7e32457972482b3d074521856ea0d09820071b39ffb4b930de92c3828ad194b26d3a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034a20a9ef5454033a9ab080360470291e9b1f63881ff9a03c3c09a06e7200688d0198522036bb9ca17aeef20b6e9afcd5ba52d89f5109db5b5d2aee200723b2bb0c7e9aa30ad8010a05636f696e73120e18010a0a1080c2d72f1a036f74651a6d0801122102504fa1c28caaf1d5a20fefb87c50a49724ff401043420cb3ba271997eb5a43871a4630440220094e12621f235ea46e99d21f30e8be510a52e6d92410b35e307936ce61aafe9602207fcdeb51825af222159c82b74ab2386d01263e4903d4a0cf96426c1b48bd083130b0d6c895c4d28efe5d3a22313271796f6361794e46374c7636433971573461767873324537553431664b53667640034a20a9ef5454033a9ab080360470291e9b1f63881ff9a03c3c09a06e7200688d019852209a56c5dbff8b246e32d3ad0534f42dbbae88cf4b0bed24fe6420e06d59187c69"
var tx Transaction var tx Transaction
......
...@@ -39,6 +39,9 @@ func (s *HealthCheckServer) Close() { ...@@ -39,6 +39,9 @@ func (s *HealthCheckServer) Close() {
// NewHealthCheckServer new json rpcserver object // NewHealthCheckServer new json rpcserver object
func NewHealthCheckServer(c queue.Client) *HealthCheckServer { func NewHealthCheckServer(c queue.Client) *HealthCheckServer {
if c == nil {
return nil
}
h := &HealthCheckServer{} h := &HealthCheckServer{}
h.api, _ = client.New(c, nil) h.api, _ = client.New(c, nil)
h.quit = make(chan struct{}) h.quit = make(chan struct{})
......
...@@ -44,6 +44,8 @@ func TestGetHealth(t *testing.T) { ...@@ -44,6 +44,8 @@ func TestGetHealth(t *testing.T) {
peerlist := &types.PeerList{Peers: []*types.Peer{peer2}} peerlist := &types.PeerList{Peers: []*types.Peer{peer2}}
api.On("PeerInfo").Return(peerlist, nil).Once() api.On("PeerInfo").Return(peerlist, nil).Once()
healthNil := NewHealthCheckServer(nil)
assert.Nil(t, healthNil)
q := queue.New("channel") q := queue.New("channel")
health := NewHealthCheckServer(q.Client()) health := NewHealthCheckServer(q.Client())
health.api = api health.api = api
......
...@@ -390,8 +390,6 @@ func ExecAndCheckBlockCB(qclient queue.Client, block *types.Block, txs []*types. ...@@ -390,8 +390,6 @@ func ExecAndCheckBlockCB(qclient queue.Client, block *types.Block, txs []*types.
if err := cb(i, detail.Receipts[index]); err != nil { if err := cb(i, detail.Receipts[index]); err != nil {
return nil, err return nil, err
} }
} else {
panic("never happen")
} }
} }
return detail.Block, nil return detail.Block, nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment