Commit 29677ba5 authored by 张振华's avatar 张振华

Merge branch 'dpos-v2' of https://github.com/zzh33cn/plugin into dpos-v2

parents 4b05a8c7 996fc131
...@@ -68,7 +68,7 @@ type ConsensusState struct { ...@@ -68,7 +68,7 @@ type ConsensusState struct {
// msgs from ourself, or by timeouts // msgs from ourself, or by timeouts
peerMsgQueue chan MsgInfo peerMsgQueue chan MsgInfo
internalMsgQueue chan MsgInfo internalMsgQueue chan MsgInfo
timer *time.Timer timer *time.Timer
broadcastChannel chan<- MsgInfo broadcastChannel chan<- MsgInfo
ourID ID ourID ID
...@@ -192,13 +192,13 @@ func (cs *ConsensusState) Stop() { ...@@ -192,13 +192,13 @@ func (cs *ConsensusState) Stop() {
// Attempt to reset the timer // Attempt to reset the timer
func (cs *ConsensusState) resetTimer(duration time.Duration, stateType int) { func (cs *ConsensusState) resetTimer(duration time.Duration, stateType int) {
dposlog.Info("set timer","duration", duration, "state", StateTypeMapping[stateType]) dposlog.Info("set timer", "duration", duration, "state", StateTypeMapping[stateType])
cs.timer.Reset(duration) cs.timer.Reset(duration)
} }
// Attempt to reset the timer // Attempt to reset the timer
func (cs *ConsensusState) stopAndResetTimer(duration time.Duration, stateType int) { func (cs *ConsensusState) stopAndResetTimer(duration time.Duration, stateType int) {
dposlog.Info("set timer","duration", duration, "state", StateTypeMapping[stateType]) dposlog.Info("set timer", "duration", duration, "state", StateTypeMapping[stateType])
if !cs.timer.Stop() { if !cs.timer.Stop() {
<-cs.timer.C <-cs.timer.C
} }
...@@ -239,7 +239,7 @@ func (cs *ConsensusState) receiveRoutine() { ...@@ -239,7 +239,7 @@ func (cs *ConsensusState) receiveRoutine() {
case mi = <-cs.internalMsgQueue: case mi = <-cs.internalMsgQueue:
// handles proposals, block parts, votes // handles proposals, block parts, votes
cs.handleMsg(mi) cs.handleMsg(mi)
case <- cs.timer.C: case <-cs.timer.C:
cs.handleTimeout() cs.handleTimeout()
case <-cs.Quit: case <-cs.Quit:
dposlog.Info("ConsensusState recv quit signal.") dposlog.Info("ConsensusState recv quit signal.")
......
...@@ -388,17 +388,17 @@ func (init *InitState) timeOut(cs *ConsensusState) { ...@@ -388,17 +388,17 @@ func (init *InitState) timeOut(cs *ConsensusState) {
cs.ClearVotes() cs.ClearVotes()
//设定超时时间,超时后再检查链接数量 //设定超时时间,超时后再检查链接数量
cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} else { } else {
vote := generateVote(cs) vote := generateVote(cs)
if nil == vote { if nil == vote {
cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil { if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil {
dposlog.Error("SignVote failed", "vote", vote.String()) dposlog.Error("SignVote failed", "vote", vote.String())
cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
...@@ -413,7 +413,7 @@ func (init *InitState) timeOut(cs *ConsensusState) { ...@@ -413,7 +413,7 @@ func (init *InitState) timeOut(cs *ConsensusState) {
dposlog.Info("VotingState send a vote", "vote info", printVote(vote.DPosVote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix()) dposlog.Info("VotingState send a vote", "vote info", printVote(vote.DPosVote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix())
cs.dposState.sendVote(cs, vote.DPosVote) cs.dposState.sendVote(cs, vote.DPosVote)
cs.resetTimer(time.Duration(timeoutVoting) * time.Millisecond, VotingStateType) cs.resetTimer(time.Duration(timeoutVoting)*time.Millisecond, VotingStateType)
//处理之前缓存的投票信息 //处理之前缓存的投票信息
for i := 0; i < len(cs.cachedVotes); i++ { for i := 0; i < len(cs.cachedVotes); i++ {
cs.dposState.recvVote(cs, cs.cachedVotes[i]) cs.dposState.recvVote(cs, cs.cachedVotes[i])
...@@ -480,7 +480,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) { ...@@ -480,7 +480,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) {
} }
} }
//1s后检查是否出块,是否需要重新投票 //1s后检查是否出块,是否需要重新投票
cs.resetTimer(time.Millisecond * 500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
} }
return return
} }
...@@ -494,7 +494,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) { ...@@ -494,7 +494,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) {
dposlog.Info("Change state because of timeOut.", "from", "VotingState", "to", "InitState") dposlog.Info("Change state because of timeOut.", "from", "VotingState", "to", "InitState")
//由于连接多数情况下正常,快速触发InitState的超时处理 //由于连接多数情况下正常,快速触发InitState的超时处理
cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} }
func (voting *VotingState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) { func (voting *VotingState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) {
...@@ -529,7 +529,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote) ...@@ -529,7 +529,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
} }
} }
//1s后检查是否出块,是否需要重新投票 //1s后检查是否出块,是否需要重新投票
cs.stopAndResetTimer(time.Millisecond * 500, VotedStateType) cs.stopAndResetTimer(time.Millisecond*500, VotedStateType)
} else if result == continueToVote { } else if result == continueToVote {
dposlog.Info("VotingState get a vote, but don't get an agreement,waiting for new votes...") dposlog.Info("VotingState get a vote, but don't get an agreement,waiting for new votes...")
} else { } else {
...@@ -538,7 +538,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote) ...@@ -538,7 +538,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
cs.ClearVotes() cs.ClearVotes()
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of vote failed.", "from", "VotingState", "to", "InitState") dposlog.Info("Change state because of vote failed.", "from", "VotingState", "to", "InitState")
cs.stopAndResetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.stopAndResetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} }
} }
...@@ -578,9 +578,9 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -578,9 +578,9 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//当前节点为出块节点 //当前节点为出块节点
//如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。 //如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。
if block.Height + 1 < cs.currentVote.Height { if block.Height+1 < cs.currentVote.Height {
dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height) dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height)
cs.resetTimer(time.Second * 1, VotedStateType) cs.resetTimer(time.Second*1, VotedStateType)
return return
} }
...@@ -639,7 +639,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -639,7 +639,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
...@@ -659,7 +659,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -659,7 +659,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
...@@ -674,7 +674,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -674,7 +674,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
if block.BlockTime >= task.BlockStop { if block.BlockTime >= task.BlockStop {
//已出块,或者时间落后了。 //已出块,或者时间落后了。
dposlog.Info("VotedState timeOut but block already is generated.", "blocktime", block.BlockTime, "blockStop", task.BlockStop, "now", now) dposlog.Info("VotedState timeOut but block already is generated.", "blocktime", block.BlockTime, "blockStop", task.BlockStop, "now", now)
cs.resetTimer(time.Second * 1, VotedStateType) cs.resetTimer(time.Second*1, VotedStateType)
return return
} else if block.BlockTime < task.BlockStart { } else if block.BlockTime < task.BlockStart {
...@@ -684,12 +684,12 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -684,12 +684,12 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.client.SetBlockTime(task.BlockStop) cs.client.SetBlockTime(task.BlockStop)
cs.client.CreateBlock() cs.client.CreateBlock()
cs.resetTimer(time.Millisecond * 500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
return return
} }
dposlog.Info("Wait time to create block near blockStop.") dposlog.Info("Wait time to create block near blockStop.")
cs.resetTimer(time.Millisecond * 500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
return return
} else { } else {
...@@ -697,7 +697,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -697,7 +697,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
dposlog.Info("Wait to next block cycle.", "waittime", task.BlockStop-now+1) dposlog.Info("Wait to next block cycle.", "waittime", task.BlockStop-now+1)
//cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop-now+1), VotedStateType) //cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop-now+1), VotedStateType)
cs.resetTimer(time.Millisecond * 500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
return return
} }
} else { } else {
...@@ -717,7 +717,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -717,7 +717,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.ClearVotes() cs.ClearVotes()
cs.SetState(WaitNotifyStateObj) cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "WaitNotifyState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "WaitNotifyState")
cs.resetTimer(time.Duration(timeoutWaitNotify) * time.Millisecond, WaitNotifyStateType) cs.resetTimer(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil { if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify) cs.dposState.recvNotify(cs, cs.cachedNotify)
} }
...@@ -726,7 +726,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -726,7 +726,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//设置超时时间 //设置超时时间
dposlog.Info("wait until change state.", "waittime", cs.currentVote.PeriodStop-now+1) dposlog.Info("wait until change state.", "waittime", cs.currentVote.PeriodStop-now+1)
cs.resetTimer(time.Second * time.Duration(cs.currentVote.PeriodStop - now + 1), VotedStateType) cs.resetTimer(time.Second*time.Duration(cs.currentVote.PeriodStop-now+1), VotedStateType)
return return
} }
} }
...@@ -776,7 +776,7 @@ func (voted *VotedState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNot ...@@ -776,7 +776,7 @@ func (voted *VotedState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNot
cs.ClearVotes() cs.ClearVotes()
cs.SetState(WaitNotifyStateObj) cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of recv notify.", "from", "VotedState", "to", "WaitNotifyState") dposlog.Info("Change state because of recv notify.", "from", "VotedState", "to", "WaitNotifyState")
cs.stopAndResetTimer(time.Duration(timeoutWaitNotify) * time.Millisecond, WaitNotifyStateType) cs.stopAndResetTimer(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil { if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify) cs.dposState.recvNotify(cs, cs.cachedNotify)
} }
...@@ -806,7 +806,7 @@ func (wait *WaitNofifyState) timeOut(cs *ConsensusState) { ...@@ -806,7 +806,7 @@ func (wait *WaitNofifyState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState")
cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} }
func (wait *WaitNofifyState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) { func (wait *WaitNofifyState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment