Commit ac298e42 authored by madengji's avatar madengji Committed by 33cn

add bls debug info

parent fc31458f
...@@ -327,14 +327,14 @@ func (client *client) ProcEvent(msg *queue.Message) bool { ...@@ -327,14 +327,14 @@ func (client *client) ProcEvent(msg *queue.Message) bool {
plog.Error("paracross ProcEvent decode", "ty", types.EventReceiveSubData) plog.Error("paracross ProcEvent decode", "ty", types.EventReceiveSubData)
return true return true
} }
plog.Info("paracross ProcEvent from", "from", req.GetFrom(), "topic:", req.GetTopic(), "ty", sub.GetTy()) plog.Info("paracross ProcEvent", "from", req.GetFrom(), "topic:", req.GetTopic(), "ty", sub.GetTy())
switch sub.GetTy() { switch sub.GetTy() {
case P2pSubCommitTx: case P2pSubCommitTx:
go client.blsSignCli.rcvCommitTx(sub.GetCommitTx()) go client.blsSignCli.rcvCommitTx(sub.GetCommitTx())
case P2pSubLeaderSyncMsg: case P2pSubLeaderSyncMsg:
err := client.blsSignCli.rcvLeaderSyncTx(sub.GetSyncMsg()) err := client.blsSignCli.rcvLeaderSyncTx(sub.GetSyncMsg())
if err != nil { if err != nil {
plog.Info("paracross ProcEvent leader sync msg", "err", err) plog.Error("paracross ProcEvent leader sync msg", "err", err)
} }
default: default:
plog.Error("paracross ProcEvent not support", "ty", sub.GetTy()) plog.Error("paracross ProcEvent not support", "ty", sub.GetTy())
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
const ( const (
P2pSubCommitTx = 1 P2pSubCommitTx = 1
P2pSubLeaderSyncMsg = 2 P2pSubLeaderSyncMsg = 2
moduleName = "consensus"
) )
func (client *client) sendP2PMsg(ty int64, data interface{}) ([]byte, error) { func (client *client) sendP2PMsg(ty int64, data interface{}) ([]byte, error) {
...@@ -40,14 +41,29 @@ func (client *client) SendPubP2PMsg(topic string, msg []byte) error { ...@@ -40,14 +41,29 @@ func (client *client) SendPubP2PMsg(topic string, msg []byte) error {
} }
func (client *client) SendSubP2PTopic(topic string) error { func (client *client) SendSubP2PTopic(topic string) error {
data := &types.SubTopic{Topic: topic, Module: "consensus"} data := &types.SubTopic{Topic: topic, Module: moduleName}
_, err := client.sendP2PMsg(types.EventSubTopic, data) _, err := client.sendP2PMsg(types.EventSubTopic, data)
return err return err
} }
func (client *client) SendRmvP2PTopic(topic string) error { func (client *client) SendRmvP2PTopic(topic string) error {
data := &types.RemoveTopic{Topic: topic, Module: "consensus"} data := &types.RemoveTopic{Topic: topic, Module: moduleName}
_, err := client.sendP2PMsg(types.EventRemoveTopic, data) _, err := client.sendP2PMsg(types.EventRemoveTopic, data)
return err return err
} }
func (client *client) SendFetchP2PTopic() (*types.TopicList, error) {
data := &types.FetchTopicList{Module: moduleName}
msg, err := client.sendP2PMsg(types.EventFetchTopics, data)
if err != nil {
return nil, errors.Wrap(err, "reply fail")
}
var reply types.TopicList
err = types.Decode(msg, &reply)
if err != nil {
return nil, errors.Wrap(err, "decode fail")
}
return &reply, err
}
...@@ -6,6 +6,7 @@ package para ...@@ -6,6 +6,7 @@ package para
import ( import (
"bytes" "bytes"
"fmt"
"sort" "sort"
"strings" "strings"
"sync" "sync"
...@@ -101,6 +102,11 @@ out: ...@@ -101,6 +102,11 @@ out:
} }
case <-watchDogTicker: case <-watchDogTicker:
//排除不在Nodegroup里面的Node
if !b.isValidNodes(b.selfID) {
plog.Info("procLeaderSync watchdog, not in nodegroup", "self", b.selfID)
continue
}
//至少1分钟内要收到leader喂狗消息,否则认为leader挂了,index++ //至少1分钟内要收到leader喂狗消息,否则认为leader挂了,index++
if atomic.LoadUint32(&b.feedDog) == 0 { if atomic.LoadUint32(&b.feedDog) == 0 {
nodes, leader, _, off, _ := b.getLeaderInfo() nodes, leader, _, off, _ := b.getLeaderInfo()
...@@ -538,6 +544,12 @@ func (b *blsClient) showTxBuffInfo() *pt.ParaBlsSignSumInfo { ...@@ -538,6 +544,12 @@ func (b *blsClient) showTxBuffInfo() *pt.ParaBlsSignSumInfo {
b.mutex.Lock() b.mutex.Lock()
defer b.mutex.Unlock() defer b.mutex.Unlock()
reply, err := b.paraClient.SendFetchP2PTopic()
if err != nil {
plog.Error("fetch p2p topic", "err", err)
}
plog.Info("fetch p2p topics", "list", fmt.Sprint(reply.Topics))
var seq []int64 var seq []int64
var ret pt.ParaBlsSignSumInfo var ret pt.ParaBlsSignSumInfo
for k := range b.commitsPool { for k := range b.commitsPool {
......
...@@ -49,6 +49,11 @@ func (mem *Mempool) SetQueueClient(client queue.Client) { ...@@ -49,6 +49,11 @@ func (mem *Mempool) SetQueueClient(client queue.Client) {
reply, err = mem.mainGrpcCli.SendTransaction(context.Background(), tx) reply, err = mem.mainGrpcCli.SendTransaction(context.Background(), tx)
case types.EventGetProperFee: case types.EventGetProperFee:
reply, err = mem.mainGrpcCli.GetProperFee(context.Background(), &types.ReqProperFee{}) reply, err = mem.mainGrpcCli.GetProperFee(context.Background(), &types.ReqProperFee{})
case types.EventGetMempoolSize:
// 消息类型EventGetMempoolSize:获取mempool大小
size := types.Conf(client.GetConfig(), "config.mempool").GInt("poolCacheSize")
msg.Reply(mem.client.NewMessage("rpc", types.EventMempoolSize, &types.MempoolSize{Size: size}))
continue
default: default:
msg.Reply(client.NewMessage(mem.key, types.EventReply, types.ErrActionNotSupport)) msg.Reply(client.NewMessage(mem.key, types.EventReply, types.ErrActionNotSupport))
continue continue
......
...@@ -43,4 +43,13 @@ func TestParaNodeMempool(t *testing.T) { ...@@ -43,4 +43,13 @@ func TestParaNodeMempool(t *testing.T) {
tx := util.CreateTxWithExecer(chainCfg, mockpara.Para.GetGenesisKey(), "user.p.guodun.none") tx := util.CreateTxWithExecer(chainCfg, mockpara.Para.GetGenesisKey(), "user.p.guodun.none")
hash := mockpara.Para.SendTx(tx) hash := mockpara.Para.SendTx(tx)
assert.Equal(t, tx.Hash(), hash) assert.Equal(t, tx.Hash(), hash)
msg := para.GetClient().NewMessage("mempool", types.EventGetMempoolSize, nil)
para.GetClient().Send(msg, true)
reply, err := para.GetClient().Wait(msg)
if err != nil {
t.Error(err)
return
}
t.Log("TestGetMempoolSize ", reply.GetData().(*types.MempoolSize).Size)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment