Commit 69e7b23f authored by caopingcp's avatar caopingcp

improve tendermint stability

parent 8f8ae28f
...@@ -200,7 +200,7 @@ func (node *Node) Start() { ...@@ -200,7 +200,7 @@ func (node *Node) Start() {
ip, _ := splitHostPort(addr) ip, _ := splitHostPort(addr)
_, ok := node.localIPs[ip] _, ok := node.localIPs[ip]
if ok { if ok {
tendermintlog.Info("find our ip ", "ourip", ip) tendermintlog.Info("find our ip ", "ourIP", ip)
node.IP = ip node.IP = ip
return return
} }
...@@ -325,6 +325,8 @@ func (node *Node) UnicastRoutine() { ...@@ -325,6 +325,8 @@ func (node *Node) UnicastRoutine() {
} }
for _, peer := range node.peerSet.List() { for _, peer := range node.peerSet.List() {
if peer.ID() == msg.PeerID { if peer.ID() == msg.PeerID {
peerIP, _ := peer.RemoteIP()
msg.PeerIP = peerIP.String()
success := peer.Send(msg) success := peer.Send(msg)
if !success { if !success {
tendermintlog.Error("send failure in UnicastRoutine") tendermintlog.Error("send failure in UnicastRoutine")
...@@ -400,6 +402,7 @@ func (node *Node) addPeer(pc *peerConn) error { ...@@ -400,6 +402,7 @@ func (node *Node) addPeer(pc *peerConn) error {
ID: node.ID, ID: node.ID,
Network: node.Network, Network: node.Network,
Version: node.Version, Version: node.Version,
IP: node.IP,
} }
// Exchange NodeInfo on the conn // Exchange NodeInfo on the conn
peerNodeInfo, err := pc.HandshakeTimeout(nodeinfo, handshakeTimeout*time.Second) peerNodeInfo, err := pc.HandshakeTimeout(nodeinfo, handshakeTimeout*time.Second)
...@@ -460,6 +463,9 @@ func (node *Node) addPeer(pc *peerConn) error { ...@@ -460,6 +463,9 @@ func (node *Node) addPeer(pc *peerConn) error {
} }
tendermintlog.Info("Added peer", "peer", pc.ip) tendermintlog.Info("Added peer", "peer", pc.ip)
stateMsg := MsgInfo{TypeID: ttypes.NewRoundStepID, Msg: node.state.RoundStateMessage(), PeerID: pc.id, PeerIP: pc.ip.String()}
pc.Send(stateMsg)
tendermintlog.Info("Send state msg", "msg", stateMsg, "ourIP", node.IP, "ourID", node.ID)
return nil return nil
} }
...@@ -472,6 +478,9 @@ func (node *Node) Broadcast(msg MsgInfo) chan bool { ...@@ -472,6 +478,9 @@ func (node *Node) Broadcast(msg MsgInfo) chan bool {
wg.Add(1) wg.Add(1)
go func(peer Peer) { go func(peer Peer) {
defer wg.Done() defer wg.Done()
msg.PeerID = peer.ID()
peerIP, _ := peer.RemoteIP()
msg.PeerIP = peerIP.String()
success := peer.Send(msg) success := peer.Send(msg)
successChan <- success successChan <- success
}(peer) }(peer)
......
...@@ -165,6 +165,20 @@ func (ps *PeerSet) hasIP(peerIP net.IP) bool { ...@@ -165,6 +165,20 @@ func (ps *PeerSet) hasIP(peerIP net.IP) bool {
return false return false
} }
//
func (ps *PeerSet) GetIP(peerKey ID) net.IP {
ps.mtx.Lock()
ps.mtx.Unlock()
if item, ok := ps.lookup[peerKey]; ok {
ip, err := item.peer.RemoteIP()
if err == nil {
return ip
}
}
return nil
}
// Size of list // Size of list
func (ps *PeerSet) Size() int { func (ps *PeerSet) Size() int {
ps.mtx.Lock() ps.mtx.Lock()
...@@ -356,6 +370,14 @@ func (pc *peerConn) Send(msg MsgInfo) bool { ...@@ -356,6 +370,14 @@ func (pc *peerConn) Send(msg MsgInfo) bool {
atomic.AddInt32(&pc.sendQueueSize, 1) atomic.AddInt32(&pc.sendQueueSize, 1)
return true return true
case <-time.After(defaultSendTimeout): case <-time.After(defaultSendTimeout):
if msg.TypeID == ttypes.ProposalBlockID {
if propBlock, ok := msg.Msg.(*tmtypes.TendermintBlock); ok {
msg.Msg = &tmtypes.TendermintBlock{
Header: propBlock.Header,
LastCommit: propBlock.LastCommit,
}
}
}
tendermintlog.Error("send msg timeout", "peerip", msg.PeerIP, "msg", msg) tendermintlog.Error("send msg timeout", "peerip", msg.PeerIP, "msg", msg)
return false return false
} }
...@@ -378,29 +400,35 @@ func (pc *peerConn) TrySend(msg MsgInfo) bool { ...@@ -378,29 +400,35 @@ func (pc *peerConn) TrySend(msg MsgInfo) bool {
// Returns true if vote was sent. // Returns true if vote was sent.
func (pc *peerConn) PickSendVote(votes ttypes.VoteSetReader) bool { func (pc *peerConn) PickSendVote(votes ttypes.VoteSetReader) bool {
if useAggSig { if useAggSig {
if pc.state.AggPrecommit {
time.Sleep(pc.myState.PeerGossipSleep())
return false
}
aggVote := votes.GetAggVote() aggVote := votes.GetAggVote()
if aggVote != nil { if aggVote != nil {
if votes.IsCommit() {
pc.state.ensureCatchupCommitRound(votes.Height(), votes.Round(), votes.Size())
}
if pc.state.Height != aggVote.Height ||
(pc.state.Round != int(aggVote.Round) && pc.state.CatchupCommitRound != int(aggVote.Round)) {
return false
}
if (aggVote.Type == uint32(ttypes.VoteTypePrevote) && pc.state.AggPrevote) ||
(aggVote.Type == uint32(ttypes.VoteTypePrecommit) && pc.state.AggPrecommit) {
return false
}
msg := MsgInfo{TypeID: ttypes.AggVoteID, Msg: aggVote.AggVote, PeerID: pc.id, PeerIP: pc.ip.String()} msg := MsgInfo{TypeID: ttypes.AggVoteID, Msg: aggVote.AggVote, PeerID: pc.id, PeerIP: pc.ip.String()}
tendermintlog.Debug("Sending aggregate vote message", "msg", msg) tendermintlog.Debug("Sending aggregate vote message", "msg", msg)
if pc.Send(msg) { if pc.Send(msg) {
pc.state.SetHasAggPrecommit(aggVote) pc.state.SetHasAggVote(aggVote)
time.Sleep(pc.myState.PeerGossipSleep())
return true return true
} }
}
return false return false
} else if vote, ok := pc.state.PickVoteToSend(votes); ok { }
}
if vote, ok := pc.state.PickVoteToSend(votes); ok {
msg := MsgInfo{TypeID: ttypes.VoteID, Msg: vote.Vote, PeerID: pc.id, PeerIP: pc.ip.String()} msg := MsgInfo{TypeID: ttypes.VoteID, Msg: vote.Vote, PeerID: pc.id, PeerIP: pc.ip.String()}
tendermintlog.Debug("Sending vote message", "msg", msg) tendermintlog.Debug("Sending vote message", "msg", msg)
if pc.Send(msg) { if pc.Send(msg) {
pc.state.SetHasVote(vote) pc.state.SetHasVote(vote)
return true return true
} }
return false
} }
return false return false
} }
...@@ -666,7 +694,7 @@ FOR_LOOP: ...@@ -666,7 +694,7 @@ FOR_LOOP:
if ok { if ok {
tendermintlog.Debug("Received proposal heartbeat message", tendermintlog.Debug("Received proposal heartbeat message",
"height", msg.Height, "round", msg.Round, "sequence", msg.Sequence, "height", msg.Height, "round", msg.Round, "sequence", msg.Sequence,
"valIdx", msg.ValidatorIndex, "valAddr", msg.ValidatorAddress) "valIdx", msg.ValidatorIndex, "valAddr", fmt.Sprintf("%X", msg.ValidatorAddress))
} }
} }
} }
...@@ -688,26 +716,31 @@ OUTER_LOOP: ...@@ -688,26 +716,31 @@ OUTER_LOOP:
// If the peer is on a previous height, help catch up. // If the peer is on a previous height, help catch up.
if (0 < prs.Height) && (prs.Height < rs.Height) { if (0 < prs.Height) && (prs.Height < rs.Height) {
time.Sleep(2 * pc.myState.PeerGossipSleep()) if prs.ProposalBlockHash != nil && !prs.ProposalBlock {
if prs.Height >= rs.Height {
continue OUTER_LOOP
}
proposalBlock := pc.myState.client.LoadProposalBlock(prs.Height) proposalBlock := pc.myState.client.LoadProposalBlock(prs.Height)
if proposalBlock == nil { if proposalBlock == nil {
tendermintlog.Error("load proposal block fail", "selfHeight", rs.Height, tendermintlog.Error("load proposal block fail", "selfHeight", rs.Height,
"blockHeight", pc.myState.client.GetCurrentHeight()) "blockHeight", pc.myState.client.GetCurrentHeight())
time.Sleep(pc.myState.PeerGossipSleep())
continue OUTER_LOOP continue OUTER_LOOP
} }
newBlock := &ttypes.TendermintBlock{TendermintBlock: proposalBlock} newBlock := &ttypes.TendermintBlock{TendermintBlock: proposalBlock}
if !newBlock.HashesTo(prs.ProposalBlockHash) {
tendermintlog.Error(fmt.Sprintf("Wrong proposal block hash. Expected %X, got %X", prs.ProposalBlockHash,
newBlock.Hash()), "height", prs.Height)
time.Sleep(pc.myState.PeerGossipSleep())
continue OUTER_LOOP
}
msg := MsgInfo{TypeID: ttypes.ProposalBlockID, Msg: proposalBlock, PeerID: pc.id, PeerIP: pc.ip.String()} msg := MsgInfo{TypeID: ttypes.ProposalBlockID, Msg: proposalBlock, PeerID: pc.id, PeerIP: pc.ip.String()}
tendermintlog.Info("Sending block for catchup", "peerip", pc.ip.String(), tendermintlog.Info("Sending block for catchup", "peerIP", pc.ip.String(),
"selfHeight", rs.Height, "peerHeight", prs.Height, "block(H/R/hash)", "selfHeight", rs.Height, "peer(H/R/S)", fmt.Sprintf("%v/%v/%v", prs.Height, prs.Round, prs.Step),
fmt.Sprintf("%v/%v/%X", proposalBlock.Header.Height, proposalBlock.Header.Round, newBlock.Hash())) "block(H/R/hash)", fmt.Sprintf("%v/%v/%X", proposalBlock.Header.Height, proposalBlock.Header.Round, newBlock.Hash()))
if !pc.Send(msg) { if pc.Send(msg) {
tendermintlog.Error("send catchup block fail") prs.SetHasProposalBlock(newBlock)
} }
continue OUTER_LOOP continue OUTER_LOOP
} }
}
// If height and round don't match, sleep. // If height and round don't match, sleep.
if (rs.Height != prs.Height) || (rs.Round != prs.Round) { if (rs.Height != prs.Height) || (rs.Round != prs.Round) {
...@@ -790,10 +823,12 @@ OUTER_LOOP: ...@@ -790,10 +823,12 @@ OUTER_LOOP:
// If height matches, then send LastCommit, Prevotes, Precommits. // If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height { if rs.Height == prs.Height {
if !useAggSig && pc.gossipVotesForHeight(rs, &prs.PeerRoundState) { if !useAggSig || gossipVotes.Load().(bool) {
if pc.gossipVotesForHeight(rs, &prs.PeerRoundState) {
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
}
// Special catchup logic. // Special catchup logic.
// If peer is lagging by height 1, send LastCommit. // If peer is lagging by height 1, send LastCommit.
...@@ -815,7 +850,7 @@ OUTER_LOOP: ...@@ -815,7 +850,7 @@ OUTER_LOOP:
tendermintlog.Info("Picked Catchup commit to send", tendermintlog.Info("Picked Catchup commit to send",
"commit(H/R)", fmt.Sprintf("%v/%v", commitObj.Height(), commitObj.Round()), "commit(H/R)", fmt.Sprintf("%v/%v", commitObj.Height(), commitObj.Round()),
"BitArray", commitObj.BitArray().String(), "BitArray", commitObj.BitArray().String(),
"peerip", pc.ip.String(), "height", prs.Height) "peerip", pc.ip.String(), "peer(H/R/S)", fmt.Sprintf("%v/%v/%v", prs.Height, prs.Round, prs.Step))
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
...@@ -1030,33 +1065,40 @@ func (ps *PeerConnState) SetHasProposalBlock(block *ttypes.TendermintBlock) { ...@@ -1030,33 +1065,40 @@ func (ps *PeerConnState) SetHasProposalBlock(block *ttypes.TendermintBlock) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
if ps.Height != block.Header.Height || ps.Round != int(block.Header.Round) { if ps.Height != block.Header.Height ||
(ps.Round != int(block.Header.Round) && ps.CatchupCommitRound != int(block.Header.Round)) {
return return
} }
if ps.ProposalBlock { if ps.ProposalBlock {
return return
} }
tendermintlog.Debug("Peer set proposal block", "peerip", ps.ip.String(), tendermintlog.Debug("Peer set proposal block", "peerIP", ps.ip.String(),
"peer-state", fmt.Sprintf("%v/%v/%v", ps.Height, ps.Round, ps.Step), "peer-state", fmt.Sprintf("%v/%v(%v)/%v", ps.Height, ps.Round, ps.CatchupCommitRound, ps.Step),
"block(H/R)", fmt.Sprintf("%v/%v", block.Header.Height, block.Header.Round)) "block(H/R)", fmt.Sprintf("%v/%v", block.Header.Height, block.Header.Round))
ps.ProposalBlock = true ps.ProposalBlock = true
} }
// SetHasAggPrecommit sets the given aggregate precommit as known for the peer. // SetHasAggVote sets the given aggregate precommit as known for the peer.
func (ps *PeerConnState) SetHasAggPrecommit(aggVote *ttypes.AggVote) { func (ps *PeerConnState) SetHasAggVote(aggVote *ttypes.AggVote) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
if ps.Height != aggVote.Height || ps.Round != int(aggVote.Round) { if ps.Height != aggVote.Height ||
(ps.Round != int(aggVote.Round) && ps.CatchupCommitRound != int(aggVote.Round)) {
return return
} }
if ps.AggPrecommit { if (aggVote.Type == uint32(ttypes.VoteTypePrevote) && ps.AggPrevote) ||
(aggVote.Type == uint32(ttypes.VoteTypePrecommit) && ps.AggPrecommit) {
return return
} }
tendermintlog.Debug("Peer set aggregate precommit", "peerip", ps.ip.String(), tendermintlog.Debug("Peer set aggregate vote", "peerIP", ps.ip.String(),
"peer-state", fmt.Sprintf("%v/%v/%v", ps.Height, ps.Round, ps.Step), "peer-state", fmt.Sprintf("%v/%v(%v)/%v", ps.Height, ps.Round, ps.CatchupCommitRound, ps.Step),
"aggVote(H/R)", fmt.Sprintf("%v/%v", aggVote.Height, aggVote.Round)) "aggVote(H/R/T)", fmt.Sprintf("%v/%v/%v", aggVote.Height, aggVote.Round, aggVote.Type))
if aggVote.Type == uint32(ttypes.VoteTypePrevote) {
ps.AggPrevote = true
} else if aggVote.Type == uint32(ttypes.VoteTypePrecommit) {
ps.AggPrecommit = true ps.AggPrecommit = true
}
} }
// PickVoteToSend picks a vote to send to the peer. // PickVoteToSend picks a vote to send to the peer.
...@@ -1086,7 +1128,7 @@ func (ps *PeerConnState) PickVoteToSend(votes ttypes.VoteSetReader) (vote *ttype ...@@ -1086,7 +1128,7 @@ func (ps *PeerConnState) PickVoteToSend(votes ttypes.VoteSetReader) (vote *ttype
if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok { if index, ok := votes.BitArray().Sub(psVotes).PickRandom(); ok {
tendermintlog.Debug("PickVoteToSend", "peer(H/R)", fmt.Sprintf("%v/%v", ps.Height, ps.Round), tendermintlog.Debug("PickVoteToSend", "peer(H/R)", fmt.Sprintf("%v/%v", ps.Height, ps.Round),
"vote(H/R)", fmt.Sprintf("%v/%v", height, round), "type", voteType, "selfVotes", votes.BitArray().String(), "vote(H/R)", fmt.Sprintf("%v/%v", height, round), "type", voteType, "selfVotes", votes.BitArray().String(),
"peerVotes", psVotes.String(), "peerip", ps.ip.String()) "peerVotes", psVotes.String(), "peerIP", ps.ip.String())
return votes.GetByIndex(index), true return votes.GetByIndex(index), true
} }
return nil, false return nil, false
...@@ -1252,6 +1294,7 @@ func (ps *PeerConnState) ApplyNewRoundStepMessage(msg *tmtypes.NewRoundStepMsg) ...@@ -1252,6 +1294,7 @@ func (ps *PeerConnState) ApplyNewRoundStepMessage(msg *tmtypes.NewRoundStepMsg)
// We'll update the BitArray capacity later. // We'll update the BitArray capacity later.
ps.Prevotes = nil ps.Prevotes = nil
ps.Precommits = nil ps.Precommits = nil
ps.AggPrevote = false
ps.AggPrecommit = false ps.AggPrecommit = false
} }
if psHeight == msg.Height && psRound != int(msg.Round) && int(msg.Round) == psCatchupCommitRound { if psHeight == msg.Height && psRound != int(msg.Round) && int(msg.Round) == psCatchupCommitRound {
...@@ -1291,11 +1334,17 @@ func (ps *PeerConnState) ApplyValidBlockMessage(msg *tmtypes.ValidBlockMsg) { ...@@ -1291,11 +1334,17 @@ func (ps *PeerConnState) ApplyValidBlockMessage(msg *tmtypes.ValidBlockMsg) {
if ps.Round != int(msg.Round) && !msg.IsCommit { if ps.Round != int(msg.Round) && !msg.IsCommit {
return return
} }
tendermintlog.Debug("ApplyValidBlockMessage", "peerip", ps.ip.String(), tendermintlog.Debug("ApplyValidBlockMessage", "peerIP", ps.ip.String(),
"peer(H/R)", fmt.Sprintf("%v/%v", ps.Height, ps.Round), "peer(H/R/S)", fmt.Sprintf("%v/%v/%v", ps.Height, ps.Round, ps.Step),
"blockhash", fmt.Sprintf("%X", msg.Blockhash)) "blockhash", fmt.Sprintf("%X", msg.Blockhash))
ps.ProposalBlockHash = msg.Blockhash ps.ProposalBlockHash = msg.Blockhash
if ps.CatchupCommitRound == int(msg.Round) && msg.IsCommit {
tendermintlog.Info("Set ProposalBlockHash for catchup", "peerIP", ps.ip.String(),
"peer(H/R/S)", fmt.Sprintf("%v/%v/%v", ps.Height, ps.Round, ps.Step),
"CommitRound", ps.CatchupCommitRound,
"ProposalBlockHash", fmt.Sprintf("%X", ps.ProposalBlockHash))
}
} }
// ApplyProposalPOLMessage updates the peer state for the new proposal POL. // ApplyProposalPOLMessage updates the peer state for the new proposal POL.
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
"sync/atomic"
"time" "time"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
...@@ -47,12 +48,13 @@ var ( ...@@ -47,12 +48,13 @@ var (
preExec = false preExec = false
createEmptyBlocksInterval int32 // second createEmptyBlocksInterval int32 // second
validatorNodes = []string{"127.0.0.1:46656"} validatorNodes = []string{"127.0.0.1:46656"}
peerGossipSleepDuration int32 = 200 peerGossipSleepDuration int32 = 100
peerQueryMaj23SleepDuration int32 = 2000 peerQueryMaj23SleepDuration int32 = 2000
zeroHash [32]byte zeroHash [32]byte
random *rand.Rand random *rand.Rand
signName = "ed25519" signName = "ed25519"
useAggSig = false useAggSig = false
gossipVotes atomic.Value
) )
func init() { func init() {
...@@ -150,6 +152,7 @@ func applyConfig(sub []byte) { ...@@ -150,6 +152,7 @@ func applyConfig(sub []byte) {
signName = subcfg.SignName signName = subcfg.SignName
} }
useAggSig = subcfg.UseAggregateSignature useAggSig = subcfg.UseAggregateSignature
gossipVotes.Store(true)
} }
// DefaultDBProvider returns a database using the DBBackend and DBDir // DefaultDBProvider returns a database using the DBBackend and DBDir
...@@ -306,12 +309,11 @@ OuterLoop: ...@@ -306,12 +309,11 @@ OuterLoop:
} }
tendermintlog.Info("Save state from block") tendermintlog.Info("Save state from block")
} }
tendermintlog.Debug("Load state finish", "state", state)
// start // start
tendermintlog.Info("StartConsensus", tendermintlog.Info("StartConsensus",
"privValidator", fmt.Sprintf("%X", ttypes.Fingerprint(client.privValidator.GetAddress())), "privValidator", fmt.Sprintf("%X", ttypes.Fingerprint(client.privValidator.GetAddress())),
"Validators", state.Validators.String()) "state", state)
// Log whether this node is a validator or an observer // Log whether this node is a validator or an observer
if state.Validators.HasAddress(client.privValidator.GetAddress()) { if state.Validators.HasAddress(client.privValidator.GetAddress()) {
tendermintlog.Info("This node is a validator") tendermintlog.Info("This node is a validator")
...@@ -424,7 +426,6 @@ func (client *Client) ProcEvent(msg *queue.Message) bool { ...@@ -424,7 +426,6 @@ func (client *Client) ProcEvent(msg *queue.Message) bool {
// CreateBlock a routine monitor whether some transactions available and tell client by available channel // CreateBlock a routine monitor whether some transactions available and tell client by available channel
func (client *Client) CreateBlock() { func (client *Client) CreateBlock() {
issleep := true
for { for {
if client.IsClosed() { if client.IsClosed() {
tendermintlog.Info("CreateBlock quit") tendermintlog.Info("CreateBlock quit")
...@@ -432,23 +433,18 @@ func (client *Client) CreateBlock() { ...@@ -432,23 +433,18 @@ func (client *Client) CreateBlock() {
} }
if !client.csState.IsRunning() { if !client.csState.IsRunning() {
tendermintlog.Info("consensus not running") tendermintlog.Info("consensus not running")
time.Sleep(time.Second) time.Sleep(500 * time.Millisecond)
continue continue
} }
if issleep {
time.Sleep(time.Second)
}
height, err := client.getLastHeight() height, err := client.getLastHeight()
if err != nil { if err != nil {
issleep = true
continue continue
} }
if !client.CheckTxsAvailable(height) { if !client.CheckTxsAvailable(height) {
issleep = true time.Sleep(500 * time.Millisecond)
continue continue
} }
issleep = false
client.txsAvailable <- height + 1 client.txsAvailable <- height + 1
time.Sleep(time.Duration(timeoutTxAvail) * time.Millisecond) time.Sleep(time.Duration(timeoutTxAvail) * time.Millisecond)
...@@ -590,7 +586,7 @@ func (client *Client) QueryValidatorsByHeight(height int64) (*tmtypes.ValNodes, ...@@ -590,7 +586,7 @@ func (client *Client) QueryValidatorsByHeight(height int64) (*tmtypes.ValNodes,
if height < 1 { if height < 1 {
return nil, ttypes.ErrHeightLessThanOne return nil, ttypes.ErrHeightLessThanOne
} }
req := &tmtypes.ReqNodeInfo{Height: height} req := &tmtypes.ReqValNodes{Height: height}
param, err := proto.Marshal(req) param, err := proto.Marshal(req)
if err != nil { if err != nil {
tendermintlog.Error("QueryValidatorsByHeight marshal", "err", err) tendermintlog.Error("QueryValidatorsByHeight marshal", "err", err)
...@@ -670,22 +666,43 @@ func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error) ...@@ -670,22 +666,43 @@ func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error)
// Query_NodeInfo query validator node info // Query_NodeInfo query validator node info
func (client *Client) Query_NodeInfo(req *types.ReqNil) (types.Message, error) { func (client *Client) Query_NodeInfo(req *types.ReqNil) (types.Message, error) {
nodes := client.csState.GetRoundState().Validators.Validators vals := client.csState.GetRoundState().Validators.Validators
validators := make([]*tmtypes.Validator, 0) nodes := make([]*tmtypes.ValNodeInfo, 0)
for _, node := range nodes { for _, val := range vals {
if node == nil { if val == nil {
validators = append(validators, &tmtypes.Validator{}) nodes = append(nodes, &tmtypes.ValNodeInfo{})
} else {
ipstr, idstr := "UNKOWN", "UNKOWN"
pub, err := ttypes.ConsensusCrypto.PubKeyFromBytes(val.PubKey)
if err != nil {
tendermintlog.Error("Query_NodeInfo invalid pubkey", "err", err)
} else {
id := GenIDByPubKey(pub)
idstr = string(id)
if id == client.node.ID {
ipstr = client.node.IP
} else { } else {
item := &tmtypes.Validator{ ip := client.node.peerSet.GetIP(id)
Address: node.Address, if ip == nil {
PubKey: node.PubKey, tendermintlog.Error("Query_NodeInfo nil ip", "id", idstr)
VotingPower: node.VotingPower, } else {
Accum: node.Accum, ipstr = ip.String()
}
}
}
item := &tmtypes.ValNodeInfo{
NodeIP: ipstr,
NodeID: idstr,
Address: fmt.Sprintf("%X", val.Address),
PubKey: fmt.Sprintf("%X", val.PubKey),
VotingPower: val.VotingPower,
Accum: val.Accum,
} }
validators = append(validators, item) nodes = append(nodes, item)
} }
} }
return &tmtypes.ValidatorSet{Validators: validators, Proposer: &tmtypes.Validator{}}, nil return &tmtypes.ValNodeInfoSet{Nodes: nodes}, nil
} }
// CmpBestBlock 比较newBlock是不是最优区块 // CmpBestBlock 比较newBlock是不是最优区块
......
...@@ -272,7 +272,7 @@ func CheckState(t *testing.T, client *Client) { ...@@ -272,7 +272,7 @@ func CheckState(t *testing.T, client *Client) {
assert.Equal(t, client.csState.Prevote(0), 1000*time.Millisecond) assert.Equal(t, client.csState.Prevote(0), 1000*time.Millisecond)
assert.Equal(t, client.csState.Precommit(0), 1000*time.Millisecond) assert.Equal(t, client.csState.Precommit(0), 1000*time.Millisecond)
assert.Equal(t, client.csState.PeerGossipSleep(), 200*time.Millisecond) assert.Equal(t, client.csState.PeerGossipSleep(), 100*time.Millisecond)
assert.Equal(t, client.csState.PeerQueryMaj23Sleep(), 2000*time.Millisecond) assert.Equal(t, client.csState.PeerQueryMaj23Sleep(), 2000*time.Millisecond)
assert.Equal(t, client.csState.IsProposer(), true) assert.Equal(t, client.csState.IsProposer(), true)
assert.Nil(t, client.csState.GetPrevotesState(state.LastBlockHeight, 0, nil)) assert.Nil(t, client.csState.GetPrevotesState(state.LastBlockHeight, 0, nil))
...@@ -286,7 +286,7 @@ func CheckState(t *testing.T, client *Client) { ...@@ -286,7 +286,7 @@ func CheckState(t *testing.T, client *Client) {
msg2, err := client.Query_NodeInfo(&types.ReqNil{}) msg2, err := client.Query_NodeInfo(&types.ReqNil{})
assert.Nil(t, err) assert.Nil(t, err)
tvals := msg2.(*vty.ValidatorSet).Validators tvals := msg2.(*vty.ValNodeInfoSet).Nodes
assert.Len(t, tvals, 1) assert.Len(t, tvals, 1)
err = client.CommitBlock(client.GetCurrentBlock()) err = client.CommitBlock(client.GetCurrentBlock())
......
...@@ -201,7 +201,7 @@ func (h *Header) StringIndented(indent string) string { ...@@ -201,7 +201,7 @@ func (h *Header) StringIndented(indent string) string {
%s LastCommit: %v %s LastCommit: %v
%s Validators: %v %s Validators: %v
%s App: %v %s App: %v
%s Conensus: %v %s Consensus: %v
%s Results: %v %s Results: %v
%s}#%v`, %s}#%v`,
indent, h.ChainID, indent, h.ChainID,
...@@ -304,7 +304,7 @@ func (commit *Commit) IsCommit() bool { ...@@ -304,7 +304,7 @@ func (commit *Commit) IsCommit() bool {
// GetAggVote ... // GetAggVote ...
func (commit *Commit) GetAggVote() *AggVote { func (commit *Commit) GetAggVote() *AggVote {
if commit == nil { if commit == nil || commit.AggVote == nil {
return nil return nil
} }
aggVote := &AggVote{commit.AggVote} aggVote := &AggVote{commit.AggVote}
......
...@@ -26,10 +26,12 @@ const ( ...@@ -26,10 +26,12 @@ const (
RoundStepNewRound = RoundStepType(0x02) // Setup new round and go to RoundStepPropose RoundStepNewRound = RoundStepType(0x02) // Setup new round and go to RoundStepPropose
RoundStepPropose = RoundStepType(0x03) // Did propose, gossip proposal RoundStepPropose = RoundStepType(0x03) // Did propose, gossip proposal
RoundStepPrevote = RoundStepType(0x04) // Did prevote, gossip prevotes RoundStepPrevote = RoundStepType(0x04) // Did prevote, gossip prevotes
RoundStepPrevoteWait = RoundStepType(0x05) // Did receive any +2/3 prevotes, start timeout RoundStepAggPrevoteWait = RoundStepType(0x05) // Did send prevote for aggregate, start timeout
RoundStepPrecommit = RoundStepType(0x06) // Did precommit, gossip precommits RoundStepPrevoteWait = RoundStepType(0x06) // Did receive any +2/3 prevotes, start timeout
RoundStepPrecommitWait = RoundStepType(0x07) // Did receive any +2/3 precommits, start timeout RoundStepPrecommit = RoundStepType(0x07) // Did precommit, gossip precommits
RoundStepCommit = RoundStepType(0x08) // Entered commit state machine RoundStepAggPrecommitWait = RoundStepType(0x08) // Did send precommit for aggregate, start timeout
RoundStepPrecommitWait = RoundStepType(0x09) // Did receive any +2/3 precommits, start timeout
RoundStepCommit = RoundStepType(0x10) // Entered commit state machine
// NOTE: RoundStepNewHeight acts as RoundStepCommitWait. // NOTE: RoundStepNewHeight acts as RoundStepCommitWait.
NewRoundStepID = byte(0x01) NewRoundStepID = byte(0x01)
...@@ -84,6 +86,10 @@ func (rs RoundStepType) String() string { ...@@ -84,6 +86,10 @@ func (rs RoundStepType) String() string {
return "RoundStepPrecommitWait" return "RoundStepPrecommitWait"
case RoundStepCommit: case RoundStepCommit:
return "RoundStepCommit" return "RoundStepCommit"
case RoundStepAggPrevoteWait:
return "RoundStepAggPrevoteWait"
case RoundStepAggPrecommitWait:
return "RoundStepAggPrecommitWait"
default: default:
return "RoundStepUnknown" // Cannot panic. return "RoundStepUnknown" // Cannot panic.
} }
...@@ -188,6 +194,7 @@ type PeerRoundState struct { ...@@ -188,6 +194,7 @@ type PeerRoundState struct {
LastCommit *BitArray // All commit precommits of commit for last height. LastCommit *BitArray // All commit precommits of commit for last height.
CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none. CatchupCommitRound int // Round that we have commit for. Not necessarily unique. -1 if none.
CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound CatchupCommit *BitArray // All commit precommits peer has for this height & CatchupCommitRound
AggPrevote bool // True if peer has aggregate prevote for this round
AggPrecommit bool // True if peer has aggregate precommit for this round AggPrecommit bool // True if peer has aggregate precommit for this round
} }
...@@ -208,6 +215,7 @@ func (prs PeerRoundState) StringIndented(indent string) string { ...@@ -208,6 +215,7 @@ func (prs PeerRoundState) StringIndented(indent string) string {
%s Precommits %v %s Precommits %v
%s LastCommit %v (round %v) %s LastCommit %v (round %v)
%s CatchupCommit %v (round %v) %s CatchupCommit %v (round %v)
%s AggPrevote %v
%s AggPrecommit %v %s AggPrecommit %v
%s}`, %s}`,
indent, prs.Height, prs.Round, prs.Step, prs.StartTime, indent, prs.Height, prs.Round, prs.Step, prs.StartTime,
...@@ -219,6 +227,7 @@ func (prs PeerRoundState) StringIndented(indent string) string { ...@@ -219,6 +227,7 @@ func (prs PeerRoundState) StringIndented(indent string) string {
indent, prs.Precommits, indent, prs.Precommits,
indent, prs.LastCommit, prs.LastCommitRound, indent, prs.LastCommit, prs.LastCommitRound,
indent, prs.CatchupCommit, prs.CatchupCommitRound, indent, prs.CatchupCommit, prs.CatchupCommitRound,
indent, prs.AggPrevote,
indent, prs.AggPrecommit, indent, prs.AggPrecommit,
indent) indent)
} }
......
...@@ -40,6 +40,7 @@ func ValCmd() *cobra.Command { ...@@ -40,6 +40,7 @@ func ValCmd() *cobra.Command {
IsSyncCmd(), IsSyncCmd(),
GetBlockInfoCmd(), GetBlockInfoCmd(),
GetNodeInfoCmd(), GetNodeInfoCmd(),
GetPerfStatCmd(),
AddNodeCmd(), AddNodeCmd(),
CreateCmd(), CreateCmd(),
) )
...@@ -75,7 +76,7 @@ func GetNodeInfoCmd() *cobra.Command { ...@@ -75,7 +76,7 @@ func GetNodeInfoCmd() *cobra.Command {
func getNodeInfo(cmd *cobra.Command, args []string) { func getNodeInfo(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr") rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res []*vt.Validator var res *vt.ValNodeInfoSet
ctx := jsonclient.NewRPCCtx(rpcLaddr, "valnode.GetNodeInfo", nil, &res) ctx := jsonclient.NewRPCCtx(rpcLaddr, "valnode.GetNodeInfo", nil, &res)
ctx.Run() ctx.Run()
} }
...@@ -113,6 +114,41 @@ func getBlockInfo(cmd *cobra.Command, args []string) { ...@@ -113,6 +114,41 @@ func getBlockInfo(cmd *cobra.Command, args []string) {
ctx.Run() ctx.Run()
} }
// GetPerfStatCmd get block info
func GetPerfStatCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "stat",
Short: "Get tendermint performance statistics",
Run: getPerfStat,
}
addGetPerfStatFlags(cmd)
return cmd
}
func addGetPerfStatFlags(cmd *cobra.Command) {
cmd.Flags().Int64P("start", "s", 0, "start block height")
cmd.Flags().Int64P("end", "e", 0, "end block height")
}
func getPerfStat(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
start, _ := cmd.Flags().GetInt64("start")
end, _ := cmd.Flags().GetInt64("end")
req := &vt.ReqPerfStat{
Start: start,
End: end,
}
params := rpctypes.Query4Jrpc{
Execer: vt.ValNodeX,
FuncName: "GetPerfState",
Payload: types.MustPBToJSON(req),
}
var res vt.PerfStat
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.Query", params, &res)
ctx.Run()
}
// AddNodeCmd add validator node // AddNodeCmd add validator node
func AddNodeCmd() *cobra.Command { func AddNodeCmd() *cobra.Command {
cmd := &cobra.Command{ cmd := &cobra.Command{
......
...@@ -10,7 +10,7 @@ import ( ...@@ -10,7 +10,7 @@ import (
) )
// Query_GetValNodeByHeight method // Query_GetValNodeByHeight method
func (val *ValNode) Query_GetValNodeByHeight(in *pty.ReqNodeInfo) (types.Message, error) { func (val *ValNode) Query_GetValNodeByHeight(in *pty.ReqValNodes) (types.Message, error) {
height := in.GetHeight() height := in.GetHeight()
if height <= 0 { if height <= 0 {
...@@ -60,3 +60,60 @@ func (val *ValNode) Query_GetBlockInfoByHeight(in *pty.ReqBlockInfo) (types.Mess ...@@ -60,3 +60,60 @@ func (val *ValNode) Query_GetBlockInfoByHeight(in *pty.ReqBlockInfo) (types.Mess
} }
return reply, nil return reply, nil
} }
// Query_GetPerfState method
func (val *ValNode) Query_GetPerfState(in *pty.ReqPerfStat) (types.Message, error) {
start := in.GetStart()
end := in.GetEnd()
if start < 0 || end < 0 || start > end || end > val.GetHeight() {
return nil, types.ErrInvalidParam
}
if start == 0 {
start = 1
}
if end == 0 {
end = val.GetHeight()
}
startKey := CalcValNodeBlockInfoHeightKey(start)
startValue, err := val.GetLocalDB().Get(startKey)
if err != nil {
return nil, err
}
if len(startValue) == 0 {
return nil, types.ErrNotFound
}
startInfo := &pty.TendermintBlockInfo{}
err = types.Decode(startValue, startInfo)
if err != nil {
return nil, err
}
endKey := CalcValNodeBlockInfoHeightKey(end)
endValue, err := val.GetLocalDB().Get(endKey)
if err != nil {
return nil, err
}
if len(endValue) == 0 {
return nil, types.ErrNotFound
}
endInfo := &pty.TendermintBlockInfo{}
err = types.Decode(endValue, endInfo)
if err != nil {
return nil, err
}
startHeader := startInfo.Block.Header
endHeader := endInfo.Block.Header
totalTx := endHeader.TotalTxs - startHeader.TotalTxs
totalBlock := endHeader.Height - startHeader.Height + 1
totalSecond := endHeader.Time - startHeader.Time + 1
return &pty.PerfStat{
TotalTx: totalTx,
TotalBlock: totalBlock,
TxPerBlock: totalTx / totalBlock,
TotalSecond: totalSecond,
TxPerSecond: totalTx / totalSecond,
}, nil
}
...@@ -21,7 +21,7 @@ message ValNodeAction { ...@@ -21,7 +21,7 @@ message ValNodeAction {
int32 Ty = 3; int32 Ty = 3;
} }
message ReqNodeInfo { message ReqValNodes {
int64 height = 1; int64 height = 1;
} }
...@@ -29,7 +29,33 @@ message ReqBlockInfo { ...@@ -29,7 +29,33 @@ message ReqBlockInfo {
int64 height = 1; int64 height = 1;
} }
message ValNodeInfo {
string nodeIP = 1;
string nodeID = 2;
string address = 3;
string pubKey = 4;
int64 votingPower = 5;
int64 accum = 6;
}
message ValNodeInfoSet {
repeated ValNodeInfo nodes = 1;
}
message PerfStat {
int64 totalTx = 1;
int64 totalBlock = 2;
int64 txPerBlock = 3;
int64 totalSecond = 4;
int64 txPerSecond = 5;
}
message ReqPerfStat {
int64 start = 1;
int64 end = 2;
}
service valnode { service valnode {
rpc IsSync(ReqNil) returns (IsHealthy) {} rpc IsSync(ReqNil) returns (IsHealthy) {}
rpc GetNodeInfo(ReqNil) returns (ValidatorSet) {} rpc GetNodeInfo(ReqNil) returns (ValNodeInfoSet) {}
} }
\ No newline at end of file
...@@ -34,12 +34,12 @@ func (c *Jrpc) IsSync(req *types.ReqNil, result *interface{}) error { ...@@ -34,12 +34,12 @@ func (c *Jrpc) IsSync(req *types.ReqNil, result *interface{}) error {
} }
// GetNodeInfo query node info // GetNodeInfo query node info
func (c *channelClient) GetNodeInfo(ctx context.Context, req *types.ReqNil) (*vt.ValidatorSet, error) { func (c *channelClient) GetNodeInfo(ctx context.Context, req *types.ReqNil) (*vt.ValNodeInfoSet, error) {
data, err := c.QueryConsensusFunc("tendermint", "NodeInfo", &types.ReqNil{}) data, err := c.QueryConsensusFunc("tendermint", "NodeInfo", &types.ReqNil{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if resp, ok := data.(*vt.ValidatorSet); ok { if resp, ok := data.(*vt.ValNodeInfoSet); ok {
return resp, nil return resp, nil
} }
return nil, types.ErrDecode return nil, types.ErrDecode
...@@ -51,6 +51,6 @@ func (c *Jrpc) GetNodeInfo(req *types.ReqNil, result *interface{}) error { ...@@ -51,6 +51,6 @@ func (c *Jrpc) GetNodeInfo(req *types.ReqNil, result *interface{}) error {
if err != nil { if err != nil {
return err return err
} }
*result = data.Validators *result = data
return nil return nil
} }
...@@ -63,15 +63,16 @@ func TestChannelClient_GetNodeInfo(t *testing.T) { ...@@ -63,15 +63,16 @@ func TestChannelClient_GetNodeInfo(t *testing.T) {
client := newGrpc(api) client := newGrpc(api)
client.Init("valnode", nil, nil, nil) client.Init("valnode", nil, nil, nil)
req := &types.ReqNil{} req := &types.ReqNil{}
node := &vt.Validator{ node := &vt.ValNodeInfo{
Address: []byte("aaa"), NodeIP: "127.0.0.1",
PubKey: []byte("bbb"), NodeID: "001",
Address: "aaa",
PubKey: "bbb",
VotingPower: 10, VotingPower: 10,
Accum: -1, Accum: -1,
} }
set := &vt.ValidatorSet{ set := &vt.ValNodeInfoSet{
Validators: []*vt.Validator{node}, Nodes: []*vt.ValNodeInfo{node},
Proposer: node,
} }
api.On("QueryConsensusFunc", "tendermint", "NodeInfo", req).Return(set, nil) api.On("QueryConsensusFunc", "tendermint", "NodeInfo", req).Return(set, nil)
result, err := client.GetNodeInfo(context.Background(), req) result, err := client.GetNodeInfo(context.Background(), req)
...@@ -84,18 +85,19 @@ func TestJrpc_GetNodeInfo(t *testing.T) { ...@@ -84,18 +85,19 @@ func TestJrpc_GetNodeInfo(t *testing.T) {
J := newJrpc(api) J := newJrpc(api)
req := &types.ReqNil{} req := &types.ReqNil{}
var result interface{} var result interface{}
node := &vt.Validator{ node := &vt.ValNodeInfo{
Address: []byte("aaa"), NodeIP: "127.0.0.1",
PubKey: []byte("bbb"), NodeID: "001",
Address: "aaa",
PubKey: "bbb",
VotingPower: 10, VotingPower: 10,
Accum: -1, Accum: -1,
} }
set := &vt.ValidatorSet{ set := &vt.ValNodeInfoSet{
Validators: []*vt.Validator{node}, Nodes: []*vt.ValNodeInfo{node},
Proposer: node,
} }
api.On("QueryConsensusFunc", "tendermint", "NodeInfo", req).Return(set, nil) api.On("QueryConsensusFunc", "tendermint", "NodeInfo", req).Return(set, nil)
err := J.GetNodeInfo(req, &result) err := J.GetNodeInfo(req, &result)
assert.Nil(t, err) assert.Nil(t, err)
assert.EqualValues(t, set.Validators, result) assert.EqualValues(t, set, result)
} }
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment