Commit ce7ffc5b authored by lyn's avatar lyn Committed by vipwzw

add para mempool

parent 476b3f91
...@@ -3,4 +3,5 @@ package init ...@@ -3,4 +3,5 @@ package init
import ( import (
_ "github.com/33cn/plugin/plugin/mempool/price" //auto gen _ "github.com/33cn/plugin/plugin/mempool/price" //auto gen
_ "github.com/33cn/plugin/plugin/mempool/score" //auto gen _ "github.com/33cn/plugin/plugin/mempool/score" //auto gen
_ "github.com/33cn/plugin/plugin/mempool/para" //auto gen
) )
package para
import (
"bytes"
"context"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/rpc/grpcclient"
"github.com/33cn/chain33/types"
"fmt"
)
var mlog = log.New("module", "mempool.para")
var topic = "mempool"
//Mempool mempool 基础类
type Mempool struct {
Key string
mainGrpcCli types.Chain33Client
}
//NewMempool 新建mempool 实例
func NewMempool(cfg *types.Mempool) *Mempool {
pool := &Mempool{}
pool.Key = topic
if types.IsPara() {
grpcCli, err := grpcclient.NewMainChainClient("")
if err != nil {
panic(err)
}
pool.mainGrpcCli = grpcCli
}
return pool
}
//SetQueueClient 初始化mempool模块
func (mem *Mempool) SetQueueClient(client queue.Client) {
go func() {
client.Sub(mem.Key)
for msg := range client.Recv() {
switch msg.Ty {
case types.EventTx:
mlog.Info("Receive msg from para mempool")
if bytes.HasPrefix(msg.GetData().(*types.Transaction).Execer, types.ParaKey) {
tx := msg.GetData().(*types.Transaction)
mlog.Info("Receive tx ", tx)
var reply *types.Reply
reply, err := mem.mainGrpcCli.SendTransaction(context.Background(), tx)
if err != nil {
//进行重试
for i := 0; i < 3; i ++ {
reply, err = mem.mainGrpcCli.SendTransaction(context.Background(), tx)
if err != nil {
continue
}
}
}
if err == nil {
msg.Reply(client.NewMessage(topic, types.EventReply, &types.Reply{IsOk: true,
Msg: []byte(reply.GetMsg())}))
} else {
msg.Reply(client.NewMessage(topic, types.EventReply, &types.Reply{IsOk: false,
Msg: []byte(fmt.Sprintf("Send transaction to main chain failed, %v", err))}))
}
}
default:
}
}
}()
}
// Wait for ready
func (mem *Mempool) Wait() {}
// Close method
func (mem *Mempool) Close() {}
package para
import (
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/mempool"
"github.com/33cn/chain33/types"
)
//--------------------------------------------------------------------------------
// Module Mempool
type subConfig struct {
PoolCacheSize int64 `json:"poolCacheSize"`
ProperFee int64 `json:"properFee"`
}
func init() {
drivers.Reg("para", New)
}
//New 创建price cache 结构的 mempool
func New(cfg *types.Mempool, sub []byte) queue.Module {
return NewMempool(cfg)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment