Commit d9cf7b75 authored by caopingcp's avatar caopingcp Committed by vipwzw

fix qbft ut

parent 140fe92a
...@@ -337,8 +337,8 @@ Enable=0 ...@@ -337,8 +337,8 @@ Enable=0
Enable=0 Enable=0
ForkStorageLocalDB=0 ForkStorageLocalDB=0
[fork.sub.qbftNode]
Enable=0
[fork.sub.multisig] [fork.sub.multisig]
Enable=0 Enable=0
......
Title="qbft" Title="local"
[log] [log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit # 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
...@@ -86,8 +86,7 @@ timeoutPrecommit=2000 ...@@ -86,8 +86,7 @@ timeoutPrecommit=2000
timeoutPrecommitDelta=500 timeoutPrecommitDelta=500
timeoutCommit=500 timeoutCommit=500
skipTimeoutCommit=false skipTimeoutCommit=false
createEmptyBlocks=true emptyBlockInterval=2
createEmptyBlocksInterval=2
genesisFile="genesis_file.json" genesisFile="genesis_file.json"
privFile="priv_validator_0.json" privFile="priv_validator_0.json"
dbPath="datadir/qbft" dbPath="datadir/qbft"
......
...@@ -312,7 +312,7 @@ func (cs *ConsensusState) updateToState(state State) { ...@@ -312,7 +312,7 @@ func (cs *ConsensusState) updateToState(state State) {
return return
} }
// disable gossip votes // disable gossip votes
if useAggSig && gossipVotes.Load().(bool) { if UseAggSig() && gossipVotes.Load().(bool) {
gossipVotes.Store(false) gossipVotes.Store(false)
} }
...@@ -582,7 +582,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { ...@@ -582,7 +582,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
qbftlog.Info(fmt.Sprintf("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) qbftlog.Info(fmt.Sprintf("enterNewRound(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
// disable gossip votes // disable gossip votes
if useAggSig && gossipVotes.Load().(bool) { if UseAggSig() && gossipVotes.Load().(bool) {
gossipVotes.Store(false) gossipVotes.Store(false)
} }
// Increment validators if necessary // Increment validators if necessary
...@@ -614,10 +614,10 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) { ...@@ -614,10 +614,10 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
// Wait for txs to be available in the mempool // Wait for txs to be available in the mempool
// before we enterPropose in round 0. // before we enterPropose in round 0.
waitForTxs := cs.WaitForTxs() && round == getStartRound(cs.state) waitForTxs := round == getStartRound(cs.state)
if waitForTxs { if waitForTxs {
if createEmptyBlocksInterval > 0 { if emptyBlockInterval.Load().(int32) > 0 {
cs.scheduleTimeout(cs.EmptyBlocksInterval(), height, round, ttypes.RoundStepNewRound) cs.scheduleTimeout(cs.EmptyBlockInterval(), height, round, ttypes.RoundStepNewRound)
} }
go cs.proposalHeartbeat(height, round) go cs.proposalHeartbeat(height, round)
} else { } else {
...@@ -731,7 +731,7 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) { ...@@ -731,7 +731,7 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) {
// Decide on block // Decide on block
if cs.ValidBlock != nil { if cs.ValidBlock != nil {
if preExec { if preExec.Load().(bool) {
// If there is valid block, PreExec that. // If there is valid block, PreExec that.
pblockNew := cs.client.PreExecBlock(cs.ValidBlock.Data, false) pblockNew := cs.client.PreExecBlock(cs.ValidBlock.Data, false)
if pblockNew == nil { if pblockNew == nil {
...@@ -826,7 +826,7 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.QbftBlock) { ...@@ -826,7 +826,7 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.QbftBlock) {
} }
block.Data.TxHash = merkle.CalcMerkleRoot(cfg, block.Data.Height, block.Data.Txs) block.Data.TxHash = merkle.CalcMerkleRoot(cfg, block.Data.Height, block.Data.Txs)
if preExec { if preExec.Load().(bool) {
pblockNew := cs.client.PreExecBlock(block.Data, false) pblockNew := cs.client.PreExecBlock(block.Data, false)
if pblockNew == nil { if pblockNew == nil {
qbftlog.Error("createProposalBlock PreExecBlock fail") qbftlog.Error("createProposalBlock PreExecBlock fail")
...@@ -905,7 +905,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) { ...@@ -905,7 +905,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
return return
} }
if preExec && !cs.isProposer() { if preExec.Load().(bool) && !cs.isProposer() {
// PreExec proposal block // PreExec proposal block
blockCopy := *cs.ProposalBlock.Data blockCopy := *cs.ProposalBlock.Data
blockNew := cs.client.PreExecBlock(&blockCopy, true) blockNew := cs.client.PreExecBlock(&blockCopy, true)
...@@ -959,7 +959,7 @@ func (cs *ConsensusState) enterPrevoteWait(height int64, round int) { ...@@ -959,7 +959,7 @@ func (cs *ConsensusState) enterPrevoteWait(height int64, round int) {
}() }()
// enable gossip votes in case other validators enterAggPrevoteWait // enable gossip votes in case other validators enterAggPrevoteWait
if useAggSig && !cs.isProposer() && cs.Votes.Prevotes(round).GetAggVote() == nil { if UseAggSig() && !cs.isProposer() && cs.Votes.Prevotes(round).GetAggVote() == nil {
gossipVotes.Store(true) gossipVotes.Store(true)
} }
...@@ -1090,7 +1090,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int64, round int) { ...@@ -1090,7 +1090,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
}() }()
// enable gossip votes in case other validators enterAggPrecommitWait // enable gossip votes in case other validators enterAggPrecommitWait
if useAggSig && !cs.isProposer() && cs.Votes.Precommits(round).GetAggVote() == nil { if UseAggSig() && !cs.isProposer() && cs.Votes.Precommits(round).GetAggVote() == nil {
gossipVotes.Store(true) gossipVotes.Store(true)
} }
...@@ -1528,7 +1528,7 @@ func (cs *ConsensusState) tryAddAggVote(aggVoteRaw *tmtypes.QbftAggVote, peerID ...@@ -1528,7 +1528,7 @@ func (cs *ConsensusState) tryAddAggVote(aggVoteRaw *tmtypes.QbftAggVote, peerID
cs.enterPrecommit(height, int(aggVote.Round)) cs.enterPrecommit(height, int(aggVote.Round))
if len(blockID.Hash) != 0 { if len(blockID.Hash) != 0 {
cs.enterCommit(height, int(aggVote.Round)) cs.enterCommit(height, int(aggVote.Round))
if skipTimeoutCommit && precommits.HasAll() { if skipTimeoutCommit.Load().(bool) && precommits.HasAll() {
cs.enterNewRound(cs.Height, 0) cs.enterNewRound(cs.Height, 0)
} }
} else { } else {
...@@ -1601,7 +1601,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin ...@@ -1601,7 +1601,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin
cs.broadcastChannel <- MsgInfo{TypeID: ttypes.HasVoteID, Msg: hasVoteMsg, PeerID: "", PeerIP: ""} cs.broadcastChannel <- MsgInfo{TypeID: ttypes.HasVoteID, Msg: hasVoteMsg, PeerID: "", PeerIP: ""}
// if we can skip timeoutCommit and have all the votes now, // if we can skip timeoutCommit and have all the votes now,
if skipTimeoutCommit && cs.LastCommit.HasAll() { if skipTimeoutCommit.Load().(bool) && cs.LastCommit.HasAll() {
// go straight to new round (skip timeout commit) // go straight to new round (skip timeout commit)
// cs.scheduleTimeout(time.Duration(0), cs.Height, 0, cstypes.RoundStepNewHeight) // cs.scheduleTimeout(time.Duration(0), cs.Height, 0, cstypes.RoundStepNewHeight)
cs.enterNewRound(cs.Height, 0) cs.enterNewRound(cs.Height, 0)
...@@ -1696,7 +1696,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin ...@@ -1696,7 +1696,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin
blockID, ok := prevotes.TwoThirdsMajority() blockID, ok := prevotes.TwoThirdsMajority()
if ok { if ok {
if ttypes.RoundStepPrevote <= cs.Step && (cs.isProposalComplete() || len(blockID.Hash) == 0) { if ttypes.RoundStepPrevote <= cs.Step && (cs.isProposalComplete() || len(blockID.Hash) == 0) {
if useAggSig && cs.isProposer() && prevotes.GetAggVote() == nil { if UseAggSig() && cs.isProposer() && prevotes.GetAggVote() == nil {
err := prevotes.SetAggVote() err := prevotes.SetAggVote()
if err != nil { if err != nil {
qbftlog.Error("prevotes SetAggVote fail", "err", err) qbftlog.Error("prevotes SetAggVote fail", "err", err)
...@@ -1734,7 +1734,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin ...@@ -1734,7 +1734,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin
// Executed as TwoThirdsMajority could be from a higher round // Executed as TwoThirdsMajority could be from a higher round
cs.enterNewRound(height, int(vote.Round)) cs.enterNewRound(height, int(vote.Round))
cs.enterPrecommit(height, int(vote.Round)) cs.enterPrecommit(height, int(vote.Round))
if useAggSig && cs.isProposer() && precommits.GetAggVote() == nil { if UseAggSig() && cs.isProposer() && precommits.GetAggVote() == nil {
err := precommits.SetAggVote() err := precommits.SetAggVote()
if err != nil { if err != nil {
qbftlog.Error("precommits SetAggVote fail", "err", err) qbftlog.Error("precommits SetAggVote fail", "err", err)
...@@ -1746,7 +1746,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin ...@@ -1746,7 +1746,7 @@ func (cs *ConsensusState) addVote(vote *ttypes.Vote, peerID string, peerIP strin
} }
if len(blockID.Hash) != 0 { if len(blockID.Hash) != 0 {
cs.enterCommit(height, int(vote.Round)) cs.enterCommit(height, int(vote.Round))
if skipTimeoutCommit && precommits.HasAll() { if skipTimeoutCommit.Load().(bool) && precommits.HasAll() {
cs.enterNewRound(cs.Height, 0) cs.enterNewRound(cs.Height, 0)
} }
} else { } else {
...@@ -1779,7 +1779,7 @@ func (cs *ConsensusState) signVote(voteType byte, hash []byte) (*ttypes.Vote, er ...@@ -1779,7 +1779,7 @@ func (cs *ConsensusState) signVote(voteType byte, hash []byte) (*ttypes.Vote, er
Type: uint32(voteType), Type: uint32(voteType),
BlockID: &tmtypes.QbftBlockID{Hash: hash}, BlockID: &tmtypes.QbftBlockID{Hash: hash},
Signature: nil, Signature: nil,
UseAggSig: useAggSig, UseAggSig: UseAggSig(),
}, },
} }
err := cs.privValidator.SignVote(cs.state.ChainID, vote) err := cs.privValidator.SignVote(cs.state.ChainID, vote)
...@@ -1797,7 +1797,7 @@ func (cs *ConsensusState) signAddVote(voteType byte, hash []byte) *ttypes.Vote { ...@@ -1797,7 +1797,7 @@ func (cs *ConsensusState) signAddVote(voteType byte, hash []byte) *ttypes.Vote {
if err == nil { if err == nil {
// send to self // send to self
cs.sendInternalMessage(MsgInfo{TypeID: ttypes.VoteID, Msg: vote.QbftVote, PeerID: cs.ourID, PeerIP: ""}) cs.sendInternalMessage(MsgInfo{TypeID: ttypes.VoteID, Msg: vote.QbftVote, PeerID: cs.ourID, PeerIP: ""})
if useAggSig { if UseAggSig() {
// send to proposer // send to proposer
cs.unicastChannel <- MsgInfo{TypeID: ttypes.VoteID, Msg: vote.QbftVote, PeerID: cs.getProposerID(), PeerIP: ""} cs.unicastChannel <- MsgInfo{TypeID: ttypes.VoteID, Msg: vote.QbftVote, PeerID: cs.getProposerID(), PeerIP: ""}
// wait for aggregate vote // wait for aggregate vote
...@@ -1837,32 +1837,27 @@ func CompareHRS(h1 int64, r1 int, s1 ttypes.RoundStepType, h2 int64, r2 int, s2 ...@@ -1837,32 +1837,27 @@ func CompareHRS(h1 int64, r1 int, s1 ttypes.RoundStepType, h2 int64, r2 int, s2
// Commit returns the amount of time to wait for straggler votes after receiving +2/3 precommits for a single block (ie. a commit). // Commit returns the amount of time to wait for straggler votes after receiving +2/3 precommits for a single block (ie. a commit).
func (cs *ConsensusState) Commit(t time.Time) time.Time { func (cs *ConsensusState) Commit(t time.Time) time.Time {
return t.Add(time.Duration(timeoutCommit) * time.Millisecond) return t.Add(time.Duration(timeoutCommit.Load().(int32)) * time.Millisecond)
} }
// Propose returns the amount of time to wait for a proposal // Propose returns the amount of time to wait for a proposal
func (cs *ConsensusState) Propose(round int) time.Duration { func (cs *ConsensusState) Propose(round int) time.Duration {
return time.Duration(timeoutPropose+timeoutProposeDelta*int32(round)) * time.Millisecond return time.Duration(timeoutPropose.Load().(int32)+timeoutProposeDelta.Load().(int32)*int32(round)) * time.Millisecond
} }
// Prevote returns the amount of time to wait for straggler votes after receiving any +2/3 prevotes // Prevote returns the amount of time to wait for straggler votes after receiving any +2/3 prevotes
func (cs *ConsensusState) Prevote(round int) time.Duration { func (cs *ConsensusState) Prevote(round int) time.Duration {
return time.Duration(timeoutPrevote+timeoutPrevoteDelta*int32(round)) * time.Millisecond return time.Duration(timeoutPrevote.Load().(int32)+timeoutPrevoteDelta.Load().(int32)*int32(round)) * time.Millisecond
} }
// Precommit returns the amount of time to wait for straggler votes after receiving any +2/3 precommits // Precommit returns the amount of time to wait for straggler votes after receiving any +2/3 precommits
func (cs *ConsensusState) Precommit(round int) time.Duration { func (cs *ConsensusState) Precommit(round int) time.Duration {
return time.Duration(timeoutPrecommit+timeoutPrecommitDelta*int32(round)) * time.Millisecond return time.Duration(timeoutPrecommit.Load().(int32)+timeoutPrecommitDelta.Load().(int32)*int32(round)) * time.Millisecond
} }
// WaitForTxs returns true if the consensus should wait for transactions before entering the propose step // EmptyBlockInterval returns the amount of time to wait before proposing an empty block or starting the propose timer if there are no txs available
func (cs *ConsensusState) WaitForTxs() bool { func (cs *ConsensusState) EmptyBlockInterval() time.Duration {
return !createEmptyBlocks || createEmptyBlocksInterval > 0 return time.Duration(emptyBlockInterval.Load().(int32)) * time.Second
}
// EmptyBlocksInterval returns the amount of time to wait before proposing an empty block or starting the propose timer if there are no txs available
func (cs *ConsensusState) EmptyBlocksInterval() time.Duration {
return time.Duration(createEmptyBlocksInterval) * time.Second
} }
// PeerGossipSleep returns the amount of time to sleep if there is nothing to send from the ConsensusReactor // PeerGossipSleep returns the amount of time to sleep if there is nothing to send from the ConsensusReactor
......
...@@ -74,7 +74,7 @@ func updateState(s State, blockID ttypes.BlockID, block *ttypes.QbftBlock) (Stat ...@@ -74,7 +74,7 @@ func updateState(s State, blockID ttypes.BlockID, block *ttypes.QbftBlock) (Stat
seq := s.Sequence + 1 seq := s.Sequence + 1
// include situation multiBlock=1 // include situation multiBlock=1
if seq == multiBlocks { if seq == multiBlocks.Load().(int64) {
// Update validator accums and set state variables // Update validator accums and set state variables
nextValSet.IncrementAccum(1) nextValSet.IncrementAccum(1)
seq = 0 seq = 0
......
...@@ -398,26 +398,16 @@ func (pc *peerConn) TrySend(msg MsgInfo) bool { ...@@ -398,26 +398,16 @@ func (pc *peerConn) TrySend(msg MsgInfo) bool {
// PickSendVote picks a vote and sends it to the peer. // PickSendVote picks a vote and sends it to the peer.
// Returns true if vote was sent. // Returns true if vote was sent.
func (pc *peerConn) PickSendVote(votes ttypes.VoteSetReader) bool { func (pc *peerConn) PickSendVote(votes ttypes.VoteSetReader) bool {
if useAggSig { if UseAggSig() {
aggVote := votes.GetAggVote() if votes.GetAggVote() != nil {
if aggVote != nil { if aggVote, ok := pc.state.PickAggVoteToSend(votes); ok {
if votes.IsCommit() {
pc.state.ensureCatchupCommitRound(votes.Height(), votes.Round(), votes.Size())
}
if pc.state.Height != aggVote.Height ||
(pc.state.Round != int(aggVote.Round) && pc.state.CatchupCommitRound != int(aggVote.Round)) {
return false
}
if (aggVote.Type == uint32(ttypes.VoteTypePrevote) && pc.state.AggPrevote) ||
(aggVote.Type == uint32(ttypes.VoteTypePrecommit) && pc.state.AggPrecommit) {
return false
}
msg := MsgInfo{TypeID: ttypes.AggVoteID, Msg: aggVote.QbftAggVote, PeerID: pc.id, PeerIP: pc.ip.String()} msg := MsgInfo{TypeID: ttypes.AggVoteID, Msg: aggVote.QbftAggVote, PeerID: pc.id, PeerIP: pc.ip.String()}
qbftlog.Debug("Sending aggregate vote message", "msg", msg) qbftlog.Debug("Sending aggregate vote message", "msg", msg)
if pc.Send(msg) { if pc.Send(msg) {
pc.state.SetHasAggVote(aggVote) pc.state.SetHasAggVote(aggVote)
return true return true
} }
}
return false return false
} }
} }
...@@ -575,6 +565,7 @@ FOR_LOOP: ...@@ -575,6 +565,7 @@ FOR_LOOP:
} else if pkt.TypeID == ttypes.AggVoteID { } else if pkt.TypeID == ttypes.AggVoteID {
aggVote := &ttypes.AggVote{QbftAggVote: realMsg.(*tmtypes.QbftAggVote)} aggVote := &ttypes.AggVote{QbftAggVote: realMsg.(*tmtypes.QbftAggVote)}
qbftlog.Debug("Receiving aggregate vote", "aggVote-height", aggVote.Height, "peerip", pc.ip.String()) qbftlog.Debug("Receiving aggregate vote", "aggVote-height", aggVote.Height, "peerip", pc.ip.String())
pc.state.SetHasAggVote(aggVote)
} }
} else if pkt.TypeID == ttypes.ProposalHeartbeatID { } else if pkt.TypeID == ttypes.ProposalHeartbeatID {
pc.heartbeatQueue <- realMsg.(*tmtypes.QbftHeartbeat) pc.heartbeatQueue <- realMsg.(*tmtypes.QbftHeartbeat)
...@@ -689,7 +680,7 @@ OUTER_LOOP: ...@@ -689,7 +680,7 @@ OUTER_LOOP:
} }
rs := pc.myState.GetRoundState() rs := pc.myState.GetRoundState()
prs := pc.state prs := pc.state.GetRoundState()
// If the peer is on a previous height, help catch up. // If the peer is on a previous height, help catch up.
if (0 < prs.Height) && (prs.Height < rs.Height) { if (0 < prs.Height) && (prs.Height < rs.Height) {
...@@ -713,7 +704,7 @@ OUTER_LOOP: ...@@ -713,7 +704,7 @@ OUTER_LOOP:
"selfHeight", rs.Height, "peer(H/R/S)", fmt.Sprintf("%v/%v/%v", prs.Height, prs.Round, prs.Step), "selfHeight", rs.Height, "peer(H/R/S)", fmt.Sprintf("%v/%v/%v", prs.Height, prs.Round, prs.Step),
"block(H/R/hash)", fmt.Sprintf("%v/%v/%X", proposalBlock.Header.Height, proposalBlock.Header.Round, newBlock.Hash())) "block(H/R/hash)", fmt.Sprintf("%v/%v/%X", proposalBlock.Header.Height, proposalBlock.Header.Round, newBlock.Hash()))
if pc.Send(msg) { if pc.Send(msg) {
prs.SetHasProposalBlock(newBlock) pc.state.SetHasProposalBlock(newBlock)
} }
continue OUTER_LOOP continue OUTER_LOOP
} }
...@@ -738,7 +729,7 @@ OUTER_LOOP: ...@@ -738,7 +729,7 @@ OUTER_LOOP:
qbftlog.Debug(fmt.Sprintf("Sending proposal. Self state: %v/%v/%v", rs.Height, rs.Round, rs.Step), qbftlog.Debug(fmt.Sprintf("Sending proposal. Self state: %v/%v/%v", rs.Height, rs.Round, rs.Step),
"peerip", pc.ip.String(), "proposal-height", rs.Proposal.Height, "proposal-round", rs.Proposal.Round) "peerip", pc.ip.String(), "proposal-height", rs.Proposal.Height, "proposal-round", rs.Proposal.Round)
if pc.Send(msg) { if pc.Send(msg) {
prs.SetHasProposal(rs.Proposal) pc.state.SetHasProposal(rs.Proposal)
} }
} }
// ProposalPOL: lets peer know which POL votes we have so far. // ProposalPOL: lets peer know which POL votes we have so far.
...@@ -764,7 +755,7 @@ OUTER_LOOP: ...@@ -764,7 +755,7 @@ OUTER_LOOP:
qbftlog.Debug(fmt.Sprintf("Sending proposal block. Self state: %v/%v/%v", rs.Height, rs.Round, rs.Step), qbftlog.Debug(fmt.Sprintf("Sending proposal block. Self state: %v/%v/%v", rs.Height, rs.Round, rs.Step),
"peerip", pc.ip.String(), "block-height", rs.ProposalBlock.Header.Height, "block-round", rs.ProposalBlock.Header.Round) "peerip", pc.ip.String(), "block-height", rs.ProposalBlock.Header.Height, "block-round", rs.ProposalBlock.Header.Round)
if pc.Send(msg) { if pc.Send(msg) {
prs.SetHasProposalBlock(rs.ProposalBlock) pc.state.SetHasProposalBlock(rs.ProposalBlock)
} }
continue OUTER_LOOP continue OUTER_LOOP
} }
...@@ -789,7 +780,7 @@ OUTER_LOOP: ...@@ -789,7 +780,7 @@ OUTER_LOOP:
} }
rs := pc.myState.GetRoundState() rs := pc.myState.GetRoundState()
prs := pc.state prs := pc.state.GetRoundState()
switch sleeping { switch sleeping {
case 1: // First sleep case 1: // First sleep
...@@ -800,8 +791,8 @@ OUTER_LOOP: ...@@ -800,8 +791,8 @@ OUTER_LOOP:
// If height matches, then send LastCommit, Prevotes, Precommits. // If height matches, then send LastCommit, Prevotes, Precommits.
if rs.Height == prs.Height { if rs.Height == prs.Height {
if !useAggSig || gossipVotes.Load().(bool) { if !UseAggSig() || gossipVotes.Load().(bool) {
if pc.gossipVotesForHeight(rs, prs.GetRoundState()) { if pc.gossipVotesForHeight(rs, prs) {
continue OUTER_LOOP continue OUTER_LOOP
} }
} }
...@@ -917,7 +908,7 @@ OUTER_LOOP: ...@@ -917,7 +908,7 @@ OUTER_LOOP:
// Maybe send Height/Round/Prevotes // Maybe send Height/Round/Prevotes
{ {
rs := pc.myState.GetRoundState() rs := pc.myState.GetRoundState()
prs := pc.state prs := pc.state.GetRoundState()
if rs.Height == prs.Height { if rs.Height == prs.Height {
if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok { if maj23, ok := rs.Votes.Prevotes(prs.Round).TwoThirdsMajority(); ok {
msg := MsgInfo{TypeID: ttypes.VoteSetMaj23ID, Msg: &tmtypes.QbftVoteSetMaj23Msg{ msg := MsgInfo{TypeID: ttypes.VoteSetMaj23ID, Msg: &tmtypes.QbftVoteSetMaj23Msg{
...@@ -1029,7 +1020,7 @@ func (ps *PeerConnState) SetHasProposal(proposal *tmtypes.QbftProposal) { ...@@ -1029,7 +1020,7 @@ func (ps *PeerConnState) SetHasProposal(proposal *tmtypes.QbftProposal) {
if ps.Proposal { if ps.Proposal {
return return
} }
qbftlog.Debug("Peer set proposal", "peerip", ps.ip.String(), qbftlog.Debug("Peer set proposal", "peerIP", ps.ip.String(),
"peer-state", fmt.Sprintf("%v/%v/%v", ps.Height, ps.Round, ps.Step), "peer-state", fmt.Sprintf("%v/%v/%v", ps.Height, ps.Round, ps.Step),
"proposal(H/R/Hash)", fmt.Sprintf("%v/%v/%X", proposal.Height, proposal.Round, proposal.Blockhash)) "proposal(H/R/Hash)", fmt.Sprintf("%v/%v/%X", proposal.Height, proposal.Round, proposal.Blockhash))
ps.Proposal = true ps.Proposal = true
...@@ -1080,6 +1071,30 @@ func (ps *PeerConnState) SetHasAggVote(aggVote *ttypes.AggVote) { ...@@ -1080,6 +1071,30 @@ func (ps *PeerConnState) SetHasAggVote(aggVote *ttypes.AggVote) {
} }
} }
// PickAggVoteToSend picks aggregate vote to send to the peer.
// Returns true if a vote was picked.
func (ps *PeerConnState) PickAggVoteToSend(votes ttypes.VoteSetReader) (vote *ttypes.AggVote, ok bool) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
aggVote := votes.GetAggVote()
if aggVote == nil {
return nil, false
}
if votes.IsCommit() {
ps.ensureCatchupCommitRound(votes.Height(), votes.Round(), votes.Size())
}
if ps.Height != aggVote.Height ||
(ps.Round != int(aggVote.Round) && ps.CatchupCommitRound != int(aggVote.Round)) {
return nil, false
}
if (aggVote.Type == uint32(ttypes.VoteTypePrevote) && ps.AggPrevote) ||
(aggVote.Type == uint32(ttypes.VoteTypePrecommit) && ps.AggPrecommit) {
return nil, false
}
return aggVote, true
}
// PickVoteToSend picks a vote to send to the peer. // PickVoteToSend picks a vote to send to the peer.
// Returns true if a vote was picked. // Returns true if a vote was picked.
// NOTE: `votes` must be the correct Size() for the Height(). // NOTE: `votes` must be the correct Size() for the Height().
......
...@@ -38,31 +38,30 @@ var ( ...@@ -38,31 +38,30 @@ var (
genesis string genesis string
genesisAmount int64 = 1e8 genesisAmount int64 = 1e8
genesisBlockTime int64 genesisBlockTime int64
timeoutTxAvail int32 = 1000 timeoutTxAvail atomic.Value // 1000 millisecond
timeoutPropose int32 = 3000 // millisecond timeoutPropose atomic.Value // 3000
timeoutProposeDelta int32 = 500 timeoutProposeDelta atomic.Value // 500
timeoutPrevote int32 = 1000 timeoutPrevote atomic.Value // 1000
timeoutPrevoteDelta int32 = 500 timeoutPrevoteDelta atomic.Value // 500
timeoutPrecommit int32 = 1000 timeoutPrecommit atomic.Value // 1000
timeoutPrecommitDelta int32 = 500 timeoutPrecommitDelta atomic.Value // 500
timeoutCommit int32 = 1000 timeoutCommit atomic.Value // 1000
skipTimeoutCommit = false skipTimeoutCommit atomic.Value // false
createEmptyBlocks = false emptyBlockInterval atomic.Value // 0 second
createEmptyBlocksInterval int32 // second
genesisFile = "genesis.json" genesisFile = "genesis.json"
privFile = "priv_validator.json" privFile = "priv_validator.json"
dbPath = fmt.Sprintf("datadir%sqbft", string(os.PathSeparator)) dbPath = fmt.Sprintf("datadir%sqbft", string(os.PathSeparator))
port int32 = DefaultQbftPort port int32 = DefaultQbftPort
validatorNodes = []string{"127.0.0.1:33001"} validatorNodes = []string{"127.0.0.1:33001"}
fastSync = false fastSync = false
preExec = false preExec atomic.Value // false
signName = "ed25519" signName atomic.Value // "ed25519"
useAggSig = false useAggSig atomic.Value // false
multiBlocks int64 = 1 multiBlocks atomic.Value // 1
gossipVotes atomic.Value
zeroHash [32]byte zeroHash [32]byte
random *rand.Rand random *rand.Rand
gossipVotes atomic.Value
peerGossipSleepDuration int32 = 100 peerGossipSleepDuration int32 = 100
peerQueryMaj23SleepDuration int32 = 2000 peerQueryMaj23SleepDuration int32 = 2000
) )
...@@ -101,8 +100,7 @@ type subConfig struct { ...@@ -101,8 +100,7 @@ type subConfig struct {
TimeoutPrecommitDelta int32 `json:"timeoutPrecommitDelta"` TimeoutPrecommitDelta int32 `json:"timeoutPrecommitDelta"`
TimeoutCommit int32 `json:"timeoutCommit"` TimeoutCommit int32 `json:"timeoutCommit"`
SkipTimeoutCommit bool `json:"skipTimeoutCommit"` SkipTimeoutCommit bool `json:"skipTimeoutCommit"`
CreateEmptyBlocks bool `json:"createEmptyBlocks"` EmptyBlockInterval int32 `json:"emptyBlockInterval"`
CreateEmptyBlocksInterval int32 `json:"createEmptyBlocksInterval"`
GenesisFile string `json:"genesisFile"` GenesisFile string `json:"genesisFile"`
PrivFile string `json:"privFile"` PrivFile string `json:"privFile"`
DbPath string `json:"dbPath"` DbPath string `json:"dbPath"`
...@@ -120,44 +118,20 @@ func applyConfig(sub []byte) { ...@@ -120,44 +118,20 @@ func applyConfig(sub []byte) {
if sub != nil { if sub != nil {
types.MustDecode(sub, &subcfg) types.MustDecode(sub, &subcfg)
} }
if subcfg.Genesis != "" {
genesis = subcfg.Genesis genesis = subcfg.Genesis
}
if subcfg.GenesisAmount > 0 {
genesisAmount = subcfg.GenesisAmount genesisAmount = subcfg.GenesisAmount
}
if subcfg.GenesisBlockTime > 0 {
genesisBlockTime = subcfg.GenesisBlockTime genesisBlockTime = subcfg.GenesisBlockTime
}
if subcfg.TimeoutTxAvail > 0 { timeoutTxAvail.Store(subcfg.TimeoutTxAvail)
timeoutTxAvail = subcfg.TimeoutTxAvail timeoutPropose.Store(subcfg.TimeoutPropose)
} timeoutProposeDelta.Store(subcfg.TimeoutProposeDelta)
if subcfg.TimeoutPropose > 0 { timeoutPrevote.Store(subcfg.TimeoutPrevote)
timeoutPropose = subcfg.TimeoutPropose timeoutPrevoteDelta.Store(subcfg.TimeoutPrevoteDelta)
} timeoutPrecommit.Store(subcfg.TimeoutPrecommit)
if subcfg.TimeoutProposeDelta > 0 { timeoutPrecommitDelta.Store(subcfg.TimeoutPrecommitDelta)
timeoutProposeDelta = subcfg.TimeoutProposeDelta timeoutCommit.Store(subcfg.TimeoutCommit)
} skipTimeoutCommit.Store(subcfg.SkipTimeoutCommit)
if subcfg.TimeoutPrevote > 0 { emptyBlockInterval.Store(subcfg.EmptyBlockInterval)
timeoutPrevote = subcfg.TimeoutPrevote
}
if subcfg.TimeoutPrevoteDelta > 0 {
timeoutPrevoteDelta = subcfg.TimeoutPrevoteDelta
}
if subcfg.TimeoutPrecommit > 0 {
timeoutPrecommit = subcfg.TimeoutPrecommit
}
if subcfg.TimeoutPrecommitDelta > 0 {
timeoutPrecommitDelta = subcfg.TimeoutPrecommitDelta
}
if subcfg.TimeoutCommit > 0 {
timeoutCommit = subcfg.TimeoutCommit
}
skipTimeoutCommit = subcfg.SkipTimeoutCommit
createEmptyBlocks = subcfg.CreateEmptyBlocks
if subcfg.CreateEmptyBlocksInterval > 0 {
createEmptyBlocksInterval = subcfg.CreateEmptyBlocksInterval
}
if subcfg.GenesisFile != "" { if subcfg.GenesisFile != "" {
genesisFile = subcfg.GenesisFile genesisFile = subcfg.GenesisFile
} }
...@@ -174,19 +148,26 @@ func applyConfig(sub []byte) { ...@@ -174,19 +148,26 @@ func applyConfig(sub []byte) {
validatorNodes = subcfg.ValidatorNodes validatorNodes = subcfg.ValidatorNodes
} }
fastSync = subcfg.FastSync fastSync = subcfg.FastSync
preExec = subcfg.PreExec preExec.Store(subcfg.PreExec)
signName.Store("ed25519")
if subcfg.SignName != "" { if subcfg.SignName != "" {
signName = subcfg.SignName signName.Store(subcfg.SignName)
} }
useAggSig = subcfg.UseAggregateSignature useAggSig.Store(subcfg.UseAggregateSignature)
gossipVotes.Store(true) multiBlocks.Store(int64(1))
if subcfg.MultiBlocks > 0 { if subcfg.MultiBlocks > 0 {
multiBlocks = subcfg.MultiBlocks multiBlocks.Store(subcfg.MultiBlocks)
} }
gossipVotes.Store(true)
}
// UseAggSig returns whether use aggregate signature
func UseAggSig() bool {
return useAggSig.Load().(bool)
} }
// DefaultDBProvider returns a database using the DBBackend and DBDir // DefaultDBProvider returns a database
// specified in the ctx.Config.
func DefaultDBProvider(name string) dbm.DB { func DefaultDBProvider(name string) dbm.DB {
return dbm.NewDB(name, "leveldb", dbPath, 0) return dbm.NewDB(name, "leveldb", dbPath, 0)
} }
...@@ -207,7 +188,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -207,7 +188,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
//init rand //init rand
ttypes.Init() ttypes.Init()
signType, ok := ttypes.SignMap[signName] signType, ok := ttypes.SignMap[signName.Load().(string)]
if !ok { if !ok {
qbftlog.Error("invalid sign name") qbftlog.Error("invalid sign name")
return nil return nil
...@@ -221,7 +202,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -221,7 +202,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
} }
ttypes.ConsensusCrypto = cr ttypes.ConsensusCrypto = cr
if useAggSig { if UseAggSig() {
_, err = crypto.ToAggregate(ttypes.ConsensusCrypto) _, err = crypto.ToAggregate(ttypes.ConsensusCrypto)
if err != nil { if err != nil {
qbftlog.Error("qbft crypto not support aggregate signature", "name", ttypes.CryptoName) qbftlog.Error("qbft crypto not support aggregate signature", "name", ttypes.CryptoName)
...@@ -475,7 +456,7 @@ func (client *Client) ProcEvent(msg *queue.Message) bool { ...@@ -475,7 +456,7 @@ func (client *Client) ProcEvent(msg *queue.Message) bool {
// CreateBlock trigger consensus forward when tx available // CreateBlock trigger consensus forward when tx available
func (client *Client) CreateBlock() { func (client *Client) CreateBlock() {
ticker := time.NewTicker(time.Duration(timeoutTxAvail) * time.Millisecond) ticker := time.NewTicker(time.Duration(timeoutTxAvail.Load().(int32)) * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for { for {
......
...@@ -464,7 +464,7 @@ func CreateBlockInfoTx(priv crypto.PrivKey, state *tmtypes.QbftState, block *tmt ...@@ -464,7 +464,7 @@ func CreateBlockInfoTx(priv crypto.PrivKey, state *tmtypes.QbftState, block *tmt
tx := &types.Transaction{Execer: []byte("qbftNode"), Payload: types.Encode(action), Fee: fee} tx := &types.Transaction{Execer: []byte("qbftNode"), Payload: types.Encode(action), Fee: fee}
tx.To = address.ExecAddress("qbftNode") tx.To = address.ExecAddress("qbftNode")
tx.Nonce = random.Int63() tx.Nonce = random.Int63()
tx.Sign(int32(ttypes.SignMap[signName]), priv) tx.Sign(int32(ttypes.SignMap[signName.Load().(string)]), priv)
return tx return tx
} }
...@@ -281,8 +281,8 @@ func (voteSet *VoteSet) addVerifiedVote(vote *Vote, blockKey string, votingPower ...@@ -281,8 +281,8 @@ func (voteSet *VoteSet) addVerifiedVote(vote *Vote, blockKey string, votingPower
if origSum < quorum && quorum <= votesByBlock.sum { if origSum < quorum && quorum <= votesByBlock.sum {
// Only consider the first quorum reached // Only consider the first quorum reached
if voteSet.maj23 == nil { if voteSet.maj23 == nil {
maj23BlockID := *vote.BlockID voteSet.maj23 = &tmtypes.QbftBlockID{Hash: make([]byte, len(vote.BlockID.Hash))}
voteSet.maj23 = &maj23BlockID copy(voteSet.maj23.Hash, vote.BlockID.Hash)
// And also copy votes over to voteSet.votes // And also copy votes over to voteSet.votes
for i, vote := range votesByBlock.votes { for i, vote := range votesByBlock.votes {
if vote != nil { if vote != nil {
...@@ -356,7 +356,8 @@ func (voteSet *VoteSet) AddAggVote(vote *AggVote) (bool, error) { ...@@ -356,7 +356,8 @@ func (voteSet *VoteSet) AddAggVote(vote *AggVote) (bool, error) {
voteSet.votesBitArray = arr.copy() voteSet.votesBitArray = arr.copy()
voteSet.aggVote = vote voteSet.aggVote = vote
voteSet.maj23 = vote.BlockID voteSet.maj23 = &tmtypes.QbftBlockID{Hash: make([]byte, len(vote.BlockID.Hash))}
copy(voteSet.maj23.Hash, vote.BlockID.Hash)
voteSet.sum = sum voteSet.sum = sum
votesByBlock := newBlockVotes(false, voteSet.valSet.Size()) votesByBlock := newBlockVotes(false, voteSet.valSet.Size())
votesByBlock.bitArray = arr.copy() votesByBlock.bitArray = arr.copy()
...@@ -560,14 +561,16 @@ func (voteSet *VoteSet) HasAll() bool { ...@@ -560,14 +561,16 @@ func (voteSet *VoteSet) HasAll() bool {
// TwoThirdsMajority Returns either a blockhash (or nil) that received +2/3 majority. // TwoThirdsMajority Returns either a blockhash (or nil) that received +2/3 majority.
// If there exists no such majority, returns (nil, PartSetHeader{}, false). // If there exists no such majority, returns (nil, PartSetHeader{}, false).
func (voteSet *VoteSet) TwoThirdsMajority() (blockID tmtypes.QbftBlockID, ok bool) { func (voteSet *VoteSet) TwoThirdsMajority() (tmtypes.QbftBlockID, bool) {
if voteSet == nil { if voteSet == nil {
return tmtypes.QbftBlockID{}, false return tmtypes.QbftBlockID{}, false
} }
voteSet.mtx.Lock() voteSet.mtx.Lock()
defer voteSet.mtx.Unlock() defer voteSet.mtx.Unlock()
if voteSet.maj23 != nil { if voteSet.maj23 != nil {
return *voteSet.maj23, true blockID := tmtypes.QbftBlockID{Hash: make([]byte, len(voteSet.maj23.Hash))}
copy(blockID.Hash, voteSet.maj23.Hash)
return blockID, true
} }
return tmtypes.QbftBlockID{}, false return tmtypes.QbftBlockID{}, false
} }
......
...@@ -316,7 +316,7 @@ func createFiles(cmd *cobra.Command, args []string) { ...@@ -316,7 +316,7 @@ func createFiles(cmd *cobra.Command, args []string) {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
// create private validator file // create private validator file
pvFileName := pvFile + strconv.Itoa(i) + ".json" pvFileName := pvFile + strconv.Itoa(i) + ".json"
privValidator := ttypes.GenPrivValidatorImp(pvFileName) privValidator := ttypes.LoadOrGenPrivValidatorFS(pvFileName)
if privValidator == nil { if privValidator == nil {
fmt.Println("create priv_validator file fail") fmt.Println("create priv_validator file fail")
break break
......
...@@ -253,3 +253,5 @@ ForkParaAssetTransferRbk=0 ...@@ -253,3 +253,5 @@ ForkParaAssetTransferRbk=0
#仅平行链适用,开启挖矿交易的高度,已有代码版本可能未在0高度开启挖矿,需要设置这个高度,新版本默认从0开启挖矿,通过交易配置分阶段奖励 #仅平行链适用,开启挖矿交易的高度,已有代码版本可能未在0高度开启挖矿,需要设置这个高度,新版本默认从0开启挖矿,通过交易配置分阶段奖励
ForkParaFullMinerHeight=0 ForkParaFullMinerHeight=0
[fork.sub.qbftNode]
Enable=0
...@@ -410,6 +410,9 @@ ForkIssuanceTableUpdate=0 ...@@ -410,6 +410,9 @@ ForkIssuanceTableUpdate=0
Enable=0 Enable=0
ForkCollateralizeTableUpdate=0 ForkCollateralizeTableUpdate=0
[fork.sub.qbftNode]
Enable=0
#对已有的平行链如果不是从0开始同步数据,需要设置这个kvmvccmavl的对应平行链高度的fork,如果从0开始同步,statehash会跟以前mavl的不同 #对已有的平行链如果不是从0开始同步数据,需要设置这个kvmvccmavl的对应平行链高度的fork,如果从0开始同步,statehash会跟以前mavl的不同
[fork.sub.store-kvmvccmavl] [fork.sub.store-kvmvccmavl]
ForkKvmvccmavl=1 ForkKvmvccmavl=1
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment