Commit 5b17e84a authored by 张振华's avatar 张振华

update code

parent 717d1fb3
...@@ -53,16 +53,6 @@ var ( ...@@ -53,16 +53,6 @@ var (
msgQueueSize = 1000 msgQueueSize = 1000
) )
// internally generated messages which may update the state
type timeoutInfo struct {
Duration time.Duration `json:"duration"`
State int `json:"state"`
}
func (ti *timeoutInfo) String() string {
return fmt.Sprintf("%v", ti.Duration)
}
// ConsensusState handles execution of the consensus algorithm. // ConsensusState handles execution of the consensus algorithm.
type ConsensusState struct { type ConsensusState struct {
// config details // config details
...@@ -78,7 +68,7 @@ type ConsensusState struct { ...@@ -78,7 +68,7 @@ type ConsensusState struct {
// msgs from ourself, or by timeouts // msgs from ourself, or by timeouts
peerMsgQueue chan MsgInfo peerMsgQueue chan MsgInfo
internalMsgQueue chan MsgInfo internalMsgQueue chan MsgInfo
timeoutTicker TimeoutTicker timer *time.Timer
broadcastChannel chan<- MsgInfo broadcastChannel chan<- MsgInfo
ourID ID ourID ID
...@@ -120,7 +110,6 @@ func NewConsensusState(client *Client, valMgr ValidatorMgr) *ConsensusState { ...@@ -120,7 +110,6 @@ func NewConsensusState(client *Client, valMgr ValidatorMgr) *ConsensusState {
client: client, client: client,
peerMsgQueue: make(chan MsgInfo, msgQueueSize), peerMsgQueue: make(chan MsgInfo, msgQueueSize),
internalMsgQueue: make(chan MsgInfo, msgQueueSize), internalMsgQueue: make(chan MsgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
Quit: make(chan struct{}), Quit: make(chan struct{}),
dposState: InitStateObj, dposState: InitStateObj,
...@@ -184,55 +173,38 @@ func (cs *ConsensusState) SetPrivValidator(priv ttypes.PrivValidator, index int) ...@@ -184,55 +173,38 @@ func (cs *ConsensusState) SetPrivValidator(priv ttypes.PrivValidator, index int)
cs.privValidatorIndex = index cs.privValidatorIndex = index
} }
// SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing.
//func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
// cs.mtx.Lock()
// defer cs.mtx.Unlock()
// cs.timeoutTicker = timeoutTicker
//}
// Start It start first time starts the timeout receive routines. // Start It start first time starts the timeout receive routines.
func (cs *ConsensusState) Start() { func (cs *ConsensusState) Start() {
if atomic.CompareAndSwapUint32(&cs.started, 0, 1) { if atomic.CompareAndSwapUint32(&cs.started, 0, 1) {
if atomic.LoadUint32(&cs.stopped) == 1 { if atomic.LoadUint32(&cs.stopped) == 1 {
dposlog.Error("ConsensusState already stoped") dposlog.Error("ConsensusState already stoped")
} }
cs.timeoutTicker.Start()
// now start the receiveRoutine // now start the receiveRoutine
go cs.receiveRoutine() go cs.receiveRoutine()
// schedule the first round!
cs.scheduleDPosTimeout(time.Second*3, InitStateType)
} }
} }
// Stop timer and receive routine // Stop timer and receive routine
func (cs *ConsensusState) Stop() { func (cs *ConsensusState) Stop() {
cs.timeoutTicker.Stop()
cs.Quit <- struct{}{} cs.Quit <- struct{}{}
} }
// Attempt to schedule a timeout (by sending timeoutInfo on the tickChan) // Attempt to reset the timer
func (cs *ConsensusState) scheduleDPosTimeout(duration time.Duration, stateType int) { func (cs *ConsensusState) resetTimer(duration time.Duration, stateType int) {
cs.timeoutTicker.ScheduleTimeout(timeoutInfo{Duration: duration, State: stateType}) dposlog.Info("set timer","duration", duration, "state", StateTypeMapping[stateType])
cs.timer.Reset(duration)
} }
// send a msg into the receiveRoutine regarding our own proposal, block part, or vote // Attempt to reset the timer
/* func (cs *ConsensusState) stopAndResetTimer(duration time.Duration, stateType int) {
func (cs *ConsensusState) sendInternalMessage(mi MsgInfo) { dposlog.Info("set timer","duration", duration, "state", StateTypeMapping[stateType])
select { if !cs.timer.Stop() {
case cs.internalMsgQueue <- mi: <-cs.timer.C
default:
// NOTE: using the go-routine means our votes can
// be processed out of order.
// TODO: use CList here for strict determinism and
// attempt push to internalMsgQueue in receiveRoutine
dposlog.Info("Internal msg queue is full. Using a go-routine")
go func() { cs.internalMsgQueue <- mi }()
} }
cs.timer.Reset(duration)
} }
*/
// Updates ConsensusState and increments height to match that of state. // Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes ttypes.RoundStepNewHeight. // The round becomes 0 and cs.Step becomes ttypes.RoundStepNewHeight.
func (cs *ConsensusState) updateToValMgr(valMgr ValidatorMgr) { func (cs *ConsensusState) updateToValMgr(valMgr ValidatorMgr) {
...@@ -254,6 +226,8 @@ func (cs *ConsensusState) receiveRoutine() { ...@@ -254,6 +226,8 @@ func (cs *ConsensusState) receiveRoutine() {
} }
}() }()
cs.timer = time.NewTimer(time.Second * 3)
for { for {
var mi MsgInfo var mi MsgInfo
...@@ -265,12 +239,11 @@ func (cs *ConsensusState) receiveRoutine() { ...@@ -265,12 +239,11 @@ func (cs *ConsensusState) receiveRoutine() {
case mi = <-cs.internalMsgQueue: case mi = <-cs.internalMsgQueue:
// handles proposals, block parts, votes // handles proposals, block parts, votes
cs.handleMsg(mi) cs.handleMsg(mi)
case ti := <-cs.timeoutTicker.Chan(): // tockChan: case <- cs.timer.C:
// if the timeout is relevant to the rs cs.handleTimeout()
// go to the next step
cs.handleTimeout(ti)
case <-cs.Quit: case <-cs.Quit:
dposlog.Info("ConsensusState recv quit signal.") dposlog.Info("ConsensusState recv quit signal.")
cs.timer.Stop()
return return
} }
} }
...@@ -302,9 +275,7 @@ func (cs *ConsensusState) handleMsg(mi MsgInfo) { ...@@ -302,9 +275,7 @@ func (cs *ConsensusState) handleMsg(mi MsgInfo) {
} }
} }
func (cs *ConsensusState) handleTimeout(ti timeoutInfo) { func (cs *ConsensusState) handleTimeout() {
dposlog.Debug("Received tock", "timeout", ti.Duration, "state", StateTypeMapping[ti.State])
// the timeout will now cause a state transition // the timeout will now cause a state transition
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
...@@ -445,14 +416,6 @@ func (cs *ConsensusState) CacheVotes(vote *dpostype.DPosVote) { ...@@ -445,14 +416,6 @@ func (cs *ConsensusState) CacheVotes(vote *dpostype.DPosVote) {
if !addrExistFlag { if !addrExistFlag {
cs.cachedVotes = append(cs.cachedVotes, vote) cs.cachedVotes = append(cs.cachedVotes, vote)
} else if vote.VoteTimestamp > cs.cachedVotes[index].VoteTimestamp { } else if vote.VoteTimestamp > cs.cachedVotes[index].VoteTimestamp {
/*
if index == len(cs.cachedVotes) - 1 {
cs.cachedVotes = append(cs.cachedVotes, vote)
}else {
cs.cachedVotes = append(cs.cachedVotes[:index], cs.dposVotes[(index + 1):]...)
cs.cachedVotes = append(cs.cachedVotes, vote)
}
*/
cs.cachedVotes[index] = vote cs.cachedVotes[index] = vote
} }
} }
......
...@@ -388,17 +388,17 @@ func (init *InitState) timeOut(cs *ConsensusState) { ...@@ -388,17 +388,17 @@ func (init *InitState) timeOut(cs *ConsensusState) {
cs.ClearVotes() cs.ClearVotes()
//设定超时时间,超时后再检查链接数量 //设定超时时间,超时后再检查链接数量
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
} else { } else {
vote := generateVote(cs) vote := generateVote(cs)
if nil == vote { if nil == vote {
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
return return
} }
if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil { if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil {
dposlog.Error("SignVote failed", "vote", vote.String()) dposlog.Error("SignVote failed", "vote", vote.String())
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
return return
} }
...@@ -413,7 +413,7 @@ func (init *InitState) timeOut(cs *ConsensusState) { ...@@ -413,7 +413,7 @@ func (init *InitState) timeOut(cs *ConsensusState) {
dposlog.Info("VotingState send a vote", "vote info", printVote(vote.DPosVote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix()) dposlog.Info("VotingState send a vote", "vote info", printVote(vote.DPosVote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix())
cs.dposState.sendVote(cs, vote.DPosVote) cs.dposState.sendVote(cs, vote.DPosVote)
cs.scheduleDPosTimeout(time.Duration(timeoutVoting)*time.Millisecond, VotingStateType) cs.resetTimer(time.Duration(timeoutVoting) * time.Millisecond, VotingStateType)
//处理之前缓存的投票信息 //处理之前缓存的投票信息
for i := 0; i < len(cs.cachedVotes); i++ { for i := 0; i < len(cs.cachedVotes); i++ {
cs.dposState.recvVote(cs, cs.cachedVotes[i]) cs.dposState.recvVote(cs, cs.cachedVotes[i])
...@@ -480,7 +480,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) { ...@@ -480,7 +480,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) {
} }
} }
//1s后检查是否出块,是否需要重新投票 //1s后检查是否出块,是否需要重新投票
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond * 500, VotedStateType)
} }
return return
} }
...@@ -494,7 +494,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) { ...@@ -494,7 +494,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) {
dposlog.Info("Change state because of timeOut.", "from", "VotingState", "to", "InitState") dposlog.Info("Change state because of timeOut.", "from", "VotingState", "to", "InitState")
//由于连接多数情况下正常,快速触发InitState的超时处理 //由于连接多数情况下正常,快速触发InitState的超时处理
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
} }
func (voting *VotingState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) { func (voting *VotingState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) {
...@@ -529,7 +529,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote) ...@@ -529,7 +529,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
} }
} }
//1s后检查是否出块,是否需要重新投票 //1s后检查是否出块,是否需要重新投票
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.stopAndResetTimer(time.Millisecond * 500, VotedStateType)
} else if result == continueToVote { } else if result == continueToVote {
dposlog.Info("VotingState get a vote, but don't get an agreement,waiting for new votes...") dposlog.Info("VotingState get a vote, but don't get an agreement,waiting for new votes...")
} else { } else {
...@@ -538,7 +538,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote) ...@@ -538,7 +538,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
cs.ClearVotes() cs.ClearVotes()
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of vote failed.", "from", "VotingState", "to", "InitState") dposlog.Info("Change state because of vote failed.", "from", "VotingState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.stopAndResetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
} }
} }
...@@ -578,9 +578,9 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -578,9 +578,9 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//当前节点为出块节点 //当前节点为出块节点
//如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。 //如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。
if block.Height+1 < cs.currentVote.Height { if block.Height + 1 < cs.currentVote.Height {
dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height) dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType) cs.resetTimer(time.Second * 1, VotedStateType)
return return
} }
...@@ -639,7 +639,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -639,7 +639,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
return return
} }
...@@ -659,7 +659,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -659,7 +659,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
return return
} }
...@@ -674,7 +674,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -674,7 +674,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
if block.BlockTime >= task.BlockStop { if block.BlockTime >= task.BlockStop {
//已出块,或者时间落后了。 //已出块,或者时间落后了。
dposlog.Info("VotedState timeOut but block already is generated.", "blocktime", block.BlockTime, "blockStop", task.BlockStop, "now", now) dposlog.Info("VotedState timeOut but block already is generated.", "blocktime", block.BlockTime, "blockStop", task.BlockStop, "now", now)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType) cs.resetTimer(time.Second * 1, VotedStateType)
return return
} else if block.BlockTime < task.BlockStart { } else if block.BlockTime < task.BlockStart {
...@@ -684,12 +684,12 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -684,12 +684,12 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.client.SetBlockTime(task.BlockStop) cs.client.SetBlockTime(task.BlockStop)
cs.client.CreateBlock() cs.client.CreateBlock()
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond * 500, VotedStateType)
return return
} }
dposlog.Info("Wait time to create block near blockStop.") dposlog.Info("Wait time to create block near blockStop.")
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond * 500, VotedStateType)
return return
} else { } else {
...@@ -697,7 +697,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -697,7 +697,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
dposlog.Info("Wait to next block cycle.", "waittime", task.BlockStop-now+1) dposlog.Info("Wait to next block cycle.", "waittime", task.BlockStop-now+1)
//cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop-now+1), VotedStateType) //cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop-now+1), VotedStateType)
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond * 500, VotedStateType)
return return
} }
} else { } else {
...@@ -717,7 +717,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -717,7 +717,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.ClearVotes() cs.ClearVotes()
cs.SetState(WaitNotifyStateObj) cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "WaitNotifyState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "WaitNotifyState")
cs.scheduleDPosTimeout(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType) cs.resetTimer(time.Duration(timeoutWaitNotify) * time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil { if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify) cs.dposState.recvNotify(cs, cs.cachedNotify)
} }
...@@ -726,7 +726,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -726,7 +726,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//设置超时时间 //设置超时时间
dposlog.Info("wait until change state.", "waittime", cs.currentVote.PeriodStop-now+1) dposlog.Info("wait until change state.", "waittime", cs.currentVote.PeriodStop-now+1)
cs.scheduleDPosTimeout(time.Second*time.Duration(cs.currentVote.PeriodStop-now+1), VotedStateType) cs.resetTimer(time.Second * time.Duration(cs.currentVote.PeriodStop - now + 1), VotedStateType)
return return
} }
} }
...@@ -776,7 +776,7 @@ func (voted *VotedState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNot ...@@ -776,7 +776,7 @@ func (voted *VotedState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNot
cs.ClearVotes() cs.ClearVotes()
cs.SetState(WaitNotifyStateObj) cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of recv notify.", "from", "VotedState", "to", "WaitNotifyState") dposlog.Info("Change state because of recv notify.", "from", "VotedState", "to", "WaitNotifyState")
cs.scheduleDPosTimeout(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType) cs.stopAndResetTimer(time.Duration(timeoutWaitNotify) * time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil { if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify) cs.dposState.recvNotify(cs, cs.cachedNotify)
} }
...@@ -806,7 +806,7 @@ func (wait *WaitNofifyState) timeOut(cs *ConsensusState) { ...@@ -806,7 +806,7 @@ func (wait *WaitNofifyState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
} }
func (wait *WaitNofifyState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) { func (wait *WaitNofifyState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) {
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"time"
)
var (
tickTockBufferSize = 10
)
// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface {
Start()
Stop()
Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer
}
// timeoutTicker wraps time.Timer,
// scheduling timeouts only for greater height/round/step
// than what it's already seen.
// Timeouts are scheduled along the tickChan,
// and fired on the tockChan.
type timeoutTicker struct {
timer *time.Timer
tickChan chan timeoutInfo // for scheduling timeouts
tockChan chan timeoutInfo // for notifying about them
}
// NewTimeoutTicker returns a new TimeoutTicker.
func NewTimeoutTicker() TimeoutTicker {
tt := &timeoutTicker{
timer: time.NewTimer(0),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
}
tt.stopTimer() // don't want to fire until the first scheduled timeout
return tt
}
// OnStart implements cmn.Service. It starts the timeout routine.
func (t *timeoutTicker) Start() {
go t.timeoutRoutine()
}
// OnStop implements cmn.Service. It stops the timeout routine.
func (t *timeoutTicker) Stop() {
t.stopTimer()
}
// Chan returns a channel on which timeouts are sent.
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
return t.tockChan
}
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is always available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti
}
//-------------------------------------------------------------
// stop the timer and drain if necessary
func (t *timeoutTicker) stopTimer() {
// Stop() returns false if it was already fired or was stopped
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
dposlog.Debug("Timer already stopped")
}
}
}
// send on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (t *timeoutTicker) timeoutRoutine() {
dposlog.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-t.tickChan:
dposlog.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// stop the last timer
t.stopTimer()
// update timeoutInfo and reset timer
// NOTE time.Timer allows duration to be non-positive
ti = newti
t.timer.Reset(ti.Duration)
dposlog.Debug("Scheduled timeout", "dur", ti.Duration)
case <-t.timer.C:
dposlog.Info("Timed out", "dur", ti.Duration, "state", StateTypeMapping[ti.State])
// go routine here guarantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
}
}
}
package dpos
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestTicker(t *testing.T) {
ticker := NewTimeoutTicker()
ticker.Start()
ti := timeoutInfo{
Duration: time.Millisecond * time.Duration(2000),
State: InitStateType,
}
fmt.Println("timeoutInfo:", ti.String())
now := time.Now().Unix()
ticker.ScheduleTimeout(ti)
ti2 := <-ticker.Chan()
end := time.Now().Unix()
fmt.Println("timeoutInfo2:", ti2.String())
time.Sleep(time.Second * 3)
ticker.Stop()
assert.True(t, end-now >= 2)
fmt.Println("TestTicker ok", end-now)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment