Commit 318c9aa7 authored by pengjun's avatar pengjun

Merge branch 'master' into issue-627-collateralize

parents f947ef51 b09e1e21
...@@ -90,13 +90,13 @@ ineffassign: ...@@ -90,13 +90,13 @@ ineffassign:
@golangci-lint run --no-config --issues-exit-code=1 --deadline=2m --disable-all --enable=ineffassign -n ${PKG_LIST_INEFFASSIGN} @golangci-lint run --no-config --issues-exit-code=1 --deadline=2m --disable-all --enable=ineffassign -n ${PKG_LIST_INEFFASSIGN}
race: ## Run data race detector race: ## Run data race detector
@go test -race -short $(PKG_LIST) @go test -parallel=8 -race -short $(PKG_LIST)
test: ## Run unittests test: ## Run unittests
@go test -race $(PKG_LIST) @go test -parallel=8 -race $(PKG_LIST)
testq: ## Run unittests testq: ## Run unittests
@go test $(PKG_LIST) @go test -parallel=8 $(PKG_LIST)
fmt: fmt_proto fmt_shell ## go fmt fmt: fmt_proto fmt_shell ## go fmt
@go fmt ./... @go fmt ./...
......
...@@ -8,18 +8,14 @@ function dapp_test_rpc() { ...@@ -8,18 +8,14 @@ function dapp_test_rpc() {
local ip=$1 local ip=$1
echo "============ # dapp rpc test begin =============" echo "============ # dapp rpc test begin ============="
if [ -d dapptest ]; then if [ -d dapptest ]; then
cp $DAPP_TEST_COMMON dapptest/ cp "$DAPP_TEST_COMMON" dapptest/
cd dapptest || return cd dapptest || return
dir=$(find . -maxdepth 1 -type d ! -name dapptest ! -name ticket ! -name . | sed 's/^\.\///' | sort)
echo "dapps list: $dir"
for app in $dir; do
echo "=========== # $app rpc test ============="
./"$app/${RPC_TESTFILE}" "$ip"
echo "=========== # $app rpc end ============="
done
##ticket用例最后执行 dapps=$(find . -maxdepth 1 -type d ! -name dapptest ! -name . | sed 's/^\.\///' | sort)
./ticket/"${RPC_TESTFILE}" "$ip" echo "dapps list: $dapps"
parallel -k --retries 3 --joblog ./testlog ./{}/"${RPC_TESTFILE}" "$ip" ::: "$dapps"
echo "check dapps test log"
cat ./testlog
fi fi
echo "============ # dapp rpc test end =============" echo "============ # dapp rpc test end ============="
} }
...@@ -184,16 +184,12 @@ function miner() { ...@@ -184,16 +184,12 @@ function miner() {
exit 1 exit 1
fi fi
sleep 1
echo "=========== # unlock wallet =============" echo "=========== # unlock wallet ============="
result=$(${1} wallet unlock -p 1314fuzamei -t 0 | jq ".isok") result=$(${1} wallet unlock -p 1314fuzamei -t 0 | jq ".isok")
if [ "${result}" = "false" ]; then if [ "${result}" = "false" ]; then
exit 1 exit 1
fi fi
sleep 1
echo "=========== # import private key returnAddr =============" echo "=========== # import private key returnAddr ============="
result=$(${1} account import_key -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944 -l returnAddr | jq ".label") result=$(${1} account import_key -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944 -l returnAddr | jq ".label")
echo "${result}" echo "${result}"
...@@ -201,8 +197,6 @@ function miner() { ...@@ -201,8 +197,6 @@ function miner() {
exit 1 exit 1
fi fi
sleep 1
echo "=========== # import private key mining =============" echo "=========== # import private key mining ============="
result=$(${1} account import_key -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01 -l minerAddr | jq ".label") result=$(${1} account import_key -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01 -l minerAddr | jq ".label")
echo "${result}" echo "${result}"
...@@ -210,13 +204,11 @@ function miner() { ...@@ -210,13 +204,11 @@ function miner() {
exit 1 exit 1
fi fi
sleep 1
echo "=========== # close auto mining =============" echo "=========== # close auto mining ============="
#result=$(${1} wallet auto_mine -f 0 | jq ".isok") result=$(${1} wallet auto_mine -f 1 | jq ".isok")
#if [ "${result}" = "false" ]; then if [ "${result}" = "false" ]; then
# exit 1 exit 1
#fi fi
} }
function block_wait() { function block_wait() {
...@@ -289,7 +281,7 @@ function check_docker_container() { ...@@ -289,7 +281,7 @@ function check_docker_container() {
function sync_status() { function sync_status() {
echo "=========== query sync status========== " echo "=========== query sync status========== "
local sync_status local sync_status
local count=100 local count=1000
local wait_sec=0 local wait_sec=0
while [ $count -gt 0 ]; do while [ $count -gt 0 ]; do
sync_status=$(${1} net is_sync) sync_status=$(${1} net is_sync)
...@@ -302,9 +294,9 @@ function sync_status() { ...@@ -302,9 +294,9 @@ function sync_status() {
fi fi
((count--)) ((count--))
wait_sec=$((wait_sec + 1)) wait_sec=$((wait_sec + 1))
sleep 1 sleep 0.1
done done
echo "sync wait ${wait_sec} s" echo "sync wait ${wait_sec}/10 s"
} }
function sync() { function sync() {
...@@ -373,8 +365,6 @@ function dapp_test_address() { ...@@ -373,8 +365,6 @@ function dapp_test_address() {
exit 1 exit 1
fi fi
sleep 1
echo "=========== # import private key dapptest2 mining =============" echo "=========== # import private key dapptest2 mining ============="
result=$(${1} account import_key -k 2116459C0EC8ED01AA0EEAE35CAC5C96F94473F7816F114873291217303F6989 -l dapptest2 | jq ".label") result=$(${1} account import_key -k 2116459C0EC8ED01AA0EEAE35CAC5C96F94473F7816F114873291217303F6989 -l dapptest2 | jq ".label")
echo "${result}" echo "${result}"
...@@ -387,7 +377,7 @@ function dapp_test_address() { ...@@ -387,7 +377,7 @@ function dapp_test_address() {
exit 1 exit 1
fi fi
sleep 1 block_wait "${1}" 1
hash=$(${1} send coins transfer -a 1500 -n transfer -t 1PUiGcbsccfxW3zuvHXZBJfznziph5miAo -k 2116459C0EC8ED01AA0EEAE35CAC5C96F94473F7816F114873291217303F6989) hash=$(${1} send coins transfer -a 1500 -n transfer -t 1PUiGcbsccfxW3zuvHXZBJfznziph5miAo -k 2116459C0EC8ED01AA0EEAE35CAC5C96F94473F7816F114873291217303F6989)
echo "${hash}" echo "${hash}"
......
#!/bin/bash
# 在 plugin/plugin_type/plugin_name 找出fork
function subdir_forks() {
plugin_dir=$1
plugin_name=$2
full_dir=$1
forks=$(grep types.RegisterDappFork "${full_dir}" -R | cut -d '(' -f 2 | cut -d ')' -f 1 | sed 's/ //g')
if [ -z "${forks}" ]; then
return
fi
cnt=$(echo "${forks}" | grep "^\"" | wc -l)
if [ $cnt -gt 0 ]; then
name=$(echo $forks | head -n1 | cut -d ',' -f 1 | sed 's/"//g')
echo "[fork.sub.${name}]"
else
echo "[fork.sub.${plugin_name}]";
fi
for fork in "${forks}"
do
echo "${fork}" | awk -F ',' '{ \
if(match($2,"\"")) gsub("\"","",$2); else gsub("X$","",$2); \
print $2 "=" $3}'
#/*print "debug" $1 $2 $3;*/ \
done
echo
}
dir=$(go list -f '{{.Dir}}' github.com/33cn/plugin)/plugin/
plugins=$(find $dir -maxdepth 2 -mindepth 2 -type d | sort)
for plugin in ${plugins}
do
name=$(echo $plugin | sed 's/.*\///g')
subdir_forks $plugin $name
done
...@@ -12,7 +12,7 @@ mkdir -p "$COVERAGE_DIR" ...@@ -12,7 +12,7 @@ mkdir -p "$COVERAGE_DIR"
# Create a coverage file for each package # Create a coverage file for each package
for package in ${PKG_LIST}; do for package in ${PKG_LIST}; do
go test -covermode=count -coverprofile "${COVERAGE_DIR}/${package##*/}.cov" "$package" go test -parallel=8 -covermode=count -coverprofile "${COVERAGE_DIR}/${package##*/}.cov" "$package"
done done
# Merge the coverage profile files # Merge the coverage profile files
......
...@@ -85,8 +85,8 @@ maxTxNumber = 1600 ...@@ -85,8 +85,8 @@ maxTxNumber = 1600
[mver.consensus.paracross] [mver.consensus.paracross]
coinReward = 18 coinReward=18
coinDevFund = 12 coinDevFund=12
[consensus.sub.para] [consensus.sub.para]
...@@ -97,7 +97,6 @@ ParaRemoteGrpcClient="localhost:8802" ...@@ -97,7 +97,6 @@ ParaRemoteGrpcClient="localhost:8802"
startHeight=345850 startHeight=345850
#打包时间间隔,单位秒 #打包时间间隔,单位秒
writeBlockSeconds=2 writeBlockSeconds=2
#验证账户,验证节点需要配置自己的账户,并且钱包导入对应种子,非验证节点留空 #验证账户,验证节点需要配置自己的账户,并且钱包导入对应种子,非验证节点留空
authAccount="" authAccount=""
#云端主链节点切换后,平行链适配新主链节点block,回溯查找和自己记录的相同blockhash的深度 #云端主链节点切换后,平行链适配新主链节点block,回溯查找和自己记录的相同blockhash的深度
...@@ -108,8 +107,8 @@ genesisAmount=100000000 ...@@ -108,8 +107,8 @@ genesisAmount=100000000
MainForkParacrossCommitTx=2270000 MainForkParacrossCommitTx=2270000
#平行链自共识开启对应的主链高度,需要大于等于MainForkParacrossCommitTx=2270000, -1 不开启 #平行链自共识开启对应的主链高度,需要大于等于MainForkParacrossCommitTx=2270000, -1 不开启
MainParaSelfConsensusForkHeight=-1 MainParaSelfConsensusForkHeight=-1
#主链开启循环检查共识交易done的fork高度 #主链开启循环检查共识交易done的fork高度,需要和主链保持严格一致,不可修改,4320000是bityuan主链对应高度, ycc或其他按实际修改
MainLoopCheckCommitTxDoneForkHeight=-1 MainLoopCheckCommitTxDoneForkHeight=4320000
#主链每隔几个没有相关平行链交易的区块,平行链上打包空区块,缺省从平行链blockHeight=0开始,依次增长,空块间隔不能为0 #主链每隔几个没有相关平行链交易的区块,平行链上打包空区块,缺省从平行链blockHeight=0开始,依次增长,空块间隔不能为0
[[consensus.sub.para.emptyBlockInterval]] [[consensus.sub.para.emptyBlockInterval]]
blockHeight=0 blockHeight=0
...@@ -250,7 +249,7 @@ ForkTradePrice = 0 ...@@ -250,7 +249,7 @@ ForkTradePrice = 0
Enable=0 Enable=0
ForkParacrossWithdrawFromParachain=0 ForkParacrossWithdrawFromParachain=0
ForkParacrossCommitTx=0 ForkParacrossCommitTx=0
ForkLoopCheckCommitTxDone=-1 ForkLoopCheckCommitTxDone=0
[fork.sub.evm] [fork.sub.evm]
Enable=0 Enable=0
......
Title="chain33" Title="chain33"
TestNet=true TestNet=true
FixTime=false FixTime=false
version="6.2.0" version="6.3.0"
[log] [log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit # 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
......
...@@ -3,7 +3,7 @@ module github.com/33cn/plugin ...@@ -3,7 +3,7 @@ module github.com/33cn/plugin
go 1.12 go 1.12
require ( require (
github.com/33cn/chain33 v0.0.0-20190916043245-c5f98f6a92f5 github.com/33cn/chain33 v0.0.0-20190925142515-31e357c36c74
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/NebulousLabs/Sia v1.3.7 github.com/NebulousLabs/Sia v1.3.7
github.com/btcsuite/btcd v0.0.0-20181013004428-67e573d211ac github.com/btcsuite/btcd v0.0.0-20181013004428-67e573d211ac
......
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/33cn/chain33 v0.0.0-20190916043245-c5f98f6a92f5 h1:Nr0twQPrU5o8JE7jjXlaUNS8qQpREQxLi9IAYSAGAIc= github.com/33cn/chain33 v0.0.0-20190925142515-31e357c36c74 h1:8PC5TDbLIV5haxz3uhiSS2zrgDwkAFQKOSa6KgNTn9c=
github.com/33cn/chain33 v0.0.0-20190916043245-c5f98f6a92f5/go.mod h1:4I8n+Zyf3t0UKM5jjpqJY627Tub62oXkLsdzIv4r6rQ= github.com/33cn/chain33 v0.0.0-20190925142515-31e357c36c74/go.mod h1:4I8n+Zyf3t0UKM5jjpqJY627Tub62oXkLsdzIv4r6rQ=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7 h1:PqzgE6kAMi81xWQA2QIVxjWkFHptGgC547vchpUbtFo= github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7 h1:PqzgE6kAMi81xWQA2QIVxjWkFHptGgC547vchpUbtFo=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
......
...@@ -53,16 +53,6 @@ var ( ...@@ -53,16 +53,6 @@ var (
msgQueueSize = 1000 msgQueueSize = 1000
) )
// internally generated messages which may update the state
type timeoutInfo struct {
Duration time.Duration `json:"duration"`
State int `json:"state"`
}
func (ti *timeoutInfo) String() string {
return fmt.Sprintf("%v", ti.Duration)
}
// ConsensusState handles execution of the consensus algorithm. // ConsensusState handles execution of the consensus algorithm.
type ConsensusState struct { type ConsensusState struct {
// config details // config details
...@@ -78,7 +68,7 @@ type ConsensusState struct { ...@@ -78,7 +68,7 @@ type ConsensusState struct {
// msgs from ourself, or by timeouts // msgs from ourself, or by timeouts
peerMsgQueue chan MsgInfo peerMsgQueue chan MsgInfo
internalMsgQueue chan MsgInfo internalMsgQueue chan MsgInfo
timeoutTicker TimeoutTicker timer *time.Timer
broadcastChannel chan<- MsgInfo broadcastChannel chan<- MsgInfo
ourID ID ourID ID
...@@ -120,7 +110,6 @@ func NewConsensusState(client *Client, valMgr ValidatorMgr) *ConsensusState { ...@@ -120,7 +110,6 @@ func NewConsensusState(client *Client, valMgr ValidatorMgr) *ConsensusState {
client: client, client: client,
peerMsgQueue: make(chan MsgInfo, msgQueueSize), peerMsgQueue: make(chan MsgInfo, msgQueueSize),
internalMsgQueue: make(chan MsgInfo, msgQueueSize), internalMsgQueue: make(chan MsgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
Quit: make(chan struct{}), Quit: make(chan struct{}),
dposState: InitStateObj, dposState: InitStateObj,
...@@ -184,55 +173,35 @@ func (cs *ConsensusState) SetPrivValidator(priv ttypes.PrivValidator, index int) ...@@ -184,55 +173,35 @@ func (cs *ConsensusState) SetPrivValidator(priv ttypes.PrivValidator, index int)
cs.privValidatorIndex = index cs.privValidatorIndex = index
} }
// SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing.
//func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
// cs.mtx.Lock()
// defer cs.mtx.Unlock()
// cs.timeoutTicker = timeoutTicker
//}
// Start It start first time starts the timeout receive routines. // Start It start first time starts the timeout receive routines.
func (cs *ConsensusState) Start() { func (cs *ConsensusState) Start() {
if atomic.CompareAndSwapUint32(&cs.started, 0, 1) { if atomic.CompareAndSwapUint32(&cs.started, 0, 1) {
if atomic.LoadUint32(&cs.stopped) == 1 { if atomic.LoadUint32(&cs.stopped) == 1 {
dposlog.Error("ConsensusState already stoped") dposlog.Error("ConsensusState already stoped")
} }
cs.timeoutTicker.Start()
// now start the receiveRoutine // now start the receiveRoutine
go cs.receiveRoutine() go cs.receiveRoutine()
// schedule the first round!
cs.scheduleDPosTimeout(time.Second*3, InitStateType)
} }
} }
// Stop timer and receive routine // Stop timer and receive routine
func (cs *ConsensusState) Stop() { func (cs *ConsensusState) Stop() {
cs.timeoutTicker.Stop()
cs.Quit <- struct{}{} cs.Quit <- struct{}{}
} }
// Attempt to schedule a timeout (by sending timeoutInfo on the tickChan) // Attempt to reset the timer
func (cs *ConsensusState) scheduleDPosTimeout(duration time.Duration, stateType int) { func (cs *ConsensusState) resetTimer(duration time.Duration, stateType int) {
cs.timeoutTicker.ScheduleTimeout(timeoutInfo{Duration: duration, State: stateType}) dposlog.Info("set timer", "duration", duration, "state", StateTypeMapping[stateType])
} if !cs.timer.Stop() {
// send a msg into the receiveRoutine regarding our own proposal, block part, or vote
/*
func (cs *ConsensusState) sendInternalMessage(mi MsgInfo) {
select { select {
case cs.internalMsgQueue <- mi: case <-cs.timer.C:
default: default:
// NOTE: using the go-routine means our votes can
// be processed out of order.
// TODO: use CList here for strict determinism and
// attempt push to internalMsgQueue in receiveRoutine
dposlog.Info("Internal msg queue is full. Using a go-routine")
go func() { cs.internalMsgQueue <- mi }()
} }
}
cs.timer.Reset(duration)
} }
*/
// Updates ConsensusState and increments height to match that of state. // Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes ttypes.RoundStepNewHeight. // The round becomes 0 and cs.Step becomes ttypes.RoundStepNewHeight.
func (cs *ConsensusState) updateToValMgr(valMgr ValidatorMgr) { func (cs *ConsensusState) updateToValMgr(valMgr ValidatorMgr) {
...@@ -254,6 +223,8 @@ func (cs *ConsensusState) receiveRoutine() { ...@@ -254,6 +223,8 @@ func (cs *ConsensusState) receiveRoutine() {
} }
}() }()
cs.timer = time.NewTimer(time.Second * 3)
for { for {
var mi MsgInfo var mi MsgInfo
...@@ -265,12 +236,11 @@ func (cs *ConsensusState) receiveRoutine() { ...@@ -265,12 +236,11 @@ func (cs *ConsensusState) receiveRoutine() {
case mi = <-cs.internalMsgQueue: case mi = <-cs.internalMsgQueue:
// handles proposals, block parts, votes // handles proposals, block parts, votes
cs.handleMsg(mi) cs.handleMsg(mi)
case ti := <-cs.timeoutTicker.Chan(): // tockChan: case <-cs.timer.C:
// if the timeout is relevant to the rs cs.handleTimeout()
// go to the next step
cs.handleTimeout(ti)
case <-cs.Quit: case <-cs.Quit:
dposlog.Info("ConsensusState recv quit signal.") dposlog.Info("ConsensusState recv quit signal.")
cs.timer.Stop()
return return
} }
} }
...@@ -302,9 +272,7 @@ func (cs *ConsensusState) handleMsg(mi MsgInfo) { ...@@ -302,9 +272,7 @@ func (cs *ConsensusState) handleMsg(mi MsgInfo) {
} }
} }
func (cs *ConsensusState) handleTimeout(ti timeoutInfo) { func (cs *ConsensusState) handleTimeout() {
dposlog.Debug("Received tock", "timeout", ti.Duration, "state", StateTypeMapping[ti.State])
// the timeout will now cause a state transition // the timeout will now cause a state transition
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
...@@ -445,14 +413,6 @@ func (cs *ConsensusState) CacheVotes(vote *dpostype.DPosVote) { ...@@ -445,14 +413,6 @@ func (cs *ConsensusState) CacheVotes(vote *dpostype.DPosVote) {
if !addrExistFlag { if !addrExistFlag {
cs.cachedVotes = append(cs.cachedVotes, vote) cs.cachedVotes = append(cs.cachedVotes, vote)
} else if vote.VoteTimestamp > cs.cachedVotes[index].VoteTimestamp { } else if vote.VoteTimestamp > cs.cachedVotes[index].VoteTimestamp {
/*
if index == len(cs.cachedVotes) - 1 {
cs.cachedVotes = append(cs.cachedVotes, vote)
}else {
cs.cachedVotes = append(cs.cachedVotes[:index], cs.dposVotes[(index + 1):]...)
cs.cachedVotes = append(cs.cachedVotes, vote)
}
*/
cs.cachedVotes[index] = vote cs.cachedVotes[index] = vote
} }
} }
......
...@@ -200,13 +200,6 @@ func DposPerf() { ...@@ -200,13 +200,6 @@ func DposPerf() {
fmt.Println("Verify CB failed.") fmt.Println("Verify CB failed.")
} }
//fmt.Println("=======start GetCBInfoByCircle!=======")
////first time, not hit
//dposClient.csState.GetCBInfoByCircle(task.Cycle)
//time.Sleep(1 * time.Second)
////second time, hit cache
//dposClient.csState.GetCBInfoByCircle(task.Cycle)
fmt.Println("=======start VoteVerify!=======") fmt.Println("=======start VoteVerify!=======")
vote := generateVote(dposClient.csState) vote := generateVote(dposClient.csState)
if nil == vote { if nil == vote {
...@@ -368,7 +361,6 @@ func DposPerf() { ...@@ -368,7 +361,6 @@ func DposPerf() {
} else { } else {
fmt.Println("SendTopNRegistTx failed") fmt.Println("SendTopNRegistTx failed")
} }
//sendTopNRegistTx(dposClient.csState, reg)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
fmt.Println("=======start QueryTopNCandidators!=======") fmt.Println("=======start QueryTopNCandidators!=======")
...@@ -431,7 +423,6 @@ func createConn2() error { ...@@ -431,7 +423,6 @@ func createConn2() error {
return err return err
} }
c = types.NewChain33Client(conn) c = types.NewChain33Client(conn)
//r = rand.New(rand.NewSource(types.Now().UnixNano()))
return nil return nil
} }
...@@ -510,7 +501,6 @@ func NormPut() { ...@@ -510,7 +501,6 @@ func NormPut() {
// SendCBTx method // SendCBTx method
func verifyCB(cs *ConsensusState, info *dty.DposCBInfo) bool { func verifyCB(cs *ConsensusState, info *dty.DposCBInfo) bool {
//info.Pubkey = strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes()))
canonical := dty.CanonicalOnceCBInfo{ canonical := dty.CanonicalOnceCBInfo{
Cycle: info.Cycle, Cycle: info.Cycle,
StopHeight: info.StopHeight, StopHeight: info.StopHeight,
...@@ -582,49 +572,6 @@ func sendRegistVrfRPTx(cs *ConsensusState, info *dty.DposVrfRPRegist) bool { ...@@ -582,49 +572,6 @@ func sendRegistVrfRPTx(cs *ConsensusState, info *dty.DposVrfRPRegist) bool {
return true return true
} }
/*
func sendTopNRegistTx(cs *ConsensusState, reg *dty.TopNCandidatorRegist) bool {
//info.Pubkey = strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes()))
obj := dty.CanonicalTopNCandidator(reg.Cand)
reg.Cand.Hash = obj.ID()
reg.Cand.SignerPubkey = cs.privValidator.GetPubKey().Bytes()
byteCB, err := json.Marshal(reg.Cand)
if err != nil {
dposlog.Error("marshal TopNCandidator failed", "err", err)
}
sig, err := cs.privValidator.SignMsg(byteCB)
if err != nil {
dposlog.Error("TopNCandidator failed.", "err", err)
return false
}
reg.Cand.Signature = sig.Bytes()
tx, err := cs.client.CreateTopNRegistTx(reg)
if err != nil {
dposlog.Error("CreateTopNRegistTx failed.", "err", err)
return false
}
tx.Fee = fee
cs.privValidator.SignTx(tx)
dposlog.Info("Sign TopNRegistTx ok.")
reply, err := c.SendTransaction(context.Background(), tx)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return false
}
if !reply.IsOk {
fmt.Fprintln(os.Stderr, errors.New(string(reply.GetMsg())))
return false
}
return true
}
*/
func sendTransferTx(fromKey, to string, amount int64) bool { func sendTransferTx(fromKey, to string, amount int64) bool {
signer := util.HexToPrivkey(fromKey) signer := util.HexToPrivkey(fromKey)
var tx *types.Transaction var tx *types.Transaction
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/rpc" "github.com/33cn/chain33/rpc"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
...@@ -454,7 +455,6 @@ func TestNode(t *testing.T) { ...@@ -454,7 +455,6 @@ func TestNode(t *testing.T) {
fmt.Println("=======start TestNode!=======") fmt.Println("=======start TestNode!=======")
Init() Init()
q1, chain1, s1, mem1, exec1, cs1, p2p1 := initEnvDpos1("chain33.test1.toml") q1, chain1, s1, mem1, exec1, cs1, p2p1 := initEnvDpos1("chain33.test1.toml")
//q2, chain2, s2, mem2, exec2, cs2, p2p2 := initEnvDpos2("chain33.test2.toml")
defer clearTestData1() defer clearTestData1()
defer chain1.Close() defer chain1.Close()
...@@ -465,14 +465,6 @@ func TestNode(t *testing.T) { ...@@ -465,14 +465,6 @@ func TestNode(t *testing.T) {
defer cs1.Close() defer cs1.Close()
defer p2p1.Close() defer p2p1.Close()
//defer chain2.Close()
//defer mem2.Close()
//defer exec2.Close()
//defer s2.Close()
//defer q2.Close()
//defer cs2.Close()
//defer p2p2.Close()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
_, _, err := createConn("127.0.0.1:8802") _, _, err := createConn("127.0.0.1:8802")
...@@ -480,11 +472,6 @@ func TestNode(t *testing.T) { ...@@ -480,11 +472,6 @@ func TestNode(t *testing.T) {
_, _, err = createConn("127.0.0.1:8802") _, _, err = createConn("127.0.0.1:8802")
} }
//_, _, err = createConn("127.0.0.1:8804")
//for err != nil {
// _, _, err = createConn("127.0.0.1:8804")
//}
fmt.Println("node1 ip:", cs1.(*Client).GetNode().IP) fmt.Println("node1 ip:", cs1.(*Client).GetNode().IP)
fmt.Println("node1 id:", cs1.(*Client).GetNode().ID) fmt.Println("node1 id:", cs1.(*Client).GetNode().ID)
fmt.Println("node1 network:", cs1.(*Client).GetNode().Network) fmt.Println("node1 network:", cs1.(*Client).GetNode().Network)
...@@ -513,8 +500,6 @@ func TestNode(t *testing.T) { ...@@ -513,8 +500,6 @@ func TestNode(t *testing.T) {
fmt.Println("TestNodeCompatibleWith ok") fmt.Println("TestNodeCompatibleWith ok")
//time.Sleep(2 * time.Second)
fmt.Println(q1.Name()) fmt.Println(q1.Name())
fmt.Println(cs1.(*Client).testFlag) fmt.Println(cs1.(*Client).testFlag)
fmt.Println(cs1.(*Client).GetConsensusState() != nil) fmt.Println(cs1.(*Client).GetConsensusState() != nil)
...@@ -528,23 +513,34 @@ func TestNode(t *testing.T) { ...@@ -528,23 +513,34 @@ func TestNode(t *testing.T) {
fmt.Println(cs1.(*Client).GenesisDoc().ChainID) fmt.Println(cs1.(*Client).GenesisDoc().ChainID)
fmt.Println("Validator index: ", cs1.(*Client).ValidatorIndex()) fmt.Println("Validator index: ", cs1.(*Client).ValidatorIndex())
//go cs2.(*Client).GetNode().DialPeerWithAddress("127.0.0.1:36656")
//require.Nil(t, err)
//err = cs1.(*Client).GetNode().DialPeerWithAddress("127.0.0.1:36657")
//require.Nil(t, err)
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
//cs1.(*Client).StopC()
if cs1.(*Client).GetNode().IsRunning() { if cs1.(*Client).GetNode().IsRunning() {
fmt.Println("=======cs1 is running=======") fmt.Println("=======cs1 is running=======")
//cs1.(*Client).GetConsensusState().Stop()
//cs1.(*Client).GetNode().Stop()
} else { } else {
fmt.Println("======= cs1 is not running=======") fmt.Println("======= cs1 is not running=======")
} }
fmt.Println("=======test state machine=======")
vote := &ttypes.DPosVote{}
InitStateObj.sendVote(cs1.(*Client).GetConsensusState(), vote)
voteReply := &ttypes.DPosVoteReply{}
InitStateObj.sendVoteReply(cs1.(*Client).GetConsensusState(), voteReply)
InitStateObj.recvVoteReply(cs1.(*Client).GetConsensusState(), voteReply)
notify := &ttypes.DPosNotify{}
InitStateObj.sendNotify(cs1.(*Client).GetConsensusState(), notify)
VotingStateObj.sendVoteReply(cs1.(*Client).GetConsensusState(), voteReply)
VotingStateObj.sendNotify(cs1.(*Client).GetConsensusState(), notify)
VotingStateObj.recvNotify(cs1.(*Client).GetConsensusState(), notify)
VotedStateObj.sendVote(cs1.(*Client).GetConsensusState(), vote)
WaitNotifyStateObj.sendVote(cs1.(*Client).GetConsensusState(), vote)
WaitNotifyStateObj.sendVoteReply(cs1.(*Client).GetConsensusState(), voteReply)
WaitNotifyStateObj.recvVoteReply(cs1.(*Client).GetConsensusState(), voteReply)
WaitNotifyStateObj.sendNotify(cs1.(*Client).GetConsensusState(), notify)
fmt.Println("=======testNode ok=======") fmt.Println("=======testNode ok=======")
} }
...@@ -601,43 +597,3 @@ func initEnvDpos1(configName string) (queue.Queue, *blockchain.BlockChain, queue ...@@ -601,43 +597,3 @@ func initEnvDpos1(configName string) (queue.Queue, *blockchain.BlockChain, queue
return q, chain, s, mem, exec, cs, network return q, chain, s, mem, exec, cs, network
} }
/*
func initEnvDpos2(configName string) (queue.Queue, *blockchain.BlockChain, queue.Module, queue.Module, *executor.Executor, queue.Module, queue.Module) {
var q = queue.New("channel2")
flag.Parse()
cfg, sub := types.InitCfg(configName)
types.Init(cfg.Title, cfg)
chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client())
exec := executor.New(cfg.Exec, sub.Exec)
exec.SetQueueClient(q.Client())
types.SetMinFee(0)
s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client())
var subcfg subConfig
if sub != nil {
types.MustDecode(sub.Consensus["dpos"], &subcfg)
}
encode, _ := json.Marshal(subcfg)
fmt.Println(string(encode))
cs := New(cfg.Consensus, sub.Consensus["dpos"])
cs.(*Client).SetTestFlag()
cs.SetQueueClient(q.Client())
mem := mempool.New(cfg.Mempool, nil)
mem.SetQueueClient(q.Client())
network := p2p.New(cfg.P2P)
network.SetQueueClient(q.Client())
rpc.InitCfg(cfg.RPC)
gapi := rpc.NewGRpcServer(q.Client(), nil)
go gapi.Listen()
return q, chain, s, mem, exec, cs, network
}
*/
...@@ -388,17 +388,17 @@ func (init *InitState) timeOut(cs *ConsensusState) { ...@@ -388,17 +388,17 @@ func (init *InitState) timeOut(cs *ConsensusState) {
cs.ClearVotes() cs.ClearVotes()
//设定超时时间,超时后再检查链接数量 //设定超时时间,超时后再检查链接数量
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} else { } else {
vote := generateVote(cs) vote := generateVote(cs)
if nil == vote { if nil == vote {
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil { if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil {
dposlog.Error("SignVote failed", "vote", vote.String()) dposlog.Error("SignVote failed", "vote", vote.String())
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
...@@ -413,7 +413,7 @@ func (init *InitState) timeOut(cs *ConsensusState) { ...@@ -413,7 +413,7 @@ func (init *InitState) timeOut(cs *ConsensusState) {
dposlog.Info("VotingState send a vote", "vote info", printVote(vote.DPosVote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix()) dposlog.Info("VotingState send a vote", "vote info", printVote(vote.DPosVote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix())
cs.dposState.sendVote(cs, vote.DPosVote) cs.dposState.sendVote(cs, vote.DPosVote)
cs.scheduleDPosTimeout(time.Duration(timeoutVoting)*time.Millisecond, VotingStateType) cs.resetTimer(time.Duration(timeoutVoting)*time.Millisecond, VotingStateType)
//处理之前缓存的投票信息 //处理之前缓存的投票信息
for i := 0; i < len(cs.cachedVotes); i++ { for i := 0; i < len(cs.cachedVotes); i++ {
cs.dposState.recvVote(cs, cs.cachedVotes[i]) cs.dposState.recvVote(cs, cs.cachedVotes[i])
...@@ -480,7 +480,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) { ...@@ -480,7 +480,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) {
} }
} }
//1s后检查是否出块,是否需要重新投票 //1s后检查是否出块,是否需要重新投票
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
} }
return return
} }
...@@ -494,7 +494,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) { ...@@ -494,7 +494,7 @@ func (voting *VotingState) timeOut(cs *ConsensusState) {
dposlog.Info("Change state because of timeOut.", "from", "VotingState", "to", "InitState") dposlog.Info("Change state because of timeOut.", "from", "VotingState", "to", "InitState")
//由于连接多数情况下正常,快速触发InitState的超时处理 //由于连接多数情况下正常,快速触发InitState的超时处理
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} }
func (voting *VotingState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) { func (voting *VotingState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) {
...@@ -529,7 +529,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote) ...@@ -529,7 +529,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
} }
} }
//1s后检查是否出块,是否需要重新投票 //1s后检查是否出块,是否需要重新投票
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
} else if result == continueToVote { } else if result == continueToVote {
dposlog.Info("VotingState get a vote, but don't get an agreement,waiting for new votes...") dposlog.Info("VotingState get a vote, but don't get an agreement,waiting for new votes...")
} else { } else {
...@@ -538,7 +538,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote) ...@@ -538,7 +538,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
cs.ClearVotes() cs.ClearVotes()
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of vote failed.", "from", "VotingState", "to", "InitState") dposlog.Info("Change state because of vote failed.", "from", "VotingState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} }
} }
...@@ -580,7 +580,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -580,7 +580,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。 //如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。
if block.Height+1 < cs.currentVote.Height { if block.Height+1 < cs.currentVote.Height {
dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height) dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType) cs.resetTimer(time.Second*1, VotedStateType)
return return
} }
...@@ -639,7 +639,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -639,7 +639,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
...@@ -659,7 +659,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -659,7 +659,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return return
} }
...@@ -674,7 +674,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -674,7 +674,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
if block.BlockTime >= task.BlockStop { if block.BlockTime >= task.BlockStop {
//已出块,或者时间落后了。 //已出块,或者时间落后了。
dposlog.Info("VotedState timeOut but block already is generated.", "blocktime", block.BlockTime, "blockStop", task.BlockStop, "now", now) dposlog.Info("VotedState timeOut but block already is generated.", "blocktime", block.BlockTime, "blockStop", task.BlockStop, "now", now)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType) cs.resetTimer(time.Second*1, VotedStateType)
return return
} else if block.BlockTime < task.BlockStart { } else if block.BlockTime < task.BlockStart {
...@@ -684,12 +684,12 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -684,12 +684,12 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.client.SetBlockTime(task.BlockStop) cs.client.SetBlockTime(task.BlockStop)
cs.client.CreateBlock() cs.client.CreateBlock()
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
return return
} }
dposlog.Info("Wait time to create block near blockStop.") dposlog.Info("Wait time to create block near blockStop.")
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
return return
} else { } else {
...@@ -697,7 +697,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -697,7 +697,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
dposlog.Info("Wait to next block cycle.", "waittime", task.BlockStop-now+1) dposlog.Info("Wait to next block cycle.", "waittime", task.BlockStop-now+1)
//cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop-now+1), VotedStateType) //cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop-now+1), VotedStateType)
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType) cs.resetTimer(time.Millisecond*500, VotedStateType)
return return
} }
} else { } else {
...@@ -717,7 +717,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -717,7 +717,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.ClearVotes() cs.ClearVotes()
cs.SetState(WaitNotifyStateObj) cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "WaitNotifyState") dposlog.Info("Change state because of time.", "from", "VotedState", "to", "WaitNotifyState")
cs.scheduleDPosTimeout(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType) cs.resetTimer(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil { if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify) cs.dposState.recvNotify(cs, cs.cachedNotify)
} }
...@@ -726,7 +726,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) { ...@@ -726,7 +726,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//设置超时时间 //设置超时时间
dposlog.Info("wait until change state.", "waittime", cs.currentVote.PeriodStop-now+1) dposlog.Info("wait until change state.", "waittime", cs.currentVote.PeriodStop-now+1)
cs.scheduleDPosTimeout(time.Second*time.Duration(cs.currentVote.PeriodStop-now+1), VotedStateType) cs.resetTimer(time.Second*time.Duration(cs.currentVote.PeriodStop-now+1), VotedStateType)
return return
} }
} }
...@@ -776,7 +776,7 @@ func (voted *VotedState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNot ...@@ -776,7 +776,7 @@ func (voted *VotedState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNot
cs.ClearVotes() cs.ClearVotes()
cs.SetState(WaitNotifyStateObj) cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of recv notify.", "from", "VotedState", "to", "WaitNotifyState") dposlog.Info("Change state because of recv notify.", "from", "VotedState", "to", "WaitNotifyState")
cs.scheduleDPosTimeout(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType) cs.resetTimer(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil { if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify) cs.dposState.recvNotify(cs, cs.cachedNotify)
} }
...@@ -806,7 +806,7 @@ func (wait *WaitNofifyState) timeOut(cs *ConsensusState) { ...@@ -806,7 +806,7 @@ func (wait *WaitNofifyState) timeOut(cs *ConsensusState) {
cs.SetState(InitStateObj) cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState") dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType) cs.resetTimer(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} }
func (wait *WaitNofifyState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) { func (wait *WaitNofifyState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote) {
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"time"
)
var (
	// tickTockBufferSize is the capacity of both the tick (schedule) and
	// tock (fire) channels, so a short burst of schedule requests or
	// undelivered timeouts does not block the sender.
	tickTockBufferSize = 10
)
// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface {
	Start()                         // launch the background timeout routine
	Stop()                          // stop the underlying timer
	Chan() <-chan timeoutInfo       // on which to receive a timeout
	ScheduleTimeout(ti timeoutInfo) // reset the timer
}
// timeoutTicker wraps time.Timer,
// scheduling timeouts only for greater height/round/step
// than what it's already seen.
// Timeouts are scheduled along the tickChan,
// and fired on the tockChan.
type timeoutTicker struct {
	timer    *time.Timer      // single underlying timer, re-armed for each scheduled timeout
	tickChan chan timeoutInfo // for scheduling timeouts
	tockChan chan timeoutInfo // for notifying about them
}
// NewTimeoutTicker returns a new TimeoutTicker backed by a single
// time.Timer and buffered tick/tock channels.
func NewTimeoutTicker() TimeoutTicker {
	ticker := &timeoutTicker{
		timer:    time.NewTimer(0),
		tickChan: make(chan timeoutInfo, tickTockBufferSize),
		tockChan: make(chan timeoutInfo, tickTockBufferSize),
	}
	// A timer created with duration 0 fires immediately; stop and drain it
	// so nothing fires before the first scheduled timeout.
	ticker.stopTimer()
	return ticker
}
// Start implements cmn.Service. It starts the timeout routine in a
// background goroutine.
// NOTE(review): the goroutine has no quit signal — Stop() only halts the
// timer, so the routine lives for the remainder of the process; confirm
// that is the intended lifecycle.
func (t *timeoutTicker) Start() {
	go t.timeoutRoutine()
}
// Stop implements cmn.Service. It stops (and drains) the underlying timer.
// NOTE(review): this does not terminate the timeoutRoutine goroutine started
// by Start(); a later ScheduleTimeout would re-arm the ticker — verify
// callers never rely on Stop being final.
func (t *timeoutTicker) Stop() {
	t.stopTimer()
}
// Chan returns the receive-only channel on which fired timeouts are sent.
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
	return t.tockChan
}
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// tickChan is buffered (tickTockBufferSize), so this normally does not block;
// it can block if that many requests queue up before the timeoutRoutine
// drains them.
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
	t.tickChan <- ti
}
//-------------------------------------------------------------

// stopTimer halts the underlying timer and, when it has already fired,
// drains the pending value so a stale expiration is never observed later.
func (t *timeoutTicker) stopTimer() {
	if t.timer.Stop() {
		// Timer was still pending; nothing left in the channel.
		return
	}
	// Stop() returned false: the timer already fired or was stopped earlier.
	// Consume the buffered value if one is waiting.
	select {
	case <-t.timer.C:
	default:
		dposlog.Debug("Timer already stopped")
	}
}
// timeoutRoutine is the ticker's event loop: each request arriving on
// tickChan re-arms the timer (replacing any timeout still pending), and each
// timer expiration is relayed onto tockChan. A non-positive duration fires
// essentially immediately.
func (t *timeoutTicker) timeoutRoutine() {
	dposlog.Debug("Starting timeout routine")
	var current timeoutInfo
	for {
		select {
		case next := <-t.tickChan:
			dposlog.Debug("Received tick", "old_ti", current, "new_ti", next)
			// Cancel/drain the previously armed timer before re-arming,
			// so an old expiration cannot leak through.
			t.stopTimer()
			// NOTE time.Timer allows the duration to be non-positive.
			current = next
			t.timer.Reset(current.Duration)
			dposlog.Debug("Scheduled timeout", "dur", current.Duration)
		case <-t.timer.C:
			dposlog.Info("Timed out", "dur", current.Duration, "state", StateTypeMapping[current.State])
			// Deliver asynchronously so this loop never blocks on a slow
			// consumer and stays ready for the next tick. Determinism comes
			// from playback in the receiveRoutine.
			go func(toi timeoutInfo) { t.tockChan <- toi }(current)
		}
	}
}
package dpos
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
// TestTicker schedules a 3-second timeout and verifies the tock arrives no
// earlier than ~2 seconds later (slack for Unix-second truncation).
func TestTicker(t *testing.T) {
	tick := NewTimeoutTicker()
	tick.Start()

	info := timeoutInfo{
		Duration: time.Second * time.Duration(3),
		State:    InitStateType,
	}
	fmt.Println("timeoutInfo:", info.String())

	begin := time.Now().Unix()
	tick.ScheduleTimeout(info)
	<-tick.Chan()
	finish := time.Now().Unix()
	tick.Stop()

	assert.True(t, finish-begin >= 2)
	fmt.Println("TestTicker ok")
}
...@@ -69,20 +69,23 @@ func initEnvRaft() (queue.Queue, *blockchain.BlockChain, queue.Module, queue.Mod ...@@ -69,20 +69,23 @@ func initEnvRaft() (queue.Queue, *blockchain.BlockChain, queue.Module, queue.Mod
flag.Parse() flag.Parse()
cfg, sub := types.InitCfg("chain33.test.toml") cfg, sub := types.InitCfg("chain33.test.toml")
types.Init(cfg.Title, cfg) types.Init(cfg.Title, cfg)
s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client())
chain := blockchain.New(cfg.BlockChain) chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client()) chain.SetQueueClient(q.Client())
exec := executor.New(cfg.Exec, sub.Exec) exec := executor.New(cfg.Exec, sub.Exec)
exec.SetQueueClient(q.Client()) exec.SetQueueClient(q.Client())
types.SetMinFee(0) types.SetMinFee(0)
s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client()) mem := mempool.New(cfg.Mempool, nil)
mem.SetQueueClient(q.Client())
cs := NewRaftCluster(cfg.Consensus, sub.Consensus["raft"]) cs := NewRaftCluster(cfg.Consensus, sub.Consensus["raft"])
cs.SetQueueClient(q.Client()) cs.SetQueueClient(q.Client())
mem := mempool.New(cfg.Mempool, nil)
mem.SetQueueClient(q.Client())
network := p2p.New(cfg.P2P) network := p2p.New(cfg.P2P)
network.SetQueueClient(q.Client()) network.SetQueueClient(q.Client())
......
...@@ -2,6 +2,7 @@ Title="local" ...@@ -2,6 +2,7 @@ Title="local"
TestNet=true TestNet=true
FixTime=false FixTime=false
[log] [log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit # 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
loglevel = "debug" loglevel = "debug"
...@@ -82,7 +83,7 @@ maxTxNumber = 1600 ...@@ -82,7 +83,7 @@ maxTxNumber = 1600
maxTxNumber = 10000 maxTxNumber = 10000
[mver.consensus.ForkChainParamV2] [mver.consensus.ForkChainParamV2]
powLimitBits = "0x1f2fffff" powLimitBits = "0x2f2fffff"
[mver.consensus.ForkTicketFundAddrV1] [mver.consensus.ForkTicketFundAddrV1]
fundKeyAddr = "1Ji3W12KGScCM7C2p8bg635sNkayDM8MGY" fundKeyAddr = "1Ji3W12KGScCM7C2p8bg635sNkayDM8MGY"
...@@ -113,7 +114,7 @@ genesisBlockTime=1514533394 ...@@ -113,7 +114,7 @@ genesisBlockTime=1514533394
[[consensus.sub.ticket.genesis]] [[consensus.sub.ticket.genesis]]
minerAddr="12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv" minerAddr="12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv"
returnAddr="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt" returnAddr="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
count=10000 count=3000
[[consensus.sub.ticket.genesis]] [[consensus.sub.ticket.genesis]]
minerAddr="1PUiGcbsccfxW3zuvHXZBJfznziph5miAo" minerAddr="1PUiGcbsccfxW3zuvHXZBJfznziph5miAo"
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"crypto/ecdsa" "crypto/ecdsa"
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/33cn/chain33/account" "github.com/33cn/chain33/account"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
...@@ -68,14 +69,18 @@ func testTicket(t *testing.T) { ...@@ -68,14 +69,18 @@ func testTicket(t *testing.T) {
status, err = mock33.GetAPI().GetWalletStatus() status, err = mock33.GetAPI().GetWalletStatus()
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, true, status.IsAutoMining) assert.Equal(t, true, status.IsAutoMining)
err = mock33.WaitHeight(50) start := time.Now()
height := int64(0)
hastclose := false
hastopen := false
for {
height += 20
err = mock33.WaitHeight(height)
assert.Nil(t, err) assert.Nil(t, err)
//查询票是否自动close,并且购买了新的票 //查询票是否自动close,并且购买了新的票
req := &types.ReqWalletTransactionList{Count: 1000} req := &types.ReqWalletTransactionList{Count: 1000}
list, err := mock33.GetAPI().WalletTransactionList(req) list, err := mock33.GetAPI().WalletTransactionList(req)
assert.Nil(t, err) assert.Nil(t, err)
hastclose := false
hastopen := false
for _, tx := range list.TxDetails { for _, tx := range list.TxDetails {
if tx.ActionName == "tclose" && tx.Receipt.Ty == 2 { if tx.ActionName == "tclose" && tx.Receipt.Ty == 2 {
hastclose = true hastclose = true
...@@ -84,6 +89,10 @@ func testTicket(t *testing.T) { ...@@ -84,6 +89,10 @@ func testTicket(t *testing.T) {
hastopen = true hastopen = true
} }
} }
if hastopen == true && hastclose == true || time.Since(start) > 100*time.Second {
break
}
}
assert.Equal(t, true, hastclose) assert.Equal(t, true, hastclose)
assert.Equal(t, true, hastopen) assert.Equal(t, true, hastopen)
//查询合约中的余额 //查询合约中的余额
......
...@@ -174,7 +174,7 @@ func TestRealNodeMempool(t *testing.T) { ...@@ -174,7 +174,7 @@ func TestRealNodeMempool(t *testing.T) {
mock33.WaitHeight(0) mock33.WaitHeight(0)
mock33.SendHot() mock33.SendHot()
mock33.WaitHeight(1) mock33.WaitHeight(1)
n := 20 n := 10
done := make(chan struct{}, n) done := make(chan struct{}, n)
keys := make([]crypto.PrivKey, n) keys := make([]crypto.PrivKey, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
...@@ -186,7 +186,7 @@ func TestRealNodeMempool(t *testing.T) { ...@@ -186,7 +186,7 @@ func TestRealNodeMempool(t *testing.T) {
mock33.Wait() mock33.Wait()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
go func(priv crypto.PrivKey) { go func(priv crypto.PrivKey) {
for i := 0; i < 100; i++ { for i := 0; i < 30; i++ {
tx := util.CreateCoinsTx(priv, mock33.GetGenesisAddress(), types.Coin/1000) tx := util.CreateCoinsTx(priv, mock33.GetGenesisAddress(), types.Coin/1000)
reply, err := mock33.GetAPI().SendTx(tx) reply, err := mock33.GetAPI().SendTx(tx)
if err != nil { if err != nil {
......
...@@ -163,7 +163,7 @@ func TestRealNodeMempool(t *testing.T) { ...@@ -163,7 +163,7 @@ func TestRealNodeMempool(t *testing.T) {
mock33.WaitHeight(0) mock33.WaitHeight(0)
mock33.SendHot() mock33.SendHot()
mock33.WaitHeight(1) mock33.WaitHeight(1)
n := 20 n := 10
done := make(chan struct{}, n) done := make(chan struct{}, n)
keys := make([]crypto.PrivKey, n) keys := make([]crypto.PrivKey, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
...@@ -175,7 +175,7 @@ func TestRealNodeMempool(t *testing.T) { ...@@ -175,7 +175,7 @@ func TestRealNodeMempool(t *testing.T) {
mock33.Wait() mock33.Wait()
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
go func(priv crypto.PrivKey) { go func(priv crypto.PrivKey) {
for i := 0; i < 100; i++ { for i := 0; i < 30; i++ {
tx := util.CreateCoinsTx(priv, mock33.GetGenesisAddress(), types.Coin/1000) tx := util.CreateCoinsTx(priv, mock33.GetGenesisAddress(), types.Coin/1000)
reply, err := mock33.GetAPI().SendTx(tx) reply, err := mock33.GetAPI().SendTx(tx)
if err != nil { if err != nil {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment