Commit 292c0a50 authored by vipwzw's avatar vipwzw Committed by 33cn

update chain33

parent f933d7da
......@@ -255,7 +255,7 @@ func (client *client) CreateGenesisTx() (ret []*types.Transaction) {
return
}
func (client *client) ProcEvent(msg queue.Message) bool {
func (client *client) ProcEvent(msg *queue.Message) bool {
return false
}
......
......@@ -36,7 +36,7 @@ func NewBlockstore(cfg *types.Consensus, replyChan chan *types.ClientReply, requ
}
// ProcEvent method
func (client *Client) ProcEvent(msg queue.Message) bool {
func (client *Client) ProcEvent(msg *queue.Message) bool {
return false
}
......
......@@ -67,7 +67,7 @@ func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
}
// ProcEvent method
func (client *Client) ProcEvent(msg queue.Message) bool {
func (client *Client) ProcEvent(msg *queue.Message) bool {
return false
}
......
......@@ -359,7 +359,7 @@ func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail
}
// ProcEvent ...
func (client *Client) ProcEvent(msg queue.Message) bool {
func (client *Client) ProcEvent(msg *queue.Message) bool {
return false
}
......
......@@ -160,7 +160,7 @@ func (client *Client) Query_FlushTicket(req *types.ReqNil) (types.Message, error
}
// ProcEvent ticket reply not support action err
func (client *Client) ProcEvent(msg queue.Message) bool {
func (client *Client) ProcEvent(msg *queue.Message) bool {
msg.ReplyErr("Client", types.ErrActionNotSupport)
return true
}
......
......@@ -140,7 +140,7 @@ func (kvs *KVStore) IterateRangeByStateHash(statehash []byte, start []byte, end
}
// ProcEvent handles supported events
func (kvs *KVStore) ProcEvent(msg queue.Message) {
func (kvs *KVStore) ProcEvent(msg *queue.Message) {
msg.ReplyErr("KVStore", types.ErrActionNotSupport)
}
......
......@@ -174,7 +174,7 @@ func (mvccs *KVMVCCStore) IterateRangeByStateHash(statehash []byte, start []byte
}
// ProcEvent handles supported events
func (mvccs *KVMVCCStore) ProcEvent(msg queue.Message) {
func (mvccs *KVMVCCStore) ProcEvent(msg *queue.Message) {
msg.ReplyErr("KVStore", types.ErrActionNotSupport)
}
......
......@@ -158,6 +158,6 @@ func (mpts *Store) IterateRangeByStateHash(statehash []byte, start []byte, end [
}
// ProcEvent not support message
func (mpts *Store) ProcEvent(msg queue.Message) {
func (mpts *Store) ProcEvent(msg *queue.Message) {
msg.ReplyErr("Store", types.ErrActionNotSupport)
}
......@@ -9,7 +9,7 @@ import (
"github.com/33cn/chain33/types"
)
func (chain *BlockChain) procLocalDB(msgtype int64, msg queue.Message, reqnum chan struct{}) bool {
func (chain *BlockChain) procLocalDB(msgtype int64, msg *queue.Message, reqnum chan struct{}) bool {
switch msgtype {
case types.EventLocalGet:
go chain.processMsg(msg, reqnum, chain.localGet)
......@@ -35,7 +35,7 @@ func (chain *BlockChain) procLocalDB(msgtype int64, msg queue.Message, reqnum ch
return true
}
func (chain *BlockChain) localGet(msg queue.Message) {
func (chain *BlockChain) localGet(msg *queue.Message) {
keys := (msg.Data).(*types.LocalDBGet)
if keys.Txid == 0 {
values := chain.blockStore.Get(keys)
......@@ -60,7 +60,7 @@ func (chain *BlockChain) localGet(msg queue.Message) {
}
//只允许设置 通过 transaction 来 set 信息
func (chain *BlockChain) localSet(msg queue.Message) {
func (chain *BlockChain) localSet(msg *queue.Message) {
kvs := (msg.Data).(*types.LocalDBSet)
if kvs.Txid == 0 {
msg.Reply(chain.client.NewMessage("", types.EventLocalSet, types.ErrNotSetInTransaction))
......@@ -82,21 +82,21 @@ func (chain *BlockChain) localSet(msg queue.Message) {
}
//创建 localdb transaction
func (chain *BlockChain) localNew(msg queue.Message) {
func (chain *BlockChain) localNew(msg *queue.Message) {
tx := NewLocalDB(chain.blockStore.db)
id := common.StorePointer(tx)
msg.Reply(chain.client.NewMessage("", types.EventLocalNew, &types.Int64{Data: id}))
}
//关闭 localdb transaction
func (chain *BlockChain) localClose(msg queue.Message) {
func (chain *BlockChain) localClose(msg *queue.Message) {
id := (msg.Data).(*types.Int64).Data
_, err := common.GetPointer(id)
common.RemovePointer(id)
msg.Reply(chain.client.NewMessage("", types.EventLocalClose, err))
}
func (chain *BlockChain) localBegin(msg queue.Message) {
func (chain *BlockChain) localBegin(msg *queue.Message) {
id := (msg.Data).(*types.Int64).Data
tx, err := common.GetPointer(id)
if err != nil {
......@@ -107,7 +107,7 @@ func (chain *BlockChain) localBegin(msg queue.Message) {
msg.Reply(chain.client.NewMessage("", types.EventLocalBegin, nil))
}
func (chain *BlockChain) localCommit(msg queue.Message) {
func (chain *BlockChain) localCommit(msg *queue.Message) {
id := (msg.Data).(*types.Int64).Data
tx, err := common.GetPointer(id)
if err != nil {
......@@ -118,7 +118,7 @@ func (chain *BlockChain) localCommit(msg queue.Message) {
msg.Reply(chain.client.NewMessage("", types.EventLocalCommit, err))
}
func (chain *BlockChain) localRollback(msg queue.Message) {
func (chain *BlockChain) localRollback(msg *queue.Message) {
id := (msg.Data).(*types.Int64).Data
tx, err := common.GetPointer(id)
if err != nil {
......@@ -129,7 +129,7 @@ func (chain *BlockChain) localRollback(msg queue.Message) {
msg.Reply(chain.client.NewMessage("", types.EventLocalRollback, nil))
}
func (chain *BlockChain) localList(msg queue.Message) {
func (chain *BlockChain) localList(msg *queue.Message) {
q := (msg.Data).(*types.LocalDBList)
var values [][]byte
if q.Txid > 0 {
......@@ -150,7 +150,7 @@ func (chain *BlockChain) localList(msg queue.Message) {
}
//获取指定前缀key的数量
func (chain *BlockChain) localPrefixCount(msg queue.Message) {
func (chain *BlockChain) localPrefixCount(msg *queue.Message) {
Prefix := (msg.Data).(*types.ReqKey)
counts := db.NewListHelper(chain.blockStore.db).PrefixCount(Prefix.Key)
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, &types.Int64{Data: counts}))
......
......@@ -56,26 +56,26 @@ type mockClient struct {
c queue.Client
}
func (mock *mockClient) Send(msg queue.Message, waitReply bool) error {
func (mock *mockClient) Send(msg *queue.Message, waitReply bool) error {
if msg.Topic == "error" {
return types.ErrInvalidParam
}
return mock.c.Send(msg, waitReply)
}
func (mock *mockClient) SendTimeout(msg queue.Message, waitReply bool, timeout time.Duration) error {
func (mock *mockClient) SendTimeout(msg *queue.Message, waitReply bool, timeout time.Duration) error {
return mock.c.SendTimeout(msg, waitReply, timeout)
}
func (mock *mockClient) Wait(msg queue.Message) (queue.Message, error) {
func (mock *mockClient) Wait(msg *queue.Message) (*queue.Message, error) {
return mock.c.Wait(msg)
}
func (mock *mockClient) WaitTimeout(msg queue.Message, timeout time.Duration) (queue.Message, error) {
func (mock *mockClient) WaitTimeout(msg *queue.Message, timeout time.Duration) (*queue.Message, error) {
return mock.c.WaitTimeout(msg, timeout)
}
func (mock *mockClient) Recv() chan queue.Message {
func (mock *mockClient) Recv() chan *queue.Message {
return mock.c.Recv()
}
......@@ -92,7 +92,7 @@ func (mock *mockClient) CloseQueue() (*types.Reply, error) {
return &types.Reply{IsOk: true, Msg: []byte("Ok")}, nil
}
func (mock *mockClient) NewMessage(topic string, ty int64, data interface{}) queue.Message {
func (mock *mockClient) NewMessage(topic string, ty int64, data interface{}) *queue.Message {
return mock.c.NewMessage(topic, ty, data)
}
......
......@@ -869,28 +869,32 @@ func (_m *QueueProtocolAPI) NewAccount(param *types.ReqNewAccount) (*types.Walle
}
// NewMessage provides a mock function with given fields: topic, msgid, data
func (_m *QueueProtocolAPI) NewMessage(topic string, msgid int64, data interface{}) queue.Message {
func (_m *QueueProtocolAPI) NewMessage(topic string, msgid int64, data interface{}) *queue.Message {
ret := _m.Called(topic, msgid, data)
var r0 queue.Message
if rf, ok := ret.Get(0).(func(string, int64, interface{}) queue.Message); ok {
var r0 *queue.Message
if rf, ok := ret.Get(0).(func(string, int64, interface{}) *queue.Message); ok {
r0 = rf(topic, msgid, data)
} else {
r0 = ret.Get(0).(queue.Message)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*queue.Message)
}
}
return r0
}
// Notify provides a mock function with given fields: topic, ty, data
func (_m *QueueProtocolAPI) Notify(topic string, ty int64, data interface{}) (queue.Message, error) {
func (_m *QueueProtocolAPI) Notify(topic string, ty int64, data interface{}) (*queue.Message, error) {
ret := _m.Called(topic, ty, data)
var r0 queue.Message
if rf, ok := ret.Get(0).(func(string, int64, interface{}) queue.Message); ok {
var r0 *queue.Message
if rf, ok := ret.Get(0).(func(string, int64, interface{}) *queue.Message); ok {
r0 = rf(topic, ty, data)
} else {
r0 = ret.Get(0).(queue.Message)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*queue.Message)
}
}
var r1 error
......
......@@ -55,34 +55,34 @@ func New(client queue.Client, option *QueueProtocolOption) (QueueProtocolAPI, er
if option != nil {
q.option = *option
} else {
q.option.SendTimeout = 600 * time.Second
q.option.WaitTimeout = 600 * time.Second
q.option.SendTimeout = time.Duration(-1)
q.option.WaitTimeout = time.Duration(-1)
}
return q, nil
}
func (q *QueueProtocol) query(topic string, ty int64, data interface{}) (queue.Message, error) {
func (q *QueueProtocol) query(topic string, ty int64, data interface{}) (*queue.Message, error) {
client := q.client
msg := client.NewMessage(topic, ty, data)
err := client.SendTimeout(msg, true, q.option.SendTimeout)
if err != nil {
return queue.Message{}, err
return &queue.Message{}, err
}
return client.WaitTimeout(msg, q.option.WaitTimeout)
}
func (q *QueueProtocol) notify(topic string, ty int64, data interface{}) (queue.Message, error) {
func (q *QueueProtocol) notify(topic string, ty int64, data interface{}) (*queue.Message, error) {
client := q.client
msg := client.NewMessage(topic, ty, data)
err := client.SendTimeout(msg, false, q.option.SendTimeout)
if err != nil {
return queue.Message{}, err
return &queue.Message{}, err
}
return msg, err
}
// Notify new and send client message
func (q *QueueProtocol) Notify(topic string, ty int64, data interface{}) (queue.Message, error) {
func (q *QueueProtocol) Notify(topic string, ty int64, data interface{}) (*queue.Message, error) {
return q.notify(topic, ty, data)
}
......@@ -92,7 +92,7 @@ func (q *QueueProtocol) Close() {
}
// NewMessage new message
func (q *QueueProtocol) NewMessage(topic string, msgid int64, data interface{}) queue.Message {
func (q *QueueProtocol) NewMessage(topic string, msgid int64, data interface{}) *queue.Message {
return q.client.NewMessage(topic, msgid, data)
}
......
......@@ -13,8 +13,8 @@ import (
type QueueProtocolAPI interface {
Version() (*types.VersionInfo, error)
Close()
NewMessage(topic string, msgid int64, data interface{}) queue.Message
Notify(topic string, ty int64, data interface{}) (queue.Message, error)
NewMessage(topic string, msgid int64, data interface{}) *queue.Message
Notify(topic string, ty int64, data interface{}) (*queue.Message, error)
// +++++++++++++++ mempool interfaces begin
// 同步发送交易信息到指定模块,获取应答消息 types.EventTx
SendTx(param *types.Transaction) (*types.Reply, error)
......
......@@ -129,7 +129,7 @@ func (exec *Executor) SetQueueClient(qcli queue.Client) {
}()
}
func (exec *Executor) procExecQuery(msg queue.Message) {
func (exec *Executor) procExecQuery(msg *queue.Message) {
header, err := exec.qclient.GetLastHeader()
if err != nil {
msg.Reply(exec.client.NewMessage("", types.EventBlockChainQuery, err))
......@@ -168,7 +168,7 @@ func (exec *Executor) procExecQuery(msg queue.Message) {
msg.Reply(exec.client.NewMessage("", types.EventBlockChainQuery, ret))
}
func (exec *Executor) procExecCheckTx(msg queue.Message) {
func (exec *Executor) procExecCheckTx(msg *queue.Message) {
datas := msg.GetData().(*types.ExecTxList)
ctx := &executorCtx{
stateHash: datas.StateHash,
......@@ -204,7 +204,7 @@ func (exec *Executor) procExecCheckTx(msg queue.Message) {
msg.Reply(exec.client.NewMessage("", types.EventReceiptCheckTx, result))
}
func (exec *Executor) procExecTxList(msg queue.Message) {
func (exec *Executor) procExecTxList(msg *queue.Message) {
datas := msg.GetData().(*types.ExecTxList)
ctx := &executorCtx{
stateHash: datas.StateHash,
......@@ -278,7 +278,7 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
&types.Receipts{Receipts: receipts}))
}
func (exec *Executor) procExecAddBlock(msg queue.Message) {
func (exec *Executor) procExecAddBlock(msg *queue.Message) {
datas := msg.GetData().(*types.BlockDetail)
b := datas.Block
ctx := &executorCtx{
......@@ -340,7 +340,7 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) {
msg.Reply(exec.client.NewMessage("", types.EventAddBlock, &kvset))
}
func (exec *Executor) procExecDelBlock(msg queue.Message) {
func (exec *Executor) procExecDelBlock(msg *queue.Message) {
datas := msg.GetData().(*types.BlockDetail)
b := datas.Block
ctx := &executorCtx{
......
......@@ -23,13 +23,13 @@ import (
// EventInterface p2p subscribe to the event hander interface
type EventInterface interface {
BroadCastTx(msg queue.Message, taskindex int64)
GetMemPool(msg queue.Message, taskindex int64)
GetPeerInfo(msg queue.Message, taskindex int64)
GetHeaders(msg queue.Message, taskindex int64)
GetBlocks(msg queue.Message, taskindex int64)
BlockBroadcast(msg queue.Message, taskindex int64)
GetNetInfo(msg queue.Message, taskindex int64)
BroadCastTx(msg *queue.Message, taskindex int64)
GetMemPool(msg *queue.Message, taskindex int64)
GetPeerInfo(msg *queue.Message, taskindex int64)
GetHeaders(msg *queue.Message, taskindex int64)
GetBlocks(msg *queue.Message, taskindex int64)
BlockBroadcast(msg *queue.Message, taskindex int64)
GetNetInfo(msg *queue.Message, taskindex int64)
}
// NormalInterface subscribe to the event hander interface
......@@ -67,7 +67,7 @@ func NewNormalP2PCli() NormalInterface {
}
// BroadCastTx broadcast transactions
func (m *Cli) BroadCastTx(msg queue.Message, taskindex int64) {
func (m *Cli) BroadCastTx(msg *queue.Message, taskindex int64) {
defer func() {
<-m.network.txFactory
atomic.AddInt32(&m.network.txCapcity, 1)
......@@ -78,7 +78,7 @@ func (m *Cli) BroadCastTx(msg queue.Message, taskindex int64) {
}
// GetMemPool get mempool contents
func (m *Cli) GetMemPool(msg queue.Message, taskindex int64) {
func (m *Cli) GetMemPool(msg *queue.Message, taskindex int64) {
defer func() {
<-m.network.otherFactory
log.Debug("GetMemPool", "task complete:", taskindex)
......@@ -198,8 +198,7 @@ func (m *Cli) GetAddrList(peer *Peer) (map[string]int64, error) {
log.Error("getLocalPeerInfo blockchain", "Error", err.Error())
return addrlist, err
}
var respmsg queue.Message
respmsg, err = client.WaitTimeout(msg, time.Second*30)
respmsg, err := client.WaitTimeout(msg, time.Second*30)
if err != nil {
return addrlist, err
}
......@@ -318,7 +317,7 @@ func (m *Cli) GetBlockHeight(nodeinfo *NodeInfo) (int64, error) {
}
// GetPeerInfo return peer information
func (m *Cli) GetPeerInfo(msg queue.Message, taskindex int64) {
func (m *Cli) GetPeerInfo(msg *queue.Message, taskindex int64) {
defer func() {
log.Debug("GetPeerInfo", "task complete:", taskindex)
}()
......@@ -343,7 +342,7 @@ func (m *Cli) GetPeerInfo(msg queue.Message, taskindex int64) {
}
// GetHeaders get headers information
func (m *Cli) GetHeaders(msg queue.Message, taskindex int64) {
func (m *Cli) GetHeaders(msg *queue.Message, taskindex int64) {
defer func() {
<-m.network.otherFactory
log.Debug("GetHeaders", "task complete:", taskindex)
......@@ -388,7 +387,7 @@ func (m *Cli) GetHeaders(msg queue.Message, taskindex int64) {
}
// GetBlocks get blocks information
func (m *Cli) GetBlocks(msg queue.Message, taskindex int64) {
func (m *Cli) GetBlocks(msg *queue.Message, taskindex int64) {
defer func() {
<-m.network.otherFactory
log.Debug("GetBlocks", "task complete:", taskindex)
......@@ -546,7 +545,7 @@ func (m *Cli) GetBlocks(msg queue.Message, taskindex int64) {
}
// BlockBroadcast block broadcast
func (m *Cli) BlockBroadcast(msg queue.Message, taskindex int64) {
func (m *Cli) BlockBroadcast(msg *queue.Message, taskindex int64) {
defer func() {
<-m.network.otherFactory
log.Debug("BlockBroadcast", "task complete:", taskindex)
......@@ -555,7 +554,7 @@ func (m *Cli) BlockBroadcast(msg queue.Message, taskindex int64) {
}
// GetNetInfo get network information
func (m *Cli) GetNetInfo(msg queue.Message, taskindex int64) {
func (m *Cli) GetNetInfo(msg *queue.Message, taskindex int64) {
defer func() {
<-m.network.otherFactory
log.Debug("GetNetInfo", "task complete:", taskindex)
......
......@@ -29,15 +29,15 @@ var gid int64
// Client 消息队列的接口,每个模块都需要一个发送接受client
type Client interface {
Send(msg Message, waitReply bool) (err error) //同步发送消息
SendTimeout(msg Message, waitReply bool, timeout time.Duration) (err error)
Wait(msg Message) (Message, error) //等待消息处理完成
WaitTimeout(msg Message, timeout time.Duration) (Message, error) //等待消息处理完成
Recv() chan Message
Send(msg *Message, waitReply bool) (err error) //同步发送消息
SendTimeout(msg *Message, waitReply bool, timeout time.Duration) (err error)
Wait(msg *Message) (*Message, error) //等待消息处理完成
WaitTimeout(msg *Message, timeout time.Duration) (*Message, error) //等待消息处理完成
Recv() chan *Message
Sub(topic string) //订阅消息
Close()
CloseQueue() (*types.Reply, error)
NewMessage(topic string, ty int64, data interface{}) (msg Message)
NewMessage(topic string, ty int64, data interface{}) (msg *Message)
}
// Module be used for module interface
......@@ -50,7 +50,7 @@ type Module interface {
type client struct {
q *queue
recv chan Message
recv chan *Message
done chan struct{}
wg *sync.WaitGroup
topic unsafe.Pointer
......@@ -61,7 +61,7 @@ type client struct {
func newClient(q *queue) Client {
client := &client{}
client.q = q
client.recv = make(chan Message, 5)
client.recv = make(chan *Message, 5)
client.done = make(chan struct{}, 1)
client.wg = &sync.WaitGroup{}
return client
......@@ -70,11 +70,8 @@ func newClient(q *queue) Client {
// Send 发送消息,msg 消息 ,waitReply 是否等待回应
//1. 系统保证send出去的消息就是成功了,除非系统崩溃
//2. 系统保证每个消息都有对应的 response 消息
func (client *client) Send(msg Message, waitReply bool) (err error) {
timeout := 10 * time.Minute
if types.IsTestNet() {
timeout = time.Minute
}
func (client *client) Send(msg *Message, waitReply bool) (err error) {
timeout := time.Duration(-1)
err = client.SendTimeout(msg, waitReply, timeout)
if err == ErrQueueTimeout {
panic(err)
......@@ -83,7 +80,7 @@ func (client *client) Send(msg Message, waitReply bool) (err error) {
}
// SendTimeout 超时发送, msg 消息 ,waitReply 是否等待回应, timeout 超时时间
func (client *client) SendTimeout(msg Message, waitReply bool, timeout time.Duration) (err error) {
func (client *client) SendTimeout(msg *Message, waitReply bool, timeout time.Duration) (err error) {
if client.isClose() {
return ErrIsQueueClosed
}
......@@ -99,15 +96,19 @@ func (client *client) SendTimeout(msg Message, waitReply bool, timeout time.Dura
//2. Send 高优先级别的发送消息
// NewMessage 新建消息 topic模块名称 ty消息类型 data 数据
func (client *client) NewMessage(topic string, ty int64, data interface{}) (msg Message) {
func (client *client) NewMessage(topic string, ty int64, data interface{}) (msg *Message) {
id := atomic.AddInt64(&gid, 1)
return NewMessage(id, topic, ty, data)
}
// WaitTimeout 等待时间 msg 消息 timeout 超时时间
func (client *client) WaitTimeout(msg Message, timeout time.Duration) (Message, error) {
func (client *client) WaitTimeout(msg *Message, timeout time.Duration) (*Message, error) {
if msg.chReply == nil {
return Message{}, errors.New("empty wait channel")
return &Message{}, errors.New("empty wait channel")
}
if timeout == -1 {
msg = <-msg.chReply
return msg, msg.Err()
}
t := time.NewTimer(timeout)
defer t.Stop()
......@@ -115,18 +116,15 @@ func (client *client) WaitTimeout(msg Message, timeout time.Duration) (Message,
case msg = <-msg.chReply:
return msg, msg.Err()
case <-client.done:
return Message{}, ErrIsQueueClosed
return &Message{}, ErrIsQueueClosed
case <-t.C:
return Message{}, ErrQueueTimeout
return &Message{}, ErrQueueTimeout
}
}
// Wait 等待时间
func (client *client) Wait(msg Message) (Message, error) {
timeout := 10 * time.Minute
if types.IsTestNet() {
timeout = 5 * time.Minute
}
func (client *client) Wait(msg *Message) (*Message, error) {
timeout := time.Duration(-1)
msg, err := client.WaitTimeout(msg, timeout)
if err == ErrQueueTimeout {
panic(err)
......@@ -135,7 +133,7 @@ func (client *client) Wait(msg Message) (Message, error) {
}
// Recv 获取接受消息通道
func (client *client) Recv() chan Message {
func (client *client) Recv() chan *Message {
return client.recv
}
......@@ -181,7 +179,7 @@ func (client *client) CloseQueue() (*types.Reply, error) {
return &types.Reply{IsOk: true}, nil
}
func (client *client) isEnd(data Message, ok bool) bool {
func (client *client) isEnd(data *Message, ok bool) bool {
if !ok {
return true
}
......
......@@ -16,5 +16,4 @@ func TestSetTopic(t *testing.T) {
client.setTopic(hi)
ret := client.getTopic()
assert.Equal(t, hi, ret)
}
......@@ -41,29 +41,31 @@ func (_m *Client) CloseQueue() (*types.Reply, error) {
}
// NewMessage provides a mock function with given fields: topic, ty, data
func (_m *Client) NewMessage(topic string, ty int64, data interface{}) queue.Message {
func (_m *Client) NewMessage(topic string, ty int64, data interface{}) *queue.Message {
ret := _m.Called(topic, ty, data)
var r0 queue.Message
if rf, ok := ret.Get(0).(func(string, int64, interface{}) queue.Message); ok {
var r0 *queue.Message
if rf, ok := ret.Get(0).(func(string, int64, interface{}) *queue.Message); ok {
r0 = rf(topic, ty, data)
} else {
r0 = ret.Get(0).(queue.Message)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*queue.Message)
}
}
return r0
}
// Recv provides a mock function with given fields:
func (_m *Client) Recv() chan queue.Message {
func (_m *Client) Recv() chan *queue.Message {
ret := _m.Called()
var r0 chan queue.Message
if rf, ok := ret.Get(0).(func() chan queue.Message); ok {
var r0 chan *queue.Message
if rf, ok := ret.Get(0).(func() chan *queue.Message); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(chan queue.Message)
r0 = ret.Get(0).(chan *queue.Message)
}
}
......@@ -71,11 +73,11 @@ func (_m *Client) Recv() chan queue.Message {
}
// Send provides a mock function with given fields: msg, waitReply
func (_m *Client) Send(msg queue.Message, waitReply bool) error {
func (_m *Client) Send(msg *queue.Message, waitReply bool) error {
ret := _m.Called(msg, waitReply)
var r0 error
if rf, ok := ret.Get(0).(func(queue.Message, bool) error); ok {
if rf, ok := ret.Get(0).(func(*queue.Message, bool) error); ok {
r0 = rf(msg, waitReply)
} else {
r0 = ret.Error(0)
......@@ -85,11 +87,11 @@ func (_m *Client) Send(msg queue.Message, waitReply bool) error {
}
// SendTimeout provides a mock function with given fields: msg, waitReply, timeout
func (_m *Client) SendTimeout(msg queue.Message, waitReply bool, timeout time.Duration) error {
func (_m *Client) SendTimeout(msg *queue.Message, waitReply bool, timeout time.Duration) error {
ret := _m.Called(msg, waitReply, timeout)
var r0 error
if rf, ok := ret.Get(0).(func(queue.Message, bool, time.Duration) error); ok {
if rf, ok := ret.Get(0).(func(*queue.Message, bool, time.Duration) error); ok {
r0 = rf(msg, waitReply, timeout)
} else {
r0 = ret.Error(0)
......@@ -104,18 +106,20 @@ func (_m *Client) Sub(topic string) {
}
// Wait provides a mock function with given fields: msg
func (_m *Client) Wait(msg queue.Message) (queue.Message, error) {
func (_m *Client) Wait(msg *queue.Message) (*queue.Message, error) {
ret := _m.Called(msg)
var r0 queue.Message
if rf, ok := ret.Get(0).(func(queue.Message) queue.Message); ok {
var r0 *queue.Message
if rf, ok := ret.Get(0).(func(*queue.Message) *queue.Message); ok {
r0 = rf(msg)
} else {
r0 = ret.Get(0).(queue.Message)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*queue.Message)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(queue.Message) error); ok {
if rf, ok := ret.Get(1).(func(*queue.Message) error); ok {
r1 = rf(msg)
} else {
r1 = ret.Error(1)
......@@ -125,18 +129,20 @@ func (_m *Client) Wait(msg queue.Message) (queue.Message, error) {
}
// WaitTimeout provides a mock function with given fields: msg, timeout
func (_m *Client) WaitTimeout(msg queue.Message, timeout time.Duration) (queue.Message, error) {
func (_m *Client) WaitTimeout(msg *queue.Message, timeout time.Duration) (*queue.Message, error) {
ret := _m.Called(msg, timeout)
var r0 queue.Message
if rf, ok := ret.Get(0).(func(queue.Message, time.Duration) queue.Message); ok {
var r0 *queue.Message
if rf, ok := ret.Get(0).(func(*queue.Message, time.Duration) *queue.Message); ok {
r0 = rf(msg, timeout)
} else {
r0 = ret.Get(0).(queue.Message)
if ret.Get(0) != nil {
r0 = ret.Get(0).(*queue.Message)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(queue.Message, time.Duration) error); ok {
if rf, ok := ret.Get(1).(func(*queue.Message, time.Duration) error); ok {
r1 = rf(msg, timeout)
} else {
r1 = ret.Error(1)
......
......@@ -46,8 +46,8 @@ func DisableLog() {
}
type chanSub struct {
high chan Message
low chan Message
high chan *Message
low chan *Message
isClose int32
}
......@@ -113,8 +113,8 @@ func (q *queue) Close() {
q.mu.Lock()
for topic, ch := range q.chanSubs {
if ch.isClose == 0 {
ch.high <- Message{}
ch.low <- Message{}
ch.high <- &Message{}
ch.low <- &Message{}
q.chanSubs[topic] = &chanSub{isClose: 1}
}
}
......@@ -130,7 +130,7 @@ func (q *queue) chanSub(topic string) *chanSub {
defer q.mu.Unlock()
_, ok := q.chanSubs[topic]
if !ok {
q.chanSubs[topic] = &chanSub{make(chan Message, defaultChanBuffer), make(chan Message, defaultLowChanBuffer), 0}
q.chanSubs[topic] = &chanSub{make(chan *Message, defaultChanBuffer), make(chan *Message, defaultLowChanBuffer), 0}
}
return q.chanSubs[topic]
}
......@@ -143,13 +143,13 @@ func (q *queue) closeTopic(topic string) {
return
}
if sub.isClose == 0 {
sub.high <- Message{}
sub.low <- Message{}
sub.high <- &Message{}
sub.low <- &Message{}
}
q.chanSubs[topic] = &chanSub{isClose: 1}
}
func (q *queue) send(msg Message, timeout time.Duration) (err error) {
func (q *queue) send(msg *Message, timeout time.Duration) (err error) {
if q.isClosed() {
return types.ErrChannelClosed
}
......@@ -166,28 +166,28 @@ func (q *queue) send(msg Message, timeout time.Duration) (err error) {
if timeout == 0 {
select {
case sub.high <- msg:
qlog.Debug("send ok", "msg", msg, "topic", msg.Topic, "sub", sub)
return nil
default:
qlog.Debug("send chainfull", "msg", msg, "topic", msg.Topic, "sub", sub)
qlog.Error("send chainfull", "msg", msg, "topic", msg.Topic, "sub", sub)
return ErrQueueChannelFull
}
}
if timeout == -1 {
sub.high <- msg
return nil
}
t := time.NewTimer(timeout)
defer t.Stop()
select {
case sub.high <- msg:
case <-t.C:
qlog.Debug("send timeout", "msg", msg, "topic", msg.Topic, "sub", sub)
qlog.Error("send timeout", "msg", msg, "topic", msg.Topic, "sub", sub)
return ErrQueueTimeout
}
if msg.Topic != "store" {
qlog.Debug("send ok", "msg", msg, "topic", msg.Topic, "sub", sub)
}
return nil
}
func (q *queue) sendAsyn(msg Message) error {
func (q *queue) sendAsyn(msg *Message) error {
if q.isClosed() {
return types.ErrChannelClosed
}
......@@ -197,7 +197,6 @@ func (q *queue) sendAsyn(msg Message) error {
}
select {
case sub.low <- msg:
qlog.Debug("send asyn ok", "msg", msg)
return nil
default:
qlog.Error("send asyn err", "msg", msg, "err", ErrQueueChannelFull)
......@@ -205,7 +204,7 @@ func (q *queue) sendAsyn(msg Message) error {
}
}
func (q *queue) sendLowTimeout(msg Message, timeout time.Duration) error {
func (q *queue) sendLowTimeout(msg *Message, timeout time.Duration) error {
if q.isClosed() {
return types.ErrChannelClosed
}
......@@ -216,11 +215,14 @@ func (q *queue) sendLowTimeout(msg Message, timeout time.Duration) error {
if timeout == 0 {
return q.sendAsyn(msg)
}
if timeout == -1 {
sub.low <- msg
return nil
}
t := time.NewTimer(timeout)
defer t.Stop()
select {
case sub.low <- msg:
qlog.Debug("send asyn ok", "msg", msg)
return nil
case <-t.C:
qlog.Error("send asyn timeout", "msg", msg)
......@@ -239,21 +241,22 @@ type Message struct {
Ty int64
ID int64
Data interface{}
chReply chan Message
chReply chan *Message
}
// NewMessage new message
func NewMessage(id int64, topic string, ty int64, data interface{}) (msg Message) {
func NewMessage(id int64, topic string, ty int64, data interface{}) (msg *Message) {
msg = &Message{}
msg.ID = id
msg.Ty = ty
msg.Data = data
msg.Topic = topic
msg.chReply = make(chan Message, 1)
msg.chReply = make(chan *Message, 1)
return msg
}
// GetData get message data
func (msg Message) GetData() interface{} {
func (msg *Message) GetData() interface{} {
if _, ok := msg.Data.(error); ok {
return nil
}
......@@ -261,7 +264,7 @@ func (msg Message) GetData() interface{} {
}
// Err if err return error msg, or return nil
func (msg Message) Err() error {
func (msg *Message) Err() error {
if err, ok := msg.Data.(error); ok {
return err
}
......@@ -269,7 +272,7 @@ func (msg Message) Err() error {
}
// Reply reply message to reply chan
func (msg Message) Reply(replyMsg Message) {
func (msg *Message) Reply(replyMsg *Message) {
if msg.chReply == nil {
qlog.Debug("reply a empty chreply", "msg", msg)
return
......@@ -281,13 +284,13 @@ func (msg Message) Reply(replyMsg Message) {
}
// String print the message information
func (msg Message) String() string {
func (msg *Message) String() string {
return fmt.Sprintf("{topic:%s, Ty:%s, Id:%d, Err:%v, Ch:%v}", msg.Topic,
types.GetEventName(int(msg.Ty)), msg.ID, msg.Err(), msg.chReply != nil)
}
// ReplyErr reply error
func (msg Message) ReplyErr(title string, err error) {
func (msg *Message) ReplyErr(title string, err error) {
var reply types.Reply
if err != nil {
qlog.Error(title, "reply.err", err.Error())
......
......@@ -210,3 +210,70 @@ func TestPrintMessage(t *testing.T) {
msg := client.NewMessage("mempool", types.EventReply, types.Reply{IsOk: true, Msg: []byte("word")})
t.Log(msg)
}
func BenchmarkSendMessage(b *testing.B) {
q := New("channel")
//mempool
go func() {
client := q.Client()
client.Sub("mempool")
defer client.Close()
for msg := range client.Recv() {
if msg.Ty == types.EventTx {
msg.Reply(client.NewMessage("mempool", types.EventReply, types.Reply{IsOk: true, Msg: []byte("word")}))
}
}
}()
go q.Start()
client := q.Client()
//high 优先级
for i := 0; i < b.N; i++ {
msg := client.NewMessage("mempool", types.EventTx, "hello")
err := client.Send(msg, true)
if err != nil {
b.Error(err)
return
}
_, err = client.Wait(msg)
if err != nil {
b.Error(err)
return
}
}
}
func BenchmarkStructChan(b *testing.B) {
ch := make(chan struct{})
go func() {
for {
<-ch
}
}()
for i := 0; i < b.N; i++ {
ch <- struct{}{}
}
}
func BenchmarkBoolChan(b *testing.B) {
ch := make(chan bool)
go func() {
for {
<-ch
}
}()
for i := 0; i < b.N; i++ {
ch <- true
}
}
func BenchmarkIntChan(b *testing.B) {
ch := make(chan int)
go func() {
for {
<-ch
}
}()
for i := 0; i < b.N; i++ {
ch <- 1
}
}
......@@ -37,7 +37,7 @@ type Miner interface {
GetGenesisBlockTime() int64
CreateBlock()
CheckBlock(parent *types.Block, current *types.BlockDetail) error
ProcEvent(msg queue.Message) bool
ProcEvent(msg *queue.Message) bool
}
//BaseClient ...
......
......@@ -83,7 +83,7 @@ func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
}
//ProcEvent false
func (client *Client) ProcEvent(msg queue.Message) bool {
func (client *Client) ProcEvent(msg *queue.Message) bool {
return false
}
......
......@@ -20,8 +20,8 @@ var mlog = log.New("module", "mempool.base")
//Mempool mempool 基础类
type Mempool struct {
proxyMtx sync.Mutex
in chan queue.Message
out <-chan queue.Message
in chan *queue.Message
out <-chan *queue.Message
client queue.Client
header *types.Header
sync bool
......@@ -53,8 +53,8 @@ func NewMempool(cfg *types.Mempool) *Mempool {
if cfg.PoolCacheSize == 0 {
cfg.PoolCacheSize = poolCacheSize
}
pool.in = make(chan queue.Message)
pool.out = make(<-chan queue.Message)
pool.in = make(chan *queue.Message)
pool.out = make(<-chan *queue.Message)
pool.done = make(chan struct{})
pool.cfg = cfg
pool.poolHeader = make(chan struct{}, 2)
......@@ -235,7 +235,7 @@ func (mem *Mempool) pollLastHeader() {
time.Sleep(time.Second)
continue
}
h := lastHeader.(queue.Message).Data.(*types.Header)
h := lastHeader.(*queue.Message).Data.(*types.Header)
mem.setHeader(h)
return
}
......
......@@ -11,7 +11,7 @@ import (
)
// CheckExpireValid 检查交易过期有效性,过期返回false,未过期返回true
func (mem *Mempool) CheckExpireValid(msg queue.Message) (bool, error) {
func (mem *Mempool) CheckExpireValid(msg *queue.Message) (bool, error) {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if mem.header == nil {
......@@ -54,7 +54,7 @@ func (mem *Mempool) checkExpireValid(tx *types.Transaction) bool {
}
// CheckTx 初步检查并筛选交易消息
func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
func (mem *Mempool) checkTx(msg *queue.Message) *queue.Message {
tx := msg.GetData().(types.TxGroup).Tx()
// 检查接收地址是否合法
if err := address.CheckAddress(tx.To); err != nil {
......@@ -77,7 +77,7 @@ func (mem *Mempool) checkTx(msg queue.Message) queue.Message {
}
// CheckTxs 初步检查并筛选交易消息
func (mem *Mempool) checkTxs(msg queue.Message) queue.Message {
func (mem *Mempool) checkTxs(msg *queue.Message) *queue.Message {
// 判断消息是否含有nil交易
if msg.GetData() == nil {
msg.Data = types.ErrEmptyTx
......@@ -105,7 +105,7 @@ func (mem *Mempool) checkTxs(msg queue.Message) queue.Message {
}
//txgroup 的交易
for i := 0; i < len(txs.Txs); i++ {
msgitem := mem.checkTx(queue.Message{Data: txs.Txs[i]})
msgitem := mem.checkTx(&queue.Message{Data: txs.Txs[i]})
if msgitem.Err() != nil {
msg.Data = msgitem.Err()
return msg
......@@ -115,7 +115,7 @@ func (mem *Mempool) checkTxs(msg queue.Message) queue.Message {
}
//checkTxList 检查账户余额是否足够,并加入到Mempool,成功则传入goodChan,若加入Mempool失败则传入badChan
func (mem *Mempool) checkTxRemote(msg queue.Message) queue.Message {
func (mem *Mempool) checkTxRemote(msg *queue.Message) *queue.Message {
tx := msg.GetData().(types.TxGroup)
txlist := &types.ExecTxList{}
txlist.Txs = append(txlist.Txs, tx.Tx())
......
......@@ -19,28 +19,28 @@ func (mem *Mempool) reply() {
}
}
func (mem *Mempool) pipeLine() <-chan queue.Message {
func (mem *Mempool) pipeLine() <-chan *queue.Message {
//check sign
step1 := func(data queue.Message) queue.Message {
step1 := func(data *queue.Message) *queue.Message {
if data.Err() != nil {
return data
}
return mem.checkSign(data)
}
chs := make([]<-chan queue.Message, processNum)
chs := make([]<-chan *queue.Message, processNum)
for i := 0; i < processNum; i++ {
chs[i] = step(mem.done, mem.in, step1)
}
out1 := merge(mem.done, chs)
//checktx remote
step2 := func(data queue.Message) queue.Message {
step2 := func(data *queue.Message) *queue.Message {
if data.Err() != nil {
return data
}
return mem.checkTxRemote(data)
}
chs2 := make([]<-chan queue.Message, processNum)
chs2 := make([]<-chan *queue.Message, processNum)
for i := 0; i < processNum; i++ {
chs2[i] = step(mem.done, out1, step2)
}
......@@ -94,7 +94,7 @@ func (mem *Mempool) eventProcess() {
}
//EventTx 初步筛选后存入mempool
func (mem *Mempool) eventTx(msg queue.Message) {
func (mem *Mempool) eventTx(msg *queue.Message) {
if !mem.getSync() {
msg.Reply(mem.client.NewMessage("", types.EventReply, &types.Reply{Msg: []byte(types.ErrNotSync.Error())}))
mlog.Debug("wrong tx", "err", types.ErrNotSync.Error())
......@@ -108,13 +108,13 @@ func (mem *Mempool) eventTx(msg queue.Message) {
}
// EventGetMempool 获取Mempool内所有交易
func (mem *Mempool) eventGetMempool(msg queue.Message) {
func (mem *Mempool) eventGetMempool(msg *queue.Message) {
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyTxList,
&types.ReplyTxList{Txs: mem.filterTxList(0, nil)}))
}
// EventDelTxList 获取Mempool中一定数量交易,并把这些交易从Mempool中删除
func (mem *Mempool) eventDelTxList(msg queue.Message) {
func (mem *Mempool) eventDelTxList(msg *queue.Message) {
hashList := msg.GetData().(*types.TxHashList)
if len(hashList.GetHashes()) == 0 {
msg.ReplyErr("EventDelTxList", types.ErrSize)
......@@ -125,7 +125,7 @@ func (mem *Mempool) eventDelTxList(msg queue.Message) {
}
// EventTxList 获取mempool中一定数量交易
func (mem *Mempool) eventTxList(msg queue.Message) {
func (mem *Mempool) eventTxList(msg *queue.Message) {
hashList := msg.GetData().(*types.TxHashList)
if hashList.Count <= 0 {
msg.Reply(mem.client.NewMessage("", types.EventReplyTxList, types.ErrSize))
......@@ -137,7 +137,7 @@ func (mem *Mempool) eventTxList(msg queue.Message) {
}
// EventAddBlock 将添加到区块内的交易从mempool中删除
func (mem *Mempool) eventAddBlock(msg queue.Message) {
func (mem *Mempool) eventAddBlock(msg *queue.Message) {
block := msg.GetData().(*types.BlockDetail).Block
if block.Height > mem.Height() || (block.Height == 0 && mem.Height() == 0) {
header := &types.Header{}
......@@ -150,21 +150,21 @@ func (mem *Mempool) eventAddBlock(msg queue.Message) {
}
// EventGetMempoolSize 获取mempool大小
func (mem *Mempool) eventGetMempoolSize(msg queue.Message) {
func (mem *Mempool) eventGetMempoolSize(msg *queue.Message) {
memSize := int64(mem.Size())
msg.Reply(mem.client.NewMessage("rpc", types.EventMempoolSize,
&types.MempoolSize{Size: memSize}))
}
// EventGetLastMempool 获取最新十条加入到mempool的交易
func (mem *Mempool) eventGetLastMempool(msg queue.Message) {
func (mem *Mempool) eventGetLastMempool(msg *queue.Message) {
txList := mem.GetLatestTx()
msg.Reply(mem.client.NewMessage("rpc", types.EventReplyTxList,
&types.ReplyTxList{Txs: txList}))
}
// EventDelBlock 回滚区块,把该区块内交易重新加回mempool
func (mem *Mempool) eventDelBlock(msg queue.Message) {
func (mem *Mempool) eventDelBlock(msg *queue.Message) {
block := msg.GetData().(*types.BlockDetail).Block
if block.Height != mem.GetHeader().GetHeight() {
return
......@@ -174,19 +174,19 @@ func (mem *Mempool) eventDelBlock(msg queue.Message) {
mlog.Error(err.Error())
return
}
h := lastHeader.(queue.Message).Data.(*types.Header)
h := lastHeader.(*queue.Message).Data.(*types.Header)
mem.setHeader(h)
mem.delBlock(block)
}
// eventGetAddrTxs 获取mempool中对应账户(组)所有交易
func (mem *Mempool) eventGetAddrTxs(msg queue.Message) {
func (mem *Mempool) eventGetAddrTxs(msg *queue.Message) {
addrs := msg.GetData().(*types.ReqAddrs)
txlist := mem.GetAccTxs(addrs)
msg.Reply(mem.client.NewMessage("", types.EventReplyAddrTxs, txlist))
}
func (mem *Mempool) checkSign(data queue.Message) queue.Message {
func (mem *Mempool) checkSign(data *queue.Message) *queue.Message {
tx, ok := data.GetData().(types.TxGroup)
if ok && tx.CheckSign() {
return data
......
......@@ -12,8 +12,8 @@ import (
//pipeline 适用于 一个问题,分成很多步完成,每步的输出作为下一步的输入
func step(done <-chan struct{}, in <-chan queue.Message, cb func(queue.Message) queue.Message) <-chan queue.Message {
out := make(chan queue.Message)
func step(done <-chan struct{}, in <-chan *queue.Message, cb func(*queue.Message) *queue.Message) <-chan *queue.Message {
out := make(chan *queue.Message)
go func() {
defer close(out)
for n := range in {
......@@ -27,10 +27,10 @@ func step(done <-chan struct{}, in <-chan queue.Message, cb func(queue.Message)
return out
}
func merge(done <-chan struct{}, cs []<-chan queue.Message) <-chan queue.Message {
func merge(done <-chan struct{}, cs []<-chan *queue.Message) <-chan *queue.Message {
var wg sync.WaitGroup
out := make(chan queue.Message)
output := func(c <-chan queue.Message) {
out := make(chan *queue.Message)
output := func(c <-chan *queue.Message) {
defer wg.Done()
for n := range c {
select {
......
......@@ -15,9 +15,9 @@ import (
func TestStep(t *testing.T) {
done := make(chan struct{})
in := make(chan queue.Message)
msg := queue.Message{ID: 0}
cb := func(in queue.Message) queue.Message {
in := make(chan *queue.Message)
msg := &queue.Message{ID: 0}
cb := func(in *queue.Message) *queue.Message {
in.ID++
time.Sleep(time.Microsecond)
return in
......@@ -31,15 +31,15 @@ func TestStep(t *testing.T) {
func TestMutiStep(t *testing.T) {
done := make(chan struct{})
in := make(chan queue.Message)
msg := queue.Message{ID: 0}
step1 := func(in queue.Message) queue.Message {
in := make(chan *queue.Message)
msg := &queue.Message{ID: 0}
step1 := func(in *queue.Message) *queue.Message {
in.ID++
time.Sleep(time.Microsecond)
return in
}
out1 := step(done, in, step1)
step2 := func(in queue.Message) queue.Message {
step2 := func(in *queue.Message) *queue.Message {
in.ID++
time.Sleep(time.Microsecond)
return in
......@@ -56,9 +56,9 @@ func TestMutiStep(t *testing.T) {
func BenchmarkStep(b *testing.B) {
done := make(chan struct{})
in := make(chan queue.Message)
msg := queue.Message{ID: 0}
cb := func(in queue.Message) queue.Message {
in := make(chan *queue.Message)
msg := &queue.Message{ID: 0}
cb := func(in *queue.Message) *queue.Message {
in.ID++
time.Sleep(100 * time.Microsecond)
return in
......@@ -78,14 +78,14 @@ func BenchmarkStep(b *testing.B) {
func BenchmarkStepMerge(b *testing.B) {
done := make(chan struct{})
in := make(chan queue.Message)
msg := queue.Message{ID: 0}
cb := func(in queue.Message) queue.Message {
in := make(chan *queue.Message)
msg := &queue.Message{ID: 0}
cb := func(in *queue.Message) *queue.Message {
in.ID++
time.Sleep(100 * time.Microsecond)
return in
}
chs := make([]<-chan queue.Message, runtime.NumCPU())
chs := make([]<-chan *queue.Message, runtime.NumCPU())
for i := 0; i < runtime.NumCPU(); i++ {
chs[i] = step(done, in, cb)
}
......@@ -102,6 +102,6 @@ func BenchmarkStepMerge(b *testing.B) {
close(done)
}
func mergeList(done <-chan struct{}, cs ...<-chan queue.Message) <-chan queue.Message {
func mergeList(done <-chan struct{}, cs ...<-chan *queue.Message) <-chan *queue.Message {
return merge(done, cs)
}
......@@ -48,7 +48,7 @@ type SubStore interface {
Rollback(req *types.ReqHash) ([]byte, error)
Del(req *types.StoreDel) ([]byte, error)
IterateRangeByStateHash(statehash []byte, start []byte, end []byte, ascending bool, fn func(key, value []byte) bool)
ProcEvent(msg queue.Message)
ProcEvent(msg *queue.Message)
}
// BaseStore 基础的store结构体
......@@ -87,7 +87,7 @@ func (store *BaseStore) SetQueueClient(c queue.Client) {
//Wait wait for basestore ready
func (store *BaseStore) Wait() {}
func (store *BaseStore) processMessage(msg queue.Message) {
func (store *BaseStore) processMessage(msg *queue.Message) {
client := store.qclient
if msg.Ty == types.EventStoreSet {
go func() {
......
......@@ -49,7 +49,7 @@ func (s *storeChild) IterateRangeByStateHash(statehash []byte, start []byte, end
}
func (s *storeChild) ProcEvent(msg queue.Message) {}
func (s *storeChild) ProcEvent(msg *queue.Message) {}
func init() {
log.SetLogLevel("error")
......
......@@ -88,7 +88,7 @@ func (mavls *Store) Get(datas *types.StoreGet) [][]byte {
var err error
values := make([][]byte, len(datas.Keys))
search := string(datas.StateHash)
if data, ok := mavls.trees.Load(search); ok {
if data, ok := mavls.trees.Load(search); ok && data != nil {
tree = data.(*mavl.Tree)
} else {
tree = mavl.NewTree(mavls.GetDB(), true)
......@@ -179,7 +179,7 @@ func (mavls *Store) IterateRangeByStateHash(statehash []byte, start []byte, end
}
// ProcEvent not support message
func (mavls *Store) ProcEvent(msg queue.Message) {
func (mavls *Store) ProcEvent(msg *queue.Message) {
msg.ReplyErr("Store", types.ErrActionNotSupport)
}
......
......@@ -16,7 +16,7 @@ func (wallet *Wallet) ProcRecvMsg() {
for msg := range wallet.client.Recv() {
walletlog.Debug("wallet recv", "msg", types.GetEventName(int(msg.Ty)), "Id", msg.ID)
beg := types.Now()
reply, err := wallet.ExecWallet(&msg)
reply, err := wallet.ExecWallet(msg)
if err != nil {
//only for test ,del when test end
msg.Reply(wallet.api.NewMessage("", 0, err))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment