Unverified Commit b8fa1f7b authored by vipwzw's avatar vipwzw Committed by GitHub

Merge branch 'master' into release-6.3.0

parents 10747da8 232e56e2
......@@ -9,12 +9,12 @@ matrix:
include:
- name: check_fmt
sudo: require
go: "1.12.x"
go: "1.13.x"
env:
- GO111MODULE=on
install:
- go get -u golang.org/x/tools/cmd/goimports
- go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.17.1
- go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.18.0
- go get -u mvdan.cc/sh/cmd/shfmt
- go get -u mvdan.cc/sh/cmd/gosh
script:
......@@ -22,7 +22,7 @@ matrix:
- make linter
- name: unit-test
go: "1.12.x"
go: "1.13.x"
env:
- GO111MODULE=on
install: skip
......@@ -31,7 +31,7 @@ matrix:
- name: coverage
if: branch = master
go: "1.12.x"
go: "1.13.x"
env:
- GO111MODULE=on
before_install:
......@@ -43,7 +43,7 @@ matrix:
- bash <(curl -s https://codecov.io/bash)
- name: auto-test
go: "1.12.x"
go: "1.13.x"
env:
- GO111MODULE=on
install: skip
......@@ -56,7 +56,7 @@ matrix:
sudo: required
services:
- docker
go: "1.12.x"
go: "1.13.x"
env:
- DOCKER_COMPOSE_VERSION=1.21.2
- GO111MODULE=on
......
......@@ -25,6 +25,7 @@ default: depends build
build: depends
go build $(BUILD_FLAGS) -v -i -o $(APP)
go build $(BUILD_FLAGS) -v -i -o $(CLI) $(SRC_CLI)
go build $(BUILD_FLAGS) -v -i -o build/fork-config github.com/33cn/plugin/cli/fork_config/
@cp chain33.toml $(CHAIN33_PATH)/build/system-test-rpc.sh build/
@cp chain33.para.toml build/ci/paracross/
......@@ -68,7 +69,7 @@ update: ## version 可以是git tag打的具体版本号,也可以是commit hash
go get github.com/33cn/chain33@master ;fi
@go mod tidy
dep:
@go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.17.1
@go get github.com/golangci/golangci-lint/cmd/golangci-lint@v1.18.0
@go get -u golang.org/x/tools/cmd/goimports
@go get -u github.com/mitchellh/gox
@go get -u github.com/vektra/mockery/.../
......
......@@ -230,6 +230,27 @@ function block_wait() {
echo "wait new block $count/10 s, cur height=$expect,old=$cur_height"
}
function tx_wait() {
if [ "$#" -lt 2 ]; then
echo "wrong tx_wait params"
exit 1
fi
local req=\"${2}\"
txhash=$(${1} tx query -s "${2}" | jq ".tx.hash")
local count=0
while true; do
txhash=$(${1} tx query -s "${2}" | jq ".tx.hash")
if [ "${txhash}" != "${req}" ]; then
count=$((count + 1))
echo "${txhash}" "${req}" "${count}"
sleep 0.1
else
RAW_TX_HASH=$txhash
echo "====query tx=$RAW_TX_HASH success"
break
fi
done
}
function block_wait2height() {
if [ "$#" -lt 3 ]; then
echo "wrong block_wait params"
......@@ -336,7 +357,8 @@ function transfer() {
echo "=========== # withdraw ============="
hash=$(${1} send coins transfer -a 2 -n deposit -t 1wvmD6RNHzwhY4eN75WnM6JcaAvNQ4nHx -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944)
echo "${hash}"
block_wait "${1}" 1
# block_wait "${1}" 2
tx_wait "${1}" "${hash}"
before=$(${1} account balance -a 14KEKbYtKKQm4wMthSK9J4La4nAiidGozt -e retrieve | jq -r ".balance")
if [ "${before}" == "0.0000" ]; then
echo "wrong ticket balance, should not be zero"
......@@ -345,7 +367,8 @@ function transfer() {
hash=$(${1} send coins withdraw -a 1 -n withdraw -e retrieve -k CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944)
echo "${hash}"
block_wait "${1}" 1
# block_wait "${1}" 1
tx_wait "${1}" "${hash}"
txs=$(${1} tx query_hash -s "${hash}" | jq ".txs")
if [ "${txs}" == "null" ]; then
echo "withdraw cannot find tx"
......@@ -354,7 +377,8 @@ function transfer() {
hash=$(${1} send coins transfer -a 1000 -n transfer -t 1E5saiXVb9mW8wcWUUZjsHJPZs5GmdzuSY -k 4257D8692EF7FE13C68B65D6A52F03933DB2FA5CE8FAF210B5B8B80C721CED01)
echo "${hash}"
block_wait "${1}" 1
# block_wait "${1}" 1
tx_wait "${1}" "${hash}"
}
function dapp_test_address() {
......@@ -382,11 +406,14 @@ function dapp_test_address() {
hash=$(${1} send coins transfer -a 1500 -n transfer -t 1PUiGcbsccfxW3zuvHXZBJfznziph5miAo -k 2116459C0EC8ED01AA0EEAE35CAC5C96F94473F7816F114873291217303F6989)
echo "${hash}"
tx_wait "${1}" "${hash}"
#total allocation for rpc test
hash=$(${1} send coins transfer -a 8000 -n transfer -t 1PcGKYYoLn1PLLJJodc1UpgWGeFAQasAkx -k 2116459C0EC8ED01AA0EEAE35CAC5C96F94473F7816F114873291217303F6989)
echo "${hash}"
block_wait "${1}" 1
# block_wait "${1}" 1
tx_wait "${1}" "${hash}"
}
function base_config() {
......
#!/bin/bash
# 在 plugin/plugin_type/plugin_name 找出fork
function subdir_forks() {
plugin_dir=$1
plugin_name=$2
full_dir=$1
forks=$(grep types.RegisterDappFork "${full_dir}" -R | cut -d '(' -f 2 | cut -d ')' -f 1 | sed 's/ //g')
if [ -z "${forks}" ]; then
return
fi
cnt=$(echo "${forks}" | grep "^\"" | wc -l)
if [ $cnt -gt 0 ]; then
name=$(echo $forks | head -n1 | cut -d ',' -f 1 | sed 's/"//g')
echo "[fork.sub.${name}]"
else
echo "[fork.sub.${plugin_name}]";
fi
for fork in "${forks}"
do
echo "${fork}" | awk -F ',' '{ \
if(match($2,"\"")) gsub("\"","",$2); else gsub("X$","",$2); \
print $2 "=" $3}'
#/*print "debug" $1 $2 $3;*/ \
done
echo
}
dir=$(go list -f '{{.Dir}}' github.com/33cn/plugin)/plugin/
plugins=$(find $dir -maxdepth 2 -mindepth 2 -type d | sort)
for plugin in ${plugins}
do
name=$(echo $plugin | sed 's/.*\///g')
subdir_forks $plugin $name
done
......@@ -90,8 +90,8 @@ coinDevFund=12
[consensus.sub.para]
#主链节点的grpc服务器ip,当前可以支持多ip负载均衡,如“101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com”
#ParaRemoteGrpcClient="183.129.226.74:8802,183.129.226.75:8802,101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.bityuan.com,cloud.bityuan.com"
#主链节点的grpc服务器ip,当前可以支持多ip负载均衡,如“118.31.177.1:8802,39.97.2.127:8802,120.77.111.44:8802,jiedian2.bityuan.com,cloud.bityuan.com”
#ParaRemoteGrpcClient="118.31.177.1:8802,39.97.2.127:8802,120.77.111.44:8802,jiedian2.bityuan.com,cloud.bityuan.com,183.129.226.74:8802,183.129.226.75:8802"
ParaRemoteGrpcClient="localhost:8802"
#主链指定高度的区块开始同步
startHeight=345850
......
package main
import (
"fmt"
"os"
"sort"
"strings"
_ "github.com/33cn/chain33/system"
"github.com/33cn/chain33/types"
_ "github.com/33cn/plugin/plugin"
)
func main() {
forks, err := types.CloneFork("chain33")
if err != nil {
fmt.Printf("clone fork failed: %v", err)
return
}
fmtForks(forks)
}
/*
两个规则:
key 有 ".", Part1.Part2 为 [fork.sub.Part1] Part2=value
key 没有 "." [fork.system] key=value
把相同段的fork打印到一起
[fork.system]
ForkChainParamV1= 0 # ForkBlockCheck=1560000
[fork.sub.ticket]
Enable=0 # manage.ForkManageExec=400000
[fork.sub.store-kvmvccmavl]
ForkKvmvccmavl=2270000 # store-kvmvccmavl.ForkKvmvccmavl=1870000
*/
func fmtForks(forks map[string]int64) {
systemFork := make(map[string]int64)
subFork := make(map[string]map[string]int64)
for k, v := range forks {
if strings.Contains(k, ".") {
str2 := strings.SplitN(k, ".", 2)
if len(str2) != 2 {
fmt.Fprintf(os.Stderr, "can't deal key=%s ", k)
continue
}
_, ok := subFork[str2[0]]
if !ok {
subFork[str2[0]] = make(map[string]int64)
}
subFork[str2[0]][str2[1]] = v
} else {
systemFork[k] = v
}
}
fmt.Println("[fork.system]")
for k, v := range systemFork {
fmt.Printf("%s=%d\n", k, v)
}
fmt.Println("")
plugins := make([]string, 0)
for plugin := range subFork {
plugins = append(plugins, plugin)
}
sort.Strings(plugins)
for _, plugin := range plugins {
fmt.Printf("[fork.sub.%s]\n", plugin)
forks := subFork[plugin]
for k, v := range forks {
fmt.Printf("%s=%d\n", k, v)
}
fmt.Println("")
}
}
......@@ -4,7 +4,7 @@ go 1.12
require (
github.com/33cn/chain33 v0.0.0-20191002010202-14cdc009dfb9
github.com/33cn/chain33 v0.0.0-20191011025601-06dbefe7d2e8
github.com/BurntSushi/toml v0.3.1
github.com/NebulousLabs/Sia v1.3.7
github.com/btcsuite/btcd v0.0.0-20181013004428-67e573d211ac
......
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/33cn/chain33 v0.0.0-20191002010202-14cdc009dfb9 h1:lZiDHZlnWRnF7BuoiGKccyqLv3COc29x8qoXcLiZeu0=
github.com/33cn/chain33 v0.0.0-20191002010202-14cdc009dfb9/go.mod h1:4I8n+Zyf3t0UKM5jjpqJY627Tub62oXkLsdzIv4r6rQ=
github.com/33cn/chain33 v0.0.0-20191011025601-06dbefe7d2e8 h1:YorXd8yAS26S49lY8Mdmn7Z5HhGJ1pat9QfXAEZfkw4=
github.com/33cn/chain33 v0.0.0-20191011025601-06dbefe7d2e8/go.mod h1:4I8n+Zyf3t0UKM5jjpqJY627Tub62oXkLsdzIv4r6rQ=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7 h1:PqzgE6kAMi81xWQA2QIVxjWkFHptGgC547vchpUbtFo=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
......
......@@ -359,12 +359,15 @@ func (client *blockSyncClient) addMinerTx(preStateHash []byte, block *types.Bloc
status := &pt.ParacrossNodeStatus{
Title: types.GetTitle(),
Height: block.Height,
PreBlockHash: block.ParentHash,
PreStateHash: preStateHash,
MainBlockHash: localBlock.MainHash,
MainBlockHeight: localBlock.MainHeight,
}
if !pt.IsParaForkHeight(status.MainBlockHeight, pt.ForkLoopCheckCommitTxDone) {
status.PreBlockHash = block.ParentHash
status.PreStateHash = preStateHash
}
tx, err := pt.CreateRawMinerTx(&pt.ParacrossMinerAction{
Status: status,
IsSelfConsensus: client.paraClient.isParaSelfConsensusForked(status.MainBlockHeight),
......
......@@ -277,12 +277,12 @@ func (rep *Replica) hasRequest(REQ *pb.Request) bool {
return rep.hasRequestPrepare(REQ)
case *pb.Request_Commit:
return rep.hasRequestCommit(REQ)
case *pb.Request_Viewchange:
return rep.hasRequestViewChange(REQ)
case *pb.Request_Ack:
return rep.hasRequestAck(REQ)
case *pb.Request_Newview:
return rep.hasRequestNewView(REQ)
//case *pb.Request_Viewchange:
// return rep.hasRequestViewChange(REQ)
//case *pb.Request_Ack:
// return rep.hasRequestAck(REQ)
//case *pb.Request_Newview:
// return rep.hasRequestNewView(REQ)
default:
return false
}
......@@ -335,160 +335,160 @@ func (rep *Replica) hasRequestCommit(REQ *pb.Request) bool {
return false
}
func (rep *Replica) hasRequestViewChange(REQ *pb.Request) bool {
view := REQ.GetViewchange().View
replica := REQ.GetViewchange().Replica
for _, req := range rep.requests["view-change"] {
v := req.GetViewchange().View
r := req.GetViewchange().Replica
if v == view && r == replica {
return true
}
}
return false
}
func (rep *Replica) hasRequestAck(REQ *pb.Request) bool {
view := REQ.GetAck().View
replica := REQ.GetAck().Replica
viewchanger := REQ.GetAck().Viewchanger
for _, req := range rep.requests["ack"] {
v := req.GetAck().View
r := req.GetAck().Replica
vc := req.GetAck().Viewchanger
if v == view && r == replica && vc == viewchanger {
return true
}
}
return false
}
func (rep *Replica) hasRequestNewView(REQ *pb.Request) bool {
view := REQ.GetNewview().View
for _, req := range rep.requests["new-view"] {
v := req.GetNewview().View
if v == view {
return true
}
}
return false
}
//func (rep *Replica) hasRequestViewChange(REQ *pb.Request) bool {
// view := REQ.GetViewchange().View
// replica := REQ.GetViewchange().Replica
// for _, req := range rep.requests["view-change"] {
// v := req.GetViewchange().View
// r := req.GetViewchange().Replica
// if v == view && r == replica {
// return true
// }
// }
// return false
//}
//func (rep *Replica) hasRequestAck(REQ *pb.Request) bool {
// view := REQ.GetAck().View
// replica := REQ.GetAck().Replica
// viewchanger := REQ.GetAck().Viewchanger
// for _, req := range rep.requests["ack"] {
// v := req.GetAck().View
// r := req.GetAck().Replica
// vc := req.GetAck().Viewchanger
// if v == view && r == replica && vc == viewchanger {
// return true
// }
// }
// return false
//}
//func (rep *Replica) hasRequestNewView(REQ *pb.Request) bool {
// view := REQ.GetNewview().View
// for _, req := range rep.requests["new-view"] {
// v := req.GetNewview().View
// if v == view {
// return true
// }
// }
// return false
//}
// Clear requests
func (rep *Replica) clearRequestsBySeq(sequence uint32) {
rep.clearRequestClients()
rep.clearRequestPrepreparesBySeq(sequence)
rep.clearRequestPreparesBySeq(sequence)
rep.clearRequestCommitsBySeq(sequence)
//rep.clearRequestCheckpointsBySeq(sequence)
}
func (rep *Replica) clearRequestClients() {
clientReqs := rep.requests["client"]
lastTimestamp := rep.theLastReply().Timestamp
for idx, req := range rep.requests["client"] {
timestamp := req.GetClient().Timestamp
if lastTimestamp >= timestamp && idx < (len(clientReqs)-1) {
clientReqs = append(clientReqs[:idx], clientReqs[idx+1:]...)
} else {
clientReqs = clientReqs[:idx-1]
}
}
rep.requests["client"] = clientReqs
}
func (rep *Replica) clearRequestPrepreparesBySeq(sequence uint32) {
prePrepares := rep.requests["pre-prepare"]
for idx, req := range rep.requests["pre-prepare"] {
s := req.GetPreprepare().Sequence
if s <= sequence && idx < (len(prePrepares)-1) {
prePrepares = append(prePrepares[:idx], prePrepares[idx+1:]...)
} else {
prePrepares = prePrepares[:idx-1]
}
}
rep.requests["pre-prepare"] = prePrepares
}
func (rep *Replica) clearRequestPreparesBySeq(sequence uint32) {
prepares := rep.requests["prepare"]
for idx, req := range rep.requests["prepare"] {
s := req.GetPrepare().Sequence
if s <= sequence && idx < (len(prepares)-1) {
prepares = append(prepares[:idx], prepares[idx+1:]...)
} else {
prepares = prepares[:idx-1]
}
}
rep.requests["prepare"] = prepares
}
func (rep *Replica) clearRequestCommitsBySeq(sequence uint32) {
commits := rep.requests["commit"]
for idx, req := range rep.requests["commit"] {
s := req.GetCommit().Sequence
if s <= sequence && idx < (len(commits)-1) {
commits = append(commits[:idx], commits[idx+1:]...)
} else {
commits = commits[:idx-1]
}
}
rep.requests["commit"] = commits
}
func (rep *Replica) clearRequestCheckpointsBySeq(sequence uint32) {
checkpoints := rep.requests["checkpoint"]
for idx, req := range rep.requests["checkpoint"] {
s := req.GetCheckpoint().Sequence
if s <= sequence && idx < (len(checkpoints)-1) {
checkpoints = append(checkpoints[:idx], checkpoints[idx+1:]...)
} else {
checkpoints = checkpoints[:idx-1]
}
}
rep.requests["checkpoint"] = checkpoints
}
func (rep *Replica) clearRequestsByView(view uint32) {
rep.clearRequestPrepreparesByView(view)
rep.clearRequestPreparesByView(view)
rep.clearRequestCommitsByView(view)
//add others
}
func (rep *Replica) clearRequestPrepreparesByView(view uint32) {
prePrepares := rep.requests["pre-prepare"]
for idx, req := range rep.requests["pre-prepare"] {
v := req.GetPreprepare().View
if v < view {
prePrepares = append(prePrepares[:idx], prePrepares[idx+1:]...)
}
}
rep.requests["pre-prepare"] = prePrepares
}
func (rep *Replica) clearRequestPreparesByView(view uint32) {
prepares := rep.requests["prepare"]
for idx, req := range rep.requests["prepare"] {
v := req.GetPrepare().View
if v < view {
prepares = append(prepares[:idx], prepares[idx+1:]...)
}
}
rep.requests["prepare"] = prepares
}
func (rep *Replica) clearRequestCommitsByView(view uint32) {
commits := rep.requests["commit"]
for idx, req := range rep.requests["commit"] {
v := req.GetCommit().View
if v < view {
commits = append(commits[:idx], commits[idx+1:]...)
}
}
rep.requests["commit"] = commits
}
//func (rep *Replica) clearRequestsBySeq(sequence uint32) {
// rep.clearRequestClients()
// rep.clearRequestPrepreparesBySeq(sequence)
// rep.clearRequestPreparesBySeq(sequence)
// rep.clearRequestCommitsBySeq(sequence)
// //rep.clearRequestCheckpointsBySeq(sequence)
//}
//func (rep *Replica) clearRequestClients() {
// clientReqs := rep.requests["client"]
// lastTimestamp := rep.theLastReply().Timestamp
// for idx, req := range rep.requests["client"] {
// timestamp := req.GetClient().Timestamp
// if lastTimestamp >= timestamp && idx < (len(clientReqs)-1) {
// clientReqs = append(clientReqs[:idx], clientReqs[idx+1:]...)
// } else {
// clientReqs = clientReqs[:idx-1]
// }
// }
// rep.requests["client"] = clientReqs
//}
//
//func (rep *Replica) clearRequestPrepreparesBySeq(sequence uint32) {
// prePrepares := rep.requests["pre-prepare"]
// for idx, req := range rep.requests["pre-prepare"] {
// s := req.GetPreprepare().Sequence
// if s <= sequence && idx < (len(prePrepares)-1) {
// prePrepares = append(prePrepares[:idx], prePrepares[idx+1:]...)
// } else {
// prePrepares = prePrepares[:idx-1]
// }
// }
// rep.requests["pre-prepare"] = prePrepares
//}
//
//func (rep *Replica) clearRequestPreparesBySeq(sequence uint32) {
// prepares := rep.requests["prepare"]
// for idx, req := range rep.requests["prepare"] {
// s := req.GetPrepare().Sequence
// if s <= sequence && idx < (len(prepares)-1) {
// prepares = append(prepares[:idx], prepares[idx+1:]...)
// } else {
// prepares = prepares[:idx-1]
// }
// }
// rep.requests["prepare"] = prepares
//}
//
//func (rep *Replica) clearRequestCommitsBySeq(sequence uint32) {
// commits := rep.requests["commit"]
// for idx, req := range rep.requests["commit"] {
// s := req.GetCommit().Sequence
// if s <= sequence && idx < (len(commits)-1) {
// commits = append(commits[:idx], commits[idx+1:]...)
// } else {
// commits = commits[:idx-1]
// }
// }
// rep.requests["commit"] = commits
//}
//
//func (rep *Replica) clearRequestCheckpointsBySeq(sequence uint32) {
// checkpoints := rep.requests["checkpoint"]
// for idx, req := range rep.requests["checkpoint"] {
// s := req.GetCheckpoint().Sequence
// if s <= sequence && idx < (len(checkpoints)-1) {
// checkpoints = append(checkpoints[:idx], checkpoints[idx+1:]...)
// } else {
// checkpoints = checkpoints[:idx-1]
// }
// }
// rep.requests["checkpoint"] = checkpoints
//}
//
//func (rep *Replica) clearRequestsByView(view uint32) {
// rep.clearRequestPrepreparesByView(view)
// rep.clearRequestPreparesByView(view)
// rep.clearRequestCommitsByView(view)
// //add others
//}
//
//func (rep *Replica) clearRequestPrepreparesByView(view uint32) {
// prePrepares := rep.requests["pre-prepare"]
// for idx, req := range rep.requests["pre-prepare"] {
// v := req.GetPreprepare().View
// if v < view {
// prePrepares = append(prePrepares[:idx], prePrepares[idx+1:]...)
// }
// }
// rep.requests["pre-prepare"] = prePrepares
//}
//
//func (rep *Replica) clearRequestPreparesByView(view uint32) {
// prepares := rep.requests["prepare"]
// for idx, req := range rep.requests["prepare"] {
// v := req.GetPrepare().View
// if v < view {
// prepares = append(prepares[:idx], prepares[idx+1:]...)
// }
// }
// rep.requests["prepare"] = prepares
//}
//
//func (rep *Replica) clearRequestCommitsByView(view uint32) {
// commits := rep.requests["commit"]
// for idx, req := range rep.requests["commit"] {
// v := req.GetCommit().View
// if v < view {
// commits = append(commits[:idx], commits[idx+1:]...)
// }
// }
// rep.requests["commit"] = commits
//}
// Handle requests
......@@ -511,17 +511,17 @@ func (rep *Replica) handleRequest(REQ *pb.Request) {
rep.handleRequestCommit(REQ)
case *pb.Request_Checkpoint:
rep.handleRequestCheckpoint(REQ)
case *pb.Request_Viewchange:
rep.handleRequestViewChange(REQ)
case *pb.Request_Ack:
rep.handleRequestAck(REQ)
//case *pb.Request_Checkpoint:
//
// rep.handleRequestCheckpoint(REQ)
//
//case *pb.Request_Viewchange:
//
// rep.handleRequestViewChange(REQ)
//
//case *pb.Request_Ack:
//
// rep.handleRequestAck(REQ)
default:
plog.Info("Replica %d received unrecognized request type\n", rep.ID)
......@@ -766,599 +766,599 @@ func (rep *Replica) handleRequestCommit(REQ *pb.Request) {
}
func (rep *Replica) handleRequestCheckpoint(REQ *pb.Request) {
sequence := REQ.GetCheckpoint().Sequence
if !rep.sequenceInRange(sequence) {
return
}
digest := REQ.GetCheckpoint().Digest
replica := REQ.GetCheckpoint().Replica
count := 0
for _, req := range rep.requests["checkpoint"] {
s := req.GetCheckpoint().Sequence
d := req.GetCheckpoint().Digest
r := req.GetCheckpoint().Replica
if s != sequence || !EQ(d, digest) {
continue
}
if r == replica {
plog.Info("Replica %d sent multiple checkpoint requests\n", replica)
//continue
}
count++
if !rep.overTwoThirds(count) {
continue
}
// rep.clearEntries(sequence)
rep.clearRequestsBySeq(sequence)
checkpoint := ToCheckpoint(sequence, digest)
rep.addCheckpoint(checkpoint)
plog.Info("checkpoint and clear request done")
return
}
}
func (rep *Replica) handleRequestViewChange(REQ *pb.Request) {
view := REQ.GetViewchange().View
if view < rep.view {
return
}
reqViewChange := REQ.GetViewchange()
for _, prep := range reqViewChange.GetPreps() {
v := prep.View
s := prep.Sequence
if v >= view || !rep.sequenceInRange(s) {
return
}
}
for _, prePrep := range reqViewChange.GetPrepreps() {
v := prePrep.View
s := prePrep.Sequence
if v >= view || !rep.sequenceInRange(s) {
return
}
}
for _, checkpoint := range reqViewChange.GetCheckpoints() {
s := checkpoint.Sequence
if !rep.sequenceInRange(s) {
return
}
}
if rep.hasRequest(REQ) {
return
}
rep.logRequest(REQ)
viewchanger := reqViewChange.Replica
req := ToRequestAck(
view,
rep.ID,
viewchanger,
ReqDigest(REQ))
go func() {
rep.requestChan <- req
}()
}
func (rep *Replica) handleRequestAck(REQ *pb.Request) {
view := REQ.GetAck().View
primaryID := rep.newPrimary(view)
if rep.ID != primaryID {
return
}
if rep.hasRequest(REQ) {
return
}
rep.logRequest(REQ)
replica := REQ.GetAck().Replica
viewchanger := REQ.GetAck().Viewchanger
digest := REQ.GetAck().Digest
reqViewChange := make(chan *pb.Request, 1)
twoThirds := make(chan bool, 1)
go func() {
for _, req := range rep.requests["view-change"] {
v := req.GetViewchange().View
vc := req.GetViewchange().Replica
if v == view && vc == viewchanger {
reqViewChange <- req
}
}
reqViewChange <- nil
}()
go func() {
count := 0
for _, req := range rep.requests["ack"] {
v := req.GetAck().View
r := req.GetAck().Replica
vc := req.GetAck().Viewchanger
d := req.GetAck().Digest
if v != view || vc != viewchanger || !EQ(d, digest) {
continue
}
if r == replica {
plog.Info("Replica %d sent multiple ack requests\n", replica)
continue
}
count++
if rep.twoThirds(count) {
twoThirds <- true
return
}
}
twoThirds <- false
}()
req := <-reqViewChange
if req == nil || !<-twoThirds {
return
}
rep.logPendingVC(req)
// When to send new view?
rep.requestNewView(view)
}
func (rep *Replica) handleRequestNewView(REQ *pb.Request) {
view := REQ.GetNewview().View
if view == 0 || view < rep.view {
return
}
replica := REQ.GetNewview().Replica
primary := rep.newPrimary(view)
if replica != primary {
return
}
if rep.hasRequest(REQ) {
return
}
rep.logRequest(REQ)
rep.processNewView(REQ)
}
func (rep *Replica) correctViewChanges(viewChanges []*pb.ViewChange) (requests []*pb.Request) {
// Returns requests if correct, else returns nil
valid := false
for _, vc := range viewChanges {
for _, req := range rep.requests["view-change"] {
d := ReqDigest(req)
if !EQ(d, vc.Digest) {
continue
}
requests = append(requests, req)
v := req.GetViewchange().View
// VIEW or rep.view??
if v == rep.view {
valid = true
break
}
}
if !valid {
return nil
}
}
if rep.isPrimary(rep.ID) {
reps := make(map[uint32]int)
valid = false
for _, req := range rep.requests["ack"] {
reqAck := req.GetAck()
reps[reqAck.Replica]++
if rep.twoThirds(reps[reqAck.Replica]) { //-2
valid = true
break
}
}
if !valid {
return nil
}
}
return
}
func (rep *Replica) correctSummaries(requests []*pb.Request, summaries []*pb.Summary) (correct bool) {
// Verify SUMMARIES
var start uint32
var digest []byte
digests := make(map[uint32][]byte)
for _, summary := range summaries {
s := summary.Sequence
d := summary.Digest
if _d, ok := digests[s]; ok && !EQ(_d, d) {
return
} else if !ok {
digests[s] = d
}
if s < start || start == uint32(0) {
start = s
digest = d
}
}
var A1 []*pb.Request
var A2 []*pb.Request
valid := false
for _, req := range requests {
reqViewChange := req.GetViewchange()
s := reqViewChange.Sequence
if s <= start {
A1 = append(A1, req)
}
checkpoints := reqViewChange.GetCheckpoints()
for _, checkpoint := range checkpoints {
if checkpoint.Sequence == start && EQ(checkpoint.Digest, digest) {
A2 = append(A2, req)
break
}
}
if rep.twoThirds(len(A1)) && rep.oneThird(len(A2)) {
valid = true
break
}
}
if !valid {
return
}
end := start + CheckPointPeriod*ConstantFactor
for seq := start; seq <= end; seq++ {
valid = false
for _, summary := range summaries {
if summary.Sequence != seq {
continue
}
if summary.Digest != nil {
var view uint32
for _, req := range requests {
reqViewChange := req.GetViewchange()
preps := reqViewChange.GetPreps()
for _, prep := range preps {
s := prep.Sequence
d := prep.Digest
if s != summary.Sequence || !EQ(d, summary.Digest) {
continue
}
v := prep.View
if v > view {
view = v
}
}
}
verifiedA1 := make(chan bool, 1)
// Verify A1
go func() {
var A1 []*pb.Request
FOR_LOOP:
for _, req := range requests {
reqViewChange := req.GetViewchange()
s := reqViewChange.Sequence
if s >= summary.Sequence {
continue
}
preps := reqViewChange.GetPreps()
for _, prep := range preps {
s = prep.Sequence
if s != summary.Sequence {
continue
}
d := prep.Digest
v := prep.View
if v > view || (v == view && !EQ(d, summary.Digest)) {
continue FOR_LOOP
}
}
A1 = append(A1, req)
if rep.twoThirds(len(A1)) {
verifiedA1 <- true
return
}
}
verifiedA1 <- false
}()
verifiedA2 := make(chan bool, 1)
// Verify A2
go func() {
var A2 []*pb.Request
for _, req := range requests {
reqViewChange := req.GetViewchange()
prePreps := reqViewChange.GetPrepreps()
for _, prePrep := range prePreps {
s := prePrep.Sequence
d := prePrep.Digest
v := prePrep.View
if s == summary.Sequence && EQ(d, summary.Digest) && v >= view {
A2 = append(A2, req)
break
}
}
if rep.oneThird(len(A2)) {
verifiedA2 <- true
return
}
}
verifiedA2 <- false
}()
if !<-verifiedA1 || !<-verifiedA2 {
continue
}
valid = true
break
} else {
var A1 []*pb.Request
FOR_LOOP:
for _, req := range requests {
reqViewChange := req.GetViewchange()
s := reqViewChange.Sequence
if s >= summary.Sequence {
continue
}
preps := reqViewChange.GetPreps()
for _, prep := range preps {
if prep.Sequence == summary.Sequence {
continue FOR_LOOP
}
}
A1 = append(A1, req)
if rep.twoThirds(len(A1)) {
valid = true
break
}
}
if valid {
break
}
}
}
if !valid {
return
}
}
return true
}
func (rep *Replica) processNewView(REQ *pb.Request) (success bool) {
if rep.activeView {
return
}
reqNewView := REQ.GetNewview()
viewChanges := reqNewView.GetViewchanges()
requests := rep.correctViewChanges(viewChanges)
if requests == nil {
return
}
summaries := reqNewView.GetSummaries()
correct := rep.correctSummaries(requests, summaries)
if !correct {
return
}
var h uint32
for _, checkpoint := range rep.checkpoints {
if checkpoint.Sequence < h || h == uint32(0) {
h = checkpoint.Sequence
}
}
var s uint32
for _, summary := range summaries {
if summary.Sequence < s || s == uint32(0) {
s = summary.Sequence
}
if summary.Sequence > h {
valid := false
for _, req := range rep.requests["view-change"] { //in
if EQ(ReqDigest(req), summary.Digest) {
valid = true
break
}
}
if !valid {
return
}
}
}
if h < s {
return
}
// Process new view
rep.activeView = true
for _, summary := range summaries {
if rep.ID != reqNewView.Replica {
req := ToRequestPrepare(
reqNewView.View,
summary.Sequence,
summary.Digest,
rep.ID) // the backup sends/logs prepare
go func() {
if !rep.hasRequest(req) {
rep.requestChan <- req
}
}()
if summary.Sequence <= h {
continue
}
if !rep.hasRequest(req) {
rep.logRequest(req)
}
} else {
if summary.Sequence <= h {
break
}
}
req := ToRequestPreprepare(
reqNewView.View,
summary.Sequence,
summary.Digest,
reqNewView.Replica) // new primary pre-prepares
if !rep.hasRequest(req) {
rep.logRequest(req)
}
}
var maxSequence uint32
for _, req := range rep.requests["pre-prepare"] {
reqPrePrepare := req.GetPreprepare()
if reqPrePrepare.Sequence > maxSequence {
maxSequence = reqPrePrepare.Sequence
}
}
rep.sequence = maxSequence
return true
}
func (rep *Replica) prePrepBySequence(sequence uint32) []*pb.Entry {
var view uint32
var requests []*pb.Request
for _, req := range rep.requests["pre-prepare"] {
v := req.GetPreprepare().View
s := req.GetPreprepare().Sequence
if v >= view && s == sequence {
view = v
requests = append(requests, req)
}
}
if requests == nil {
return nil
}
var prePreps []*pb.Entry
for _, req := range requests {
v := req.GetPreprepare().View
if v == view {
s := req.GetPreprepare().Sequence
d := req.GetPreprepare().Digest
prePrep := ToEntry(s, d, v)
prePreps = append(prePreps, prePrep)
}
}
FOR_LOOP:
for _, prePrep := range prePreps {
for _, req := range rep.allRequests() { //TODO: optimize
if EQ(ReqDigest(req), prePrep.Digest) {
continue FOR_LOOP
}
}
return nil
}
return prePreps
}
func (rep *Replica) prepBySequence(sequence uint32) ([]*pb.Entry, []*pb.Entry) {
prePreps := rep.prePrepBySequence(sequence)
if prePreps == nil {
return nil, nil
}
var preps []*pb.Entry
FOR_LOOP:
for _, prePrep := range prePreps {
view := prePrep.View
digest := prePrep.Digest
replicas := make(map[uint32]int)
for _, req := range rep.requests["prepare"] {
reqPrepare := req.GetPrepare()
v := reqPrepare.View
s := reqPrepare.Sequence
d := reqPrepare.Digest
if v == view && s == sequence && EQ(d, digest) {
r := reqPrepare.Replica
replicas[r]++
if rep.twoThirds(replicas[r]) {
prep := ToEntry(s, d, v)
preps = append(preps, prep)
continue FOR_LOOP
}
}
}
return prePreps, nil
}
return prePreps, preps
}
//func (rep *Replica) handleRequestCheckpoint(REQ *pb.Request) {
//
// sequence := REQ.GetCheckpoint().Sequence
//
// if !rep.sequenceInRange(sequence) {
// return
// }
//
// digest := REQ.GetCheckpoint().Digest
// replica := REQ.GetCheckpoint().Replica
//
// count := 0
// for _, req := range rep.requests["checkpoint"] {
// s := req.GetCheckpoint().Sequence
// d := req.GetCheckpoint().Digest
// r := req.GetCheckpoint().Replica
// if s != sequence || !EQ(d, digest) {
// continue
// }
// if r == replica {
// plog.Info("Replica %d sent multiple checkpoint requests\n", replica)
// //continue
// }
// count++
// if !rep.overTwoThirds(count) {
// continue
// }
// // rep.clearEntries(sequence)
// rep.clearRequestsBySeq(sequence)
// checkpoint := ToCheckpoint(sequence, digest)
// rep.addCheckpoint(checkpoint)
// plog.Info("checkpoint and clear request done")
// return
// }
//
//}
//func (rep *Replica) handleRequestViewChange(REQ *pb.Request) {
//
// view := REQ.GetViewchange().View
//
// if view < rep.view {
// return
// }
//
// reqViewChange := REQ.GetViewchange()
//
// for _, prep := range reqViewChange.GetPreps() {
// v := prep.View
// s := prep.Sequence
// if v >= view || !rep.sequenceInRange(s) {
// return
// }
// }
//
// for _, prePrep := range reqViewChange.GetPrepreps() {
// v := prePrep.View
// s := prePrep.Sequence
// if v >= view || !rep.sequenceInRange(s) {
// return
// }
// }
//
// for _, checkpoint := range reqViewChange.GetCheckpoints() {
// s := checkpoint.Sequence
// if !rep.sequenceInRange(s) {
// return
// }
// }
//
// if rep.hasRequest(REQ) {
// return
// }
//
// rep.logRequest(REQ)
//
// viewchanger := reqViewChange.Replica
//
// req := ToRequestAck(
// view,
// rep.ID,
// viewchanger,
// ReqDigest(REQ))
//
// go func() {
// rep.requestChan <- req
// }()
//}
//func (rep *Replica) handleRequestAck(REQ *pb.Request) {
//
// view := REQ.GetAck().View
// primaryID := rep.newPrimary(view)
//
// if rep.ID != primaryID {
// return
// }
//
// if rep.hasRequest(REQ) {
// return
// }
//
// rep.logRequest(REQ)
//
// replica := REQ.GetAck().Replica
// viewchanger := REQ.GetAck().Viewchanger
// digest := REQ.GetAck().Digest
//
// reqViewChange := make(chan *pb.Request, 1)
// twoThirds := make(chan bool, 1)
//
// go func() {
// for _, req := range rep.requests["view-change"] {
// v := req.GetViewchange().View
// vc := req.GetViewchange().Replica
// if v == view && vc == viewchanger {
// reqViewChange <- req
// }
// }
// reqViewChange <- nil
// }()
//
// go func() {
// count := 0
// for _, req := range rep.requests["ack"] {
// v := req.GetAck().View
// r := req.GetAck().Replica
// vc := req.GetAck().Viewchanger
// d := req.GetAck().Digest
// if v != view || vc != viewchanger || !EQ(d, digest) {
// continue
// }
// if r == replica {
// plog.Info("Replica %d sent multiple ack requests\n", replica)
// continue
// }
// count++
// if rep.twoThirds(count) {
// twoThirds <- true
// return
// }
// }
// twoThirds <- false
// }()
//
// req := <-reqViewChange
//
// if req == nil || !<-twoThirds {
// return
// }
//
// rep.logPendingVC(req)
//
// // When to send new view?
// rep.requestNewView(view)
//}
//func (rep *Replica) handleRequestNewView(REQ *pb.Request) {
//
// view := REQ.GetNewview().View
//
// if view == 0 || view < rep.view {
// return
// }
//
// replica := REQ.GetNewview().Replica
// primary := rep.newPrimary(view)
//
// if replica != primary {
// return
// }
//
// if rep.hasRequest(REQ) {
// return
// }
//
// rep.logRequest(REQ)
//
// rep.processNewView(REQ)
//}
//func (rep *Replica) correctViewChanges(viewChanges []*pb.ViewChange) (requests []*pb.Request) {
//
// // Returns requests if correct, else returns nil
//
// valid := false
// for _, vc := range viewChanges {
// for _, req := range rep.requests["view-change"] {
// d := ReqDigest(req)
// if !EQ(d, vc.Digest) {
// continue
// }
// requests = append(requests, req)
// v := req.GetViewchange().View
// // VIEW or rep.view??
// if v == rep.view {
// valid = true
// break
// }
// }
// if !valid {
// return nil
// }
// }
//
// if rep.isPrimary(rep.ID) {
// reps := make(map[uint32]int)
// valid = false
// for _, req := range rep.requests["ack"] {
// reqAck := req.GetAck()
// reps[reqAck.Replica]++
// if rep.twoThirds(reps[reqAck.Replica]) { //-2
// valid = true
// break
// }
// }
// if !valid {
// return nil
// }
// }
//
// return
//}
//func (rep *Replica) correctSummaries(requests []*pb.Request, summaries []*pb.Summary) (correct bool) {
//
// // Verify SUMMARIES
//
// var start uint32
// var digest []byte
// digests := make(map[uint32][]byte)
//
// for _, summary := range summaries {
// s := summary.Sequence
// d := summary.Digest
// if _d, ok := digests[s]; ok && !EQ(_d, d) {
// return
// } else if !ok {
// digests[s] = d
// }
// if s < start || start == uint32(0) {
// start = s
// digest = d
// }
// }
//
// var A1 []*pb.Request
// var A2 []*pb.Request
//
// valid := false
// for _, req := range requests {
// reqViewChange := req.GetViewchange()
// s := reqViewChange.Sequence
// if s <= start {
// A1 = append(A1, req)
// }
// checkpoints := reqViewChange.GetCheckpoints()
// for _, checkpoint := range checkpoints {
// if checkpoint.Sequence == start && EQ(checkpoint.Digest, digest) {
// A2 = append(A2, req)
// break
// }
// }
// if rep.twoThirds(len(A1)) && rep.oneThird(len(A2)) {
// valid = true
// break
// }
// }
//
// if !valid {
// return
// }
//
// end := start + CheckPointPeriod*ConstantFactor
//
// for seq := start; seq <= end; seq++ {
//
// valid = false
//
// for _, summary := range summaries {
//
// if summary.Sequence != seq {
// continue
// }
//
// if summary.Digest != nil {
//
// var view uint32
//
// for _, req := range requests {
// reqViewChange := req.GetViewchange()
// preps := reqViewChange.GetPreps()
// for _, prep := range preps {
// s := prep.Sequence
// d := prep.Digest
// if s != summary.Sequence || !EQ(d, summary.Digest) {
// continue
// }
// v := prep.View
// if v > view {
// view = v
// }
// }
// }
//
// verifiedA1 := make(chan bool, 1)
//
// // Verify A1
// go func() {
//
// var A1 []*pb.Request
//
// FOR_LOOP:
// for _, req := range requests {
// reqViewChange := req.GetViewchange()
// s := reqViewChange.Sequence
// if s >= summary.Sequence {
// continue
// }
// preps := reqViewChange.GetPreps()
// for _, prep := range preps {
// s = prep.Sequence
// if s != summary.Sequence {
// continue
// }
// d := prep.Digest
// v := prep.View
// if v > view || (v == view && !EQ(d, summary.Digest)) {
// continue FOR_LOOP
// }
// }
// A1 = append(A1, req)
// if rep.twoThirds(len(A1)) {
// verifiedA1 <- true
// return
// }
// }
// verifiedA1 <- false
// }()
//
// verifiedA2 := make(chan bool, 1)
//
// // Verify A2
// go func() {
//
// var A2 []*pb.Request
//
// for _, req := range requests {
// reqViewChange := req.GetViewchange()
// prePreps := reqViewChange.GetPrepreps()
// for _, prePrep := range prePreps {
// s := prePrep.Sequence
// d := prePrep.Digest
// v := prePrep.View
// if s == summary.Sequence && EQ(d, summary.Digest) && v >= view {
// A2 = append(A2, req)
// break
// }
// }
// if rep.oneThird(len(A2)) {
// verifiedA2 <- true
// return
// }
// }
// verifiedA2 <- false
// }()
//
// if !<-verifiedA1 || !<-verifiedA2 {
// continue
// }
//
// valid = true
// break
//
// } else {
//
// var A1 []*pb.Request
//
// FOR_LOOP:
//
// for _, req := range requests {
//
// reqViewChange := req.GetViewchange()
//
// s := reqViewChange.Sequence
//
// if s >= summary.Sequence {
// continue
// }
//
// preps := reqViewChange.GetPreps()
// for _, prep := range preps {
// if prep.Sequence == summary.Sequence {
// continue FOR_LOOP
// }
// }
//
// A1 = append(A1, req)
// if rep.twoThirds(len(A1)) {
// valid = true
// break
// }
// }
// if valid {
// break
// }
// }
// }
// if !valid {
// return
// }
// }
//
// return true
//}
//func (rep *Replica) processNewView(REQ *pb.Request) (success bool) {
//
// if rep.activeView {
// return
// }
//
// reqNewView := REQ.GetNewview()
//
// viewChanges := reqNewView.GetViewchanges()
// requests := rep.correctViewChanges(viewChanges)
//
// if requests == nil {
// return
// }
//
// summaries := reqNewView.GetSummaries()
// correct := rep.correctSummaries(requests, summaries)
//
// if !correct {
// return
// }
//
// var h uint32
// for _, checkpoint := range rep.checkpoints {
// if checkpoint.Sequence < h || h == uint32(0) {
// h = checkpoint.Sequence
// }
// }
//
// var s uint32
// for _, summary := range summaries {
// if summary.Sequence < s || s == uint32(0) {
// s = summary.Sequence
// }
// if summary.Sequence > h {
// valid := false
// for _, req := range rep.requests["view-change"] { //in
// if EQ(ReqDigest(req), summary.Digest) {
// valid = true
// break
// }
// }
// if !valid {
// return
// }
// }
// }
//
// if h < s {
// return
// }
//
// // Process new view
// rep.activeView = true
//
// for _, summary := range summaries {
//
// if rep.ID != reqNewView.Replica {
// req := ToRequestPrepare(
// reqNewView.View,
// summary.Sequence,
// summary.Digest,
// rep.ID) // the backup sends/logs prepare
//
// go func() {
// if !rep.hasRequest(req) {
// rep.requestChan <- req
// }
// }()
// if summary.Sequence <= h {
// continue
// }
//
// if !rep.hasRequest(req) {
// rep.logRequest(req)
// }
// } else {
// if summary.Sequence <= h {
// break
// }
// }
//
// req := ToRequestPreprepare(
// reqNewView.View,
// summary.Sequence,
// summary.Digest,
// reqNewView.Replica) // new primary pre-prepares
//
// if !rep.hasRequest(req) {
// rep.logRequest(req)
// }
// }
//
// var maxSequence uint32
// for _, req := range rep.requests["pre-prepare"] {
// reqPrePrepare := req.GetPreprepare()
// if reqPrePrepare.Sequence > maxSequence {
// maxSequence = reqPrePrepare.Sequence
// }
// }
// rep.sequence = maxSequence
// return true
//}
//func (rep *Replica) prePrepBySequence(sequence uint32) []*pb.Entry {
// var view uint32
// var requests []*pb.Request
// for _, req := range rep.requests["pre-prepare"] {
// v := req.GetPreprepare().View
// s := req.GetPreprepare().Sequence
// if v >= view && s == sequence {
// view = v
// requests = append(requests, req)
// }
// }
// if requests == nil {
// return nil
// }
// var prePreps []*pb.Entry
// for _, req := range requests {
// v := req.GetPreprepare().View
// if v == view {
// s := req.GetPreprepare().Sequence
// d := req.GetPreprepare().Digest
// prePrep := ToEntry(s, d, v)
// prePreps = append(prePreps, prePrep)
// }
// }
//FOR_LOOP:
// for _, prePrep := range prePreps {
// for _, req := range rep.allRequests() { //TODO: optimize
// if EQ(ReqDigest(req), prePrep.Digest) {
// continue FOR_LOOP
// }
// }
// return nil
// }
// return prePreps
//}
//
//func (rep *Replica) prepBySequence(sequence uint32) ([]*pb.Entry, []*pb.Entry) {
//
// prePreps := rep.prePrepBySequence(sequence)
//
// if prePreps == nil {
// return nil, nil
// }
//
// var preps []*pb.Entry
//
//FOR_LOOP:
// for _, prePrep := range prePreps {
//
// view := prePrep.View
// digest := prePrep.Digest
//
// replicas := make(map[uint32]int)
// for _, req := range rep.requests["prepare"] {
// reqPrepare := req.GetPrepare()
// v := reqPrepare.View
// s := reqPrepare.Sequence
// d := reqPrepare.Digest
// if v == view && s == sequence && EQ(d, digest) {
// r := reqPrepare.Replica
// replicas[r]++
// if rep.twoThirds(replicas[r]) {
// prep := ToEntry(s, d, v)
// preps = append(preps, prep)
// continue FOR_LOOP
// }
// }
// }
// return prePreps, nil
// }
//
// return prePreps, preps
//}
/*
func (rep *Replica) prepBySequence(sequence uint32) *pb.Entry {
......@@ -1420,187 +1420,187 @@ func (rep *Replica) prePrepBySequence(sequence uint32) *pb.Entry {
}
*/
func (rep *Replica) requestViewChange(view uint32) {
if view != rep.view+1 {
return
}
rep.view = view
rep.activeView = false
var prePreps []*pb.Entry
var preps []*pb.Entry
start := rep.lowWaterMark() + 1
end := rep.highWaterMark()
for s := start; s <= end; s++ {
_prePreps, _preps := rep.prepBySequence(s)
if _prePreps != nil {
prePreps = append(prePreps, _prePreps...)
}
if _preps != nil {
preps = append(preps, _preps...)
}
}
sequence := rep.lowWaterMark()
req := ToRequestViewChange(
view,
sequence,
rep.checkpoints, //
preps,
prePreps,
rep.ID)
rep.logRequest(req)
go func() {
rep.requestChan <- req
}()
rep.clearRequestsByView(view)
}
func (rep *Replica) createNewView(view uint32) (request *pb.Request) {
// Returns RequestNewView if successful, else returns nil
// create viewChanges
viewChanges := make([]*pb.ViewChange, len(rep.pendingVC))
for idx := range viewChanges {
req := rep.pendingVC[idx]
viewchanger := req.GetViewchange().Replica
vc := ToViewChange(viewchanger, ReqDigest(req))
viewChanges[idx] = vc
}
var summaries []*pb.Summary
var summary *pb.Summary
start := rep.lowWaterMark() + 1
end := rep.highWaterMark()
// select starting checkpoint
FOR_LOOP_1:
for seq := start; seq <= end; seq++ {
overLWM := 0
var digest []byte
digests := make(map[string]int)
for _, req := range rep.pendingVC {
reqViewChange := req.GetViewchange()
if reqViewChange.Sequence <= seq {
overLWM++
}
for _, checkpoint := range reqViewChange.GetCheckpoints() {
if checkpoint.Sequence == seq {
d := checkpoint.Digest
digests[string(d)]++
if rep.oneThird(digests[string(d)]) {
digest = d
break
}
}
}
if rep.twoThirds(overLWM) && rep.oneThird(digests[string(digest)]) {
summary = ToSummary(seq, digest)
continue FOR_LOOP_1
}
}
}
if summary == nil {
return
}
summaries = append(summaries, summary)
start = summary.Sequence
end = start + CheckPointPeriod*ConstantFactor
// select summaries
// TODO: optimize
FOR_LOOP_2:
for seq := start; seq <= end; seq++ {
for _, REQ := range rep.pendingVC {
sequence := REQ.GetViewchange().Sequence
if sequence != seq {
continue
}
var A1 []*pb.Request
var A2 []*pb.Request
view := REQ.GetViewchange().View
digest := ReqDigest(REQ)
FOR_LOOP_3:
for _, req := range rep.pendingVC {
reqViewChange := req.GetViewchange()
if reqViewChange.Sequence < sequence {
preps := reqViewChange.GetPreps()
for _, prep := range preps {
if prep.Sequence != sequence {
continue
}
if prep.View > view || (prep.View == view && !EQ(prep.Digest, digest)) {
continue FOR_LOOP_3
}
}
A1 = append(A1, req)
}
prePreps := reqViewChange.GetPrepreps()
for _, prePrep := range prePreps {
if prePrep.Sequence != sequence {
continue
}
if prePrep.View >= view && EQ(prePrep.Digest, digest) {
A2 = append(A2, req)
continue FOR_LOOP_3
}
}
}
if rep.twoThirds(len(A1)) && rep.oneThird(len(A2)) {
summary = ToSummary(sequence, digest)
summaries = append(summaries, summary)
continue FOR_LOOP_2
}
}
}
request = ToRequestNewView(view, viewChanges, summaries, rep.ID)
return
}
func (rep *Replica) requestNewView(view uint32) {
req := rep.createNewView(view)
if req == nil || rep.hasRequest(req) {
return
}
// Process new view
success := rep.processNewView(req)
if !success {
return
}
rep.logRequest(req)
go func() {
rep.requestChan <- req
}()
}
//func (rep *Replica) requestViewChange(view uint32) {
//
// if view != rep.view+1 {
// return
// }
// rep.view = view
// rep.activeView = false
//
// var prePreps []*pb.Entry
// var preps []*pb.Entry
//
// start := rep.lowWaterMark() + 1
// end := rep.highWaterMark()
//
// for s := start; s <= end; s++ {
// _prePreps, _preps := rep.prepBySequence(s)
// if _prePreps != nil {
// prePreps = append(prePreps, _prePreps...)
// }
// if _preps != nil {
// preps = append(preps, _preps...)
// }
// }
//
// sequence := rep.lowWaterMark()
//
// req := ToRequestViewChange(
// view,
// sequence,
// rep.checkpoints, //
// preps,
// prePreps,
// rep.ID)
//
// rep.logRequest(req)
//
// go func() {
// rep.requestChan <- req
// }()
//
// rep.clearRequestsByView(view)
//}
//
//func (rep *Replica) createNewView(view uint32) (request *pb.Request) {
//
// // Returns RequestNewView if successful, else returns nil
// // create viewChanges
// viewChanges := make([]*pb.ViewChange, len(rep.pendingVC))
//
// for idx := range viewChanges {
// req := rep.pendingVC[idx]
// viewchanger := req.GetViewchange().Replica
// vc := ToViewChange(viewchanger, ReqDigest(req))
// viewChanges[idx] = vc
// }
//
// var summaries []*pb.Summary
// var summary *pb.Summary
//
// start := rep.lowWaterMark() + 1
// end := rep.highWaterMark()
//
// // select starting checkpoint
//FOR_LOOP_1:
// for seq := start; seq <= end; seq++ {
//
// overLWM := 0
// var digest []byte
// digests := make(map[string]int)
//
// for _, req := range rep.pendingVC {
// reqViewChange := req.GetViewchange()
// if reqViewChange.Sequence <= seq {
// overLWM++
// }
// for _, checkpoint := range reqViewChange.GetCheckpoints() {
// if checkpoint.Sequence == seq {
// d := checkpoint.Digest
// digests[string(d)]++
// if rep.oneThird(digests[string(d)]) {
// digest = d
// break
// }
// }
// }
// if rep.twoThirds(overLWM) && rep.oneThird(digests[string(digest)]) {
// summary = ToSummary(seq, digest)
// continue FOR_LOOP_1
// }
// }
// }
//
// if summary == nil {
// return
// }
//
// summaries = append(summaries, summary)
//
// start = summary.Sequence
// end = start + CheckPointPeriod*ConstantFactor
//
// // select summaries
// // TODO: optimize
//FOR_LOOP_2:
// for seq := start; seq <= end; seq++ {
//
// for _, REQ := range rep.pendingVC {
//
// sequence := REQ.GetViewchange().Sequence
//
// if sequence != seq {
// continue
// }
//
// var A1 []*pb.Request
// var A2 []*pb.Request
//
// view := REQ.GetViewchange().View
// digest := ReqDigest(REQ)
//
// FOR_LOOP_3:
// for _, req := range rep.pendingVC {
//
// reqViewChange := req.GetViewchange()
//
// if reqViewChange.Sequence < sequence {
// preps := reqViewChange.GetPreps()
// for _, prep := range preps {
// if prep.Sequence != sequence {
// continue
// }
// if prep.View > view || (prep.View == view && !EQ(prep.Digest, digest)) {
// continue FOR_LOOP_3
// }
// }
// A1 = append(A1, req)
// }
// prePreps := reqViewChange.GetPrepreps()
// for _, prePrep := range prePreps {
// if prePrep.Sequence != sequence {
// continue
// }
// if prePrep.View >= view && EQ(prePrep.Digest, digest) {
// A2 = append(A2, req)
// continue FOR_LOOP_3
// }
// }
// }
//
// if rep.twoThirds(len(A1)) && rep.oneThird(len(A2)) {
// summary = ToSummary(sequence, digest)
// summaries = append(summaries, summary)
// continue FOR_LOOP_2
// }
// }
// }
//
// request = ToRequestNewView(view, viewChanges, summaries, rep.ID)
// return
//}
//
//func (rep *Replica) requestNewView(view uint32) {
//
// req := rep.createNewView(view)
//
// if req == nil || rep.hasRequest(req) {
// return
// }
//
// // Process new view
//
// success := rep.processNewView(req)
//
// if !success {
// return
// }
//
// rep.logRequest(req)
//
// go func() {
// rep.requestChan <- req
// }()
//
//}
......@@ -5,6 +5,7 @@
package raft
import (
"context"
"fmt"
"sync"
"time"
......@@ -35,14 +36,15 @@ type Client struct {
errorC <-chan error
snapshotter *snap.Snapshotter
validatorC <-chan bool
stopC chan<- struct{}
ctx context.Context
cancel context.CancelFunc
once sync.Once
}
// NewBlockstore create Raft Client
func NewBlockstore(cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, stopC chan<- struct{}) *Client {
func NewBlockstore(ctx context.Context, cfg *types.Consensus, snapshotter *snap.Snapshotter, proposeC chan<- *types.Block, commitC <-chan *types.Block, errorC <-chan error, validatorC <-chan bool, cancel context.CancelFunc) *Client {
c := drivers.NewBaseClient(cfg)
client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, stopC: stopC}
client := &Client{BaseClient: c, proposeC: proposeC, snapshotter: snapshotter, validatorC: validatorC, commitC: commitC, errorC: errorC, ctx: ctx, cancel: cancel}
c.SetChild(client)
return client
}
......@@ -97,12 +99,12 @@ func (client *Client) SetQueueClient(c queue.Client) {
})
go client.EventLoop()
go client.readCommits(client.commitC, client.errorC)
go client.pollingTask(c)
go client.pollingTask()
}
// Close method
func (client *Client) Close() {
client.stopC <- struct{}{}
client.cancel()
rlog.Info("consensus raft closed")
}
......@@ -125,10 +127,14 @@ func (client *Client) CreateBlock() {
panic("This node encounter problem, exit.")
}
}
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-client.ctx.Done():
case <-ticker.C:
//如果leader节点突然挂了,不是打包节点,需要退出
if !isLeader {
if !mux.Load().(bool) {
rlog.Warn("I'm not the validator node anymore, exit.=============================")
break
}
......@@ -193,6 +199,8 @@ func (client *Client) CreateBlock() {
}
time.Sleep(time.Second * time.Duration(writeBlockSeconds))
}
}
}
// 向raft底层发送block
......@@ -219,17 +227,21 @@ func (client *Client) readCommits(commitC <-chan *types.Block, errorC <-chan err
if ok {
panic(err)
}
case <-client.ctx.Done():
return
}
}
}
//轮询任务,去检测本机器是否为validator节点,如果是,则执行打包任务
func (client *Client) pollingTask(c queue.Client) {
func (client *Client) pollingTask() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-client.ctx.Done():
return
case value, ok := <-client.validatorC:
//各个节点Block只初始化一次
client.once.Do(func() {
......@@ -237,9 +249,14 @@ func (client *Client) pollingTask(c queue.Client) {
})
if ok && !value {
rlog.Debug("================I'm not the validator node!=============")
leader := mux.Load().(bool)
if leader {
isLeader = false
} else if ok && !isLeader && value {
mux.Store(isLeader)
}
} else if ok && !mux.Load().(bool) && value {
isLeader = true
mux.Store(isLeader)
go client.CreateBlock()
} else if !ok {
break
......
......@@ -36,8 +36,8 @@ enableTxQuickIndex=true
[p2p]
seeds=["127.0.0.1:13802"]
enable=true
isSeed=true
enable=false
isSeed=false
serverStart=true
innerSeedEnable=false
useGithub=false
......@@ -80,6 +80,7 @@ poolCacheSize=10240
# 共识驱动名,支持solo/raft/ticket/tendermint/pbft
name="raft"
minerstart=false
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[mver.consensus]
fundKeyAddr = "1BQXS6TxaYYG5mADaWij4AxhZZUTpw95a5"
......@@ -107,8 +108,8 @@ peersURL="http://127.0.0.1:9021"
# raft共识用到,指示raft集群中只读节点的IP(只同步日志,不参与raft共识)
readOnlyPeersURL=""
addPeersURL=""
#raft共识用到,默认raft中多少条记录打包一个snapshot
defaultSnapCount=1000
#raft共识用到,默认raft中多少条记录打包一个snapshot(这里为了测试调整小一点)
defaultSnapCount=2
#raft共识用到,默认raft中写区块时间间隔
writeBlockSeconds=1
#raft共识用到,默认raft中leader发送心跳包时间间隔
......@@ -127,7 +128,7 @@ enableMVCC=false
[wallet]
minFee=100000
driver="leveldb"
driver="memdb"
dbPath="wallet"
dbCache=16
signType="secp256k1"
......@@ -135,6 +136,8 @@ signType="secp256k1"
[wallet.sub.ticket]
minerdisable=false
minerwhitelist=["*"]
minerWaitTime="1s"
[exec]
isFree=false
......@@ -144,6 +147,9 @@ enableMVCC=false
alias=["token1:token","token2:token","token3:token"]
saveTokenTxList=false
[exec.sub.relay]
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
[exec.sub.cert]
# 是否启用证书验证和签名
enable=false
......@@ -151,3 +157,14 @@ enable=false
cryptoPath="authdir/crypto"
# 带证书签名类型,支持"auth_ecdsa", "auth_sm2"
signType="auth_ecdsa"
[exec.sub.manage]
superManager=[
"1Bsg9j6gW83sShoee1fZAt9TkUjcrCgA9S",
"12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv",
"1Q8hGLfoGe63efeWa8fJ4Pnukhkngt6poK"
]
[exec.sub.autonomy]
total="16htvcBNSEA7fZhAdLJphDwQRQJaHpyHTp"
useBalance=false
\ No newline at end of file
......@@ -5,7 +5,9 @@
package raft
import (
"context"
"strings"
"sync/atomic"
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/queue"
......@@ -22,6 +24,7 @@ var (
writeBlockSeconds int64 = 1
heartbeatTick = 1
isLeader = false
mux atomic.Value
confChangeC chan raftpb.ConfChange
)
......@@ -39,6 +42,10 @@ type subConfig struct {
HeartbeatTick int32 `json:"heartbeatTick"`
}
func init() {
mux.Store(isLeader)
}
// NewRaftCluster create raft cluster
func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
rlog.Info("Start to create raft cluster")
......@@ -70,10 +77,6 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if subcfg.HeartbeatTick > 0 {
heartbeatTick = int(subcfg.HeartbeatTick)
}
// propose channel
proposeC := make(chan *types.Block)
confChangeC = make(chan raftpb.ConfChange)
var b *Client
getSnapshot := func() ([]byte, error) { return b.getSnapshot() }
// raft集群的建立,1. 初始化两条channel: propose channel用于客户端和raft底层交互, commit channel用于获取commit消息
......@@ -90,10 +93,15 @@ func NewRaftCluster(cfg *types.Consensus, sub []byte) queue.Module {
if len(addPeers) == 1 && addPeers[0] == "" {
addPeers = []string{}
}
commitC, errorC, snapshotterReady, validatorC, stopC := NewRaftNode(int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//采用context来统一管理所有服务
ctx, stop := context.WithCancel(context.Background())
// propose channel
proposeC := make(chan *types.Block)
confChangeC = make(chan raftpb.ConfChange)
commitC, errorC, snapshotterReady, validatorC := NewRaftNode(ctx, int(subcfg.NodeID), subcfg.IsNewJoinNode, peers, readOnlyPeers, addPeers, getSnapshot, proposeC, confChangeC)
//启动raft删除节点操作监听
go serveHTTPRaftAPI(int(subcfg.RaftAPIPort), confChangeC, errorC)
go serveHTTPRaftAPI(ctx, int(subcfg.RaftAPIPort), confChangeC, errorC)
// 监听commit channel,取block
b = NewBlockstore(cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stopC)
b = NewBlockstore(ctx, cfg, <-snapshotterReady, proposeC, commitC, errorC, validatorC, stop)
return b
}
......@@ -5,6 +5,7 @@
package raft
import (
"context"
"io/ioutil"
"net/http"
"strconv"
......@@ -66,8 +67,8 @@ func (h *httpRaftAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
func serveHTTPRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := http.Server{
func serveHTTPRaftAPI(ctx context.Context, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := &http.Server{
Addr: "localhost:" + strconv.Itoa(port),
Handler: &httpRaftAPI{
confChangeC: confChangeC,
......@@ -78,9 +79,11 @@ func serveHTTPRaftAPI(port int, confChangeC chan<- raftpb.ConfChange, errorC <-c
rlog.Error(fmt.Sprintf("ListenAndServe have a err: (%v)", err.Error()))
}
}()
// exit when raft goes down
if err, ok := <-errorC; ok {
select {
case <-ctx.Done():
srv.Close()
case err := <-errorC:
srv.Close()
rlog.Error(fmt.Sprintf("the errorC chan receive a err (%v)\n", err.Error()))
}
}
......@@ -5,6 +5,7 @@
package raft
import (
"context"
"errors"
"net"
"time"
......@@ -13,16 +14,16 @@ import (
// 设置TCP keep-alive超时,接收stopc
type stoppableListener struct {
*net.TCPListener
stopc <-chan struct{}
ctx context.Context
}
// 监听tcp连接
func newStoppableListener(addr string, stopc <-chan struct{}) (*stoppableListener, error) {
func newStoppableListener(ctx context.Context, addr string) (*stoppableListener, error) {
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
return &stoppableListener{ln.(*net.TCPListener), stopc}, nil
return &stoppableListener{ln.(*net.TCPListener), ctx}, nil
}
func (ln stoppableListener) Accept() (c net.Conn, err error) {
......@@ -37,7 +38,7 @@ func (ln stoppableListener) Accept() (c net.Conn, err error) {
connc <- tc
}()
select {
case <-ln.stopc:
case <-ln.ctx.Done():
return nil, errors.New("server stopped")
case err := <-errc:
return nil, err
......
......@@ -56,17 +56,18 @@ type raftNode struct {
snapCount uint64
transport *rafthttp.Transport
stopMu sync.RWMutex
stopc chan struct{}
httpstopc chan struct{}
httpdonec chan struct{}
ctx context.Context
//stopc chan struct{}
//httpstopc chan struct{}
//httpdonec chan struct{}
validatorC chan bool
//用于判断该节点是否重启过
restartC chan struct{}
}
// NewRaftNode create raft node
func NewRaftNode(id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool, chan<- struct{}) {
func NewRaftNode(ctx context.Context, id int, join bool, peers []string, readOnlyPeers []string, addPeers []string, getSnapshot func() ([]byte, error), proposeC <-chan *types.Block,
confChangeC <-chan raftpb.ConfChange) (<-chan *types.Block, <-chan error, <-chan *snap.Snapshotter, <-chan bool) {
rlog.Info("Enter consensus raft")
// commit channel
......@@ -86,16 +87,14 @@ func NewRaftNode(id int, join bool, peers []string, readOnlyPeers []string, addP
snapdir: fmt.Sprintf("chain33_raft-%d%ssnap", id, string(os.PathSeparator)),
getSnapshot: getSnapshot,
snapCount: defaultSnapCount,
stopc: make(chan struct{}),
httpstopc: make(chan struct{}),
httpdonec: make(chan struct{}),
validatorC: make(chan bool),
snapshotterReady: make(chan *snap.Snapshotter, 1),
restartC: make(chan struct{}, 1),
ctx: ctx,
}
go rc.startRaft()
return commitC, errorC, rc.snapshotterReady, rc.validatorC, rc.stopc
return commitC, errorC, rc.snapshotterReady, rc.validatorC
}
// 启动raft节点
......@@ -184,22 +183,22 @@ func (rc *raftNode) serveRaft() {
panic(err)
}
ln, err := newStoppableListener(nodeURL.Host, rc.httpstopc)
ln, err := newStoppableListener(rc.ctx, nodeURL.Host)
if err != nil {
rlog.Error(fmt.Sprintf("raft: Failed to listen rafthttp (%v)", err.Error()))
panic(err)
}
err = (&http.Server{Handler: rc.transport.Handler()}).Serve(ln)
raftSrv := &http.Server{Handler: rc.transport.Handler()}
err = raftSrv.Serve(ln)
if err != nil {
rlog.Error(fmt.Sprintf("raft: Failed to serve rafthttp (%v)", err.Error()))
}
select {
case <-rc.httpstopc:
case <-rc.ctx.Done():
raftSrv.Close()
default:
rlog.Error(fmt.Sprintf("raft: Failed to serve rafthttp (%v)", err.Error()))
}
close(rc.httpdonec)
}
func (rc *raftNode) serveChannels() {
......@@ -246,9 +245,11 @@ func (rc *raftNode) serveChannels() {
rlog.Error(fmt.Sprintf("rc.node.ProposeConfChange:%v", err.Error()))
}
}
case <-rc.ctx.Done():
rlog.Info("I have a exit message!")
return
}
}
close(rc.stopc)
}()
// 从Ready()中接收数据
for {
......@@ -275,7 +276,7 @@ func (rc *raftNode) serveChannels() {
rc.writeError(err)
return
case <-rc.stopc:
case <-rc.ctx.Done():
rc.stop()
return
}
......@@ -283,9 +284,9 @@ func (rc *raftNode) serveChannels() {
}
func (rc *raftNode) updateValidator() {
//TODO 这块监听后期需要根据场景进行优化?
time.Sleep(5 * time.Second)
//用于标记readOnlyPeers是否已经被添加到集群中了
flag := false
isRestart := false
......@@ -297,8 +298,12 @@ func (rc *raftNode) updateValidator() {
case <-ticker.C:
ticker.Stop()
}
ticker = time.NewTicker(time.Second)
for {
time.Sleep(time.Second)
select {
case <-rc.ctx.Done():
return
case <-ticker.C:
status := rc.Status()
if status.Lead == raft.None {
rlog.Debug(fmt.Sprintf("==============This is %s node!==============", status.RaftState.String()))
......@@ -317,6 +322,8 @@ func (rc *raftNode) updateValidator() {
flag = true
}
}
}
}
func (rc *raftNode) Status() raft.Status {
rc.stopMu.RLock()
......@@ -454,20 +461,18 @@ func (rc *raftNode) stop() {
rc.stopHTTP()
close(rc.commitC)
close(rc.errorC)
close(rc.stopc)
rc.node.Stop()
}
func (rc *raftNode) stopHTTP() {
rc.transport.Stop()
close(rc.httpstopc)
<-rc.httpdonec
//close(rc.httpstopc)
//<-rc.httpdonec
}
func (rc *raftNode) writeError(err error) {
rc.stopHTTP()
close(rc.commitC)
close(rc.stopc)
rc.errorC <- err
close(rc.errorC)
rc.node.Stop()
......@@ -488,7 +493,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
}
select {
case rc.commitC <- block:
case <-rc.stopc:
case <-rc.ctx.Done():
return false
}
......@@ -521,7 +526,7 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
if ents[i].Index == rc.lastIndex {
select {
case rc.commitC <- nil:
case <-rc.stopc:
case <-rc.ctx.Done():
return false
}
}
......
......@@ -5,177 +5,45 @@
package raft
import (
"encoding/binary"
"flag"
"fmt"
"math/rand"
"os"
"testing"
"time"
"github.com/33cn/chain33/blockchain"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/limits"
"github.com/33cn/chain33/common/log"
"github.com/33cn/chain33/executor"
"github.com/33cn/chain33/mempool"
"github.com/33cn/chain33/p2p"
"github.com/33cn/chain33/queue"
"github.com/33cn/chain33/store"
"github.com/33cn/chain33/types"
//加载系统内置store, 不要依赖plugin
_ "github.com/33cn/chain33/system/dapp/init"
_ "github.com/33cn/chain33/system/mempool/init"
_ "github.com/33cn/chain33/system/store/init"
"github.com/33cn/chain33/util"
"github.com/33cn/chain33/util/testnode"
_ "github.com/33cn/chain33/system"
_ "github.com/33cn/plugin/plugin/dapp/init"
pty "github.com/33cn/plugin/plugin/dapp/norm/types"
_ "github.com/33cn/plugin/plugin/store/init"
)
var (
random *rand.Rand
txNumber = 10
loopCount = 10
)
func init() {
err := limits.SetLimits()
if err != nil {
panic(err)
}
random = rand.New(rand.NewSource(types.Now().UnixNano()))
log.SetLogLevel("info")
}
func TestRaftPerf(t *testing.T) {
RaftPerf()
fmt.Println("=======start clear test data!=======")
// 执行: go test -cover
func TestRaft(t *testing.T) {
mock33 := testnode.New("chain33.test.toml", nil)
defer mock33.Close()
mock33.Listen()
t.Log(mock33.GetGenesisAddress())
time.Sleep(10 * time.Second)
txs := util.GenNoneTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
mock33.GetAPI().SendTx(txs[i])
}
mock33.WaitHeight(1)
txs = util.GenNoneTxs(mock33.GetGenesisKey(), 10)
for i := 0; i < len(txs); i++ {
mock33.GetAPI().SendTx(txs[i])
}
mock33.WaitHeight(2)
clearTestData()
}
func RaftPerf() {
q, chain, s, mem, exec, cs, p2p := initEnvRaft()
defer q.Close()
defer s.Close()
defer p2p.Close()
defer mem.Close()
defer exec.Close()
defer chain.Close()
defer cs.Close()
sendReplyList(q)
}
func initEnvRaft() (queue.Queue, *blockchain.BlockChain, queue.Module, queue.Module, *executor.Executor, queue.Module, queue.Module) {
var q = queue.New("channel")
flag.Parse()
cfg, sub := types.InitCfg("chain33.test.toml")
types.Init(cfg.Title, cfg)
s := store.New(cfg.Store, sub.Store)
s.SetQueueClient(q.Client())
chain := blockchain.New(cfg.BlockChain)
chain.SetQueueClient(q.Client())
exec := executor.New(cfg.Exec, sub.Exec)
exec.SetQueueClient(q.Client())
types.SetMinFee(0)
mem := mempool.New(cfg.Mempool, nil)
mem.SetQueueClient(q.Client())
cs := NewRaftCluster(cfg.Consensus, sub.Consensus["raft"])
cs.SetQueueClient(q.Client())
network := p2p.New(cfg.P2P)
network.SetQueueClient(q.Client())
return q, chain, s, mem, exec, cs, network
}
func generateKey(i, valI int) string {
key := make([]byte, valI)
binary.PutUvarint(key[:10], uint64(valI))
binary.PutUvarint(key[12:24], uint64(i))
if _, err := rand.Read(key[24:]); err != nil {
os.Exit(1)
}
return string(key)
}
func generateValue(i, valI int) string {
value := make([]byte, valI)
binary.PutUvarint(value[:16], uint64(i))
binary.PutUvarint(value[32:128], uint64(i))
if _, err := rand.Read(value[128:]); err != nil {
os.Exit(1)
}
return string(value)
}
func getprivkey(key string) crypto.PrivKey {
cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
if err != nil {
panic(err)
}
bkey, err := common.FromHex(key)
if err != nil {
panic(err)
}
priv, err := cr.PrivKeyFromBytes(bkey)
if err != nil {
panic(err)
}
return priv
}
func sendReplyList(q queue.Queue) {
client := q.Client()
client.Sub("mempool")
var count int
for msg := range client.Recv() {
if msg.Ty == types.EventTxList {
count++
msg.Reply(client.NewMessage("consensus", types.EventReplyTxList,
&types.ReplyTxList{Txs: getReplyList(txNumber)}))
if count >= loopCount {
time.Sleep(4 * time.Second)
break
}
}
}
}
func prepareTxList() *types.Transaction {
var key string
var value string
var i int
key = generateKey(i, 32)
value = generateValue(i, 180)
nput := &pty.NormAction_Nput{Nput: &pty.NormPut{Key: []byte(key), Value: []byte(value)}}
action := &pty.NormAction{Value: nput, Ty: pty.NormActionPut}
tx := &types.Transaction{Execer: []byte("norm"), Payload: types.Encode(action), Fee: 0}
tx.To = address.ExecAddress("norm")
tx.Nonce = random.Int63()
tx.Sign(types.SECP256K1, getprivkey("CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944"))
return tx
}
func getReplyList(n int) (txs []*types.Transaction) {
for i := 0; i < n; i++ {
txs = append(txs, prepareTxList())
}
return txs
}
func clearTestData() {
err := os.RemoveAll("datadir")
if err != nil {
fmt.Println("delete datadir have a err:", err.Error())
}
err = os.RemoveAll("chain33_raft-1")
err := os.RemoveAll("chain33_raft-1")
if err != nil {
fmt.Println("delete chain33_raft dir have a err:", err.Error())
}
......
......@@ -39,7 +39,7 @@ const (
ProposalPOLID = byte(0x05)
VoteID = byte(0x06)
HasVoteID = byte(0x07)
VoteSetMaj23ID = byte(0X08)
VoteSetMaj23ID = byte(0x08)
VoteSetBitsID = byte(0x09)
ProposalHeartbeatID = byte(0x0a)
ProposalBlockID = byte(0x0b)
......
package executor
import (
"testing"
"github.com/33cn/chain33/account"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
dbm "github.com/33cn/chain33/common/db"
pty "github.com/33cn/plugin/plugin/dapp/game/types"
"github.com/stretchr/testify/assert"
)
type execEnv struct {
blockTime int64
blockHeight int64
difficulty uint64
}
var (
PrivKeyA = "0x6da92a632ab7deb67d38c0f6560bcfed28167998f6496db64c258d5e8393a81b" // 1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4
PrivKeyB = "0x19c069234f9d3e61135fefbeb7791b149cdf6af536f26bebb310d4cd22c3fee4" // 1JRNjdEqp4LJ5fqycUBm9ayCKSeeskgMKR
PrivKeyC = "0x7a80a1f75d7360c6123c32a78ecf978c1ac55636f87892df38d8b85a9aeff115" // 1NLHPEcbTWWxxU3dGUZBhayjrCHD3psX7k
PrivKeyD = "0xcacb1f5d51700aea07fca2246ab43b0917d70405c65edea9b5063d72eb5c6b71" // 1MCftFynyvG2F4ED5mdHYgziDxx6vDrScs
Nodes = [][]byte{
[]byte("1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4"),
[]byte("1JRNjdEqp4LJ5fqycUBm9ayCKSeeskgMKR"),
[]byte("1NLHPEcbTWWxxU3dGUZBhayjrCHD3psX7k"),
[]byte("1MCftFynyvG2F4ED5mdHYgziDxx6vDrScs"),
}
)
func TestGame(t *testing.T) {
types.SetTitleOnlyForTest("chain33")
total := 100 * types.Coin
accountA := types.Account{
Balance: total,
Frozen: 0,
Addr: string(Nodes[0]),
}
accountB := types.Account{
Balance: total,
Frozen: 0,
Addr: string(Nodes[1]),
}
accountC := types.Account{
Balance: total,
Frozen: 0,
Addr: string(Nodes[2]),
}
accountD := types.Account{
Balance: total,
Frozen: 0,
Addr: string(Nodes[3]),
}
execAddr := address.ExecAddress(pty.GameX)
stateDB, _ := dbm.NewGoMemDB("1", "2", 1000)
_, _, kvdb := util.CreateTestDB()
accA, _ := account.NewAccountDB("coins", "bty", stateDB)
accA.SaveExecAccount(execAddr, &accountA)
accB, _ := account.NewAccountDB("coins", "bty", stateDB)
accB.SaveExecAccount(execAddr, &accountB)
accC, _ := account.NewAccountDB("coins", "bty", stateDB)
accC.SaveExecAccount(execAddr, &accountC)
accD, _ := account.NewAccountDB("coins", "bty", stateDB)
accD.SaveExecAccount(execAddr, &accountD)
env := execEnv{
10,
types.GetDappFork(pty.GameX, "Enable"),
1539918074,
}
// create game
createParam := &pty.GamePreCreateTx{Amount: 2 * types.Coin,
HashType: "sha256",
HashValue: common.Sha256([]byte("harrylee" + string(Rock))),
Fee: 100000}
createTx, err := pty.CreateRawGamePreCreateTx(createParam)
if err != nil {
t.Error(err)
}
createTx, err = signTx(createTx, PrivKeyA)
if err != nil {
t.Error(err)
}
exec := newGame()
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(1, env.blockTime, env.difficulty)
receipt, err := exec.Exec(createTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate := &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err := exec.ExecLocal(createTx, receiptDate, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
gameID := common.ToHex(createTx.Hash())
//match game
matchParam := &pty.GamePreMatchTx{GameID: gameID, Guess: Scissor, Fee: 100000}
matchTx, err := pty.CreateRawGamePreMatchTx(matchParam)
if err != nil {
t.Error(err)
}
matchTx, err = signTx(matchTx, PrivKeyB)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(2, env.blockTime+20, env.difficulty)
receipt, err = exec.Exec(matchTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(matchTx, receiptDate, int(1))
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
msg, err := exec.Query(pty.FuncNameQueryGameListByIds, types.Encode(&pty.QueryGameInfos{
GameIds: []string{gameID},
}))
if err != nil {
t.Error(err)
}
t.Log(msg)
msg, err = exec.Query(pty.FuncNameQueryGameByID, types.Encode(&pty.QueryGameInfo{
GameId: gameID}))
if err != nil {
t.Error(err)
}
t.Log(msg)
msg, err = exec.Query(pty.FuncNameQueryGameListByStatusAndAddr, types.Encode(&pty.QueryGameListByStatusAndAddr{
Status: pty.GameActionMatch}))
if err != nil {
t.Error(err)
}
//close game
closeParam := &pty.GamePreCloseTx{GameID: gameID, Secret: "harrylee", Result: Rock, Fee: 100000}
closeTx, err := pty.CreateRawGamePreCloseTx(closeParam)
if err != nil {
t.Error(err)
}
closeTx, err = signTx(closeTx, PrivKeyA)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(2, env.blockTime+20, env.difficulty)
receipt, err = exec.Exec(closeTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(closeTx, receiptDate, int(1))
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
acA := accA.LoadExecAccount(string(Nodes[0]), execAddr)
acB := accB.LoadExecAccount(string(Nodes[1]), execAddr)
t.Log(acA)
t.Log(acB)
msg, err = exec.Query(pty.FuncNameQueryGameByID, types.Encode(&pty.QueryGameInfo{
GameId: gameID}))
if err != nil {
t.Error(err)
}
reply := msg.(*pty.ReplyGame)
assert.Equal(t, int32(pty.GameActionClose), reply.Game.Status)
assert.Equal(t, IsCreatorWin, reply.Game.Result)
// create game
createParam = &pty.GamePreCreateTx{Amount: 2 * types.Coin,
HashType: "sha256",
HashValue: common.Sha256([]byte("123456" + string(Rock))),
Fee: 100000}
createTx, err = pty.CreateRawGamePreCreateTx(createParam)
if err != nil {
t.Error(err)
}
createTx, err = signTx(createTx, PrivKeyC)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(1, env.blockTime, env.difficulty)
receipt, err = exec.Exec(createTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(createTx, receiptDate, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
gameID = common.ToHex(createTx.Hash())
//cancle game
cancleParam := &pty.GamePreCancelTx{Fee: 1e5, GameID: gameID}
cancelTx, err := pty.CreateRawGamePreCancelTx(cancleParam)
if err != nil {
t.Error(err)
}
createTx, err = signTx(cancelTx, PrivKeyC)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(1, env.blockTime, env.difficulty)
receipt, err = exec.Exec(cancelTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(cancelTx, receiptDate, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
msg, err = exec.Query(pty.FuncNameQueryGameByID, types.Encode(&pty.QueryGameInfo{
GameId: gameID}))
if err != nil {
t.Error(err)
}
reply = msg.(*pty.ReplyGame)
assert.Equal(t, int32(pty.GameActionCancel), reply.Game.Status)
//create game
createParam = &pty.GamePreCreateTx{Amount: 2 * types.Coin,
HashType: "sha256",
HashValue: common.Sha256([]byte("123456" + string(Rock))),
Fee: 100000}
createTx, err = pty.CreateRawGamePreCreateTx(createParam)
if err != nil {
t.Error(err)
}
createTx, err = signTx(createTx, PrivKeyC)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(1, env.blockTime, env.difficulty)
receipt, err = exec.Exec(createTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(createTx, receiptDate, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
gameID = common.ToHex(createTx.Hash())
//match game
matchParam = &pty.GamePreMatchTx{GameID: gameID, Guess: Rock, Fee: 100000}
matchTx, err = pty.CreateRawGamePreMatchTx(matchParam)
if err != nil {
t.Error(err)
}
matchTx, err = signTx(matchTx, PrivKeyB)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(2, env.blockTime+20, env.difficulty)
receipt, err = exec.Exec(matchTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(matchTx, receiptDate, int(1))
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
//close game
closeParam = &pty.GamePreCloseTx{GameID: gameID, Secret: "123456", Result: Rock, Fee: 100000}
closeTx, err = pty.CreateRawGamePreCloseTx(closeParam)
if err != nil {
t.Error(err)
}
closeTx, err = signTx(closeTx, PrivKeyC)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(2, env.blockTime+20, env.difficulty)
receipt, err = exec.Exec(closeTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(closeTx, receiptDate, int(1))
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
msg, err = exec.Query(pty.FuncNameQueryGameByID, types.Encode(&pty.QueryGameInfo{
GameId: gameID}))
if err != nil {
t.Error(err)
}
reply = msg.(*pty.ReplyGame)
assert.Equal(t, int32(pty.GameActionClose), reply.Game.Status)
assert.Equal(t, IsDraw, reply.Game.Result)
//create game
createParam = &pty.GamePreCreateTx{Amount: 2 * types.Coin,
HashType: "sha256",
HashValue: common.Sha256([]byte("123456" + string(Rock))),
Fee: 100000}
createTx, err = pty.CreateRawGamePreCreateTx(createParam)
if err != nil {
t.Error(err)
}
createTx, err = signTx(createTx, PrivKeyC)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(1, env.blockTime, env.difficulty)
receipt, err = exec.Exec(createTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(createTx, receiptDate, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
gameID = common.ToHex(createTx.Hash())
//match game
matchParam = &pty.GamePreMatchTx{GameID: gameID, Guess: Paper, Fee: 100000}
matchTx, err = pty.CreateRawGamePreMatchTx(matchParam)
if err != nil {
t.Error(err)
}
matchTx, err = signTx(matchTx, PrivKeyB)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(2, env.blockTime+20, env.difficulty)
receipt, err = exec.Exec(matchTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(matchTx, receiptDate, int(1))
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
//close game
closeParam = &pty.GamePreCloseTx{GameID: gameID, Secret: "123456", Result: Rock, Fee: 100000}
closeTx, err = pty.CreateRawGamePreCloseTx(closeParam)
if err != nil {
t.Error(err)
}
closeTx, err = signTx(closeTx, PrivKeyC)
if err != nil {
t.Error(err)
}
exec.SetStateDB(stateDB)
exec.SetLocalDB(kvdb)
exec.SetEnv(2, env.blockTime+20, env.difficulty)
receipt, err = exec.Exec(closeTx, int(1))
if err != nil {
t.Error(err)
}
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptDate = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(closeTx, receiptDate, int(1))
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
msg, err = exec.Query(pty.FuncNameQueryGameByID, types.Encode(&pty.QueryGameInfo{
GameId: gameID}))
if err != nil {
t.Error(err)
}
reply = msg.(*pty.ReplyGame)
assert.Equal(t, int32(pty.GameActionClose), reply.Game.Status)
assert.Equal(t, IsMatcherWin, reply.Game.Result)
}
func signTx(tx *types.Transaction, hexPrivKey string) (*types.Transaction, error) {
signType := types.SECP256K1
c, err := crypto.New(types.GetSignName("", signType))
if err != nil {
return tx, err
}
bytes, err := common.FromHex(hexPrivKey[:])
if err != nil {
return tx, err
}
privKey, err := c.PrivKeyFromBytes(bytes)
if err != nil {
return tx, err
}
tx.Sign(int32(signType), privKey)
return tx, nil
}
......@@ -6,6 +6,7 @@ package executor
import (
"bytes"
"encoding/hex"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
......@@ -65,6 +66,7 @@ func filterParaTxGroup(tx *types.Transaction, allTxs []*types.TxDetail, index in
}
if !checkReceiptExecOk(allTxs[i].Receipt) {
clog.Error("filterParaTxGroup rmv tx group", "txhash", hex.EncodeToString(allTxs[i].Tx.Hash()))
return nil, endIdx
}
}
......@@ -90,6 +92,7 @@ func FilterTxsForPara(main *types.ParaTxDetail) []*types.Transaction {
}
//单独的paracross tx 如果主链执行失败也要排除, 6.2fork原因 没有排除 非user.p.xx.paracross的平行链交易
if main.Header.Height >= forkHeight && bytes.HasSuffix(tx.Execer, []byte(pt.ParaX)) && !checkReceiptExecOk(main.TxDetails[i].Receipt) {
clog.Error("FilterTxsForPara rmv tx", "txhash", hex.EncodeToString(tx.Hash()))
continue
}
......
package executor
import (
"testing"
"github.com/33cn/chain33/account"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
dbm "github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/system/dapp"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
pkt "github.com/33cn/plugin/plugin/dapp/pokerbull/types"
"github.com/stretchr/testify/assert"
)
type execEnv struct {
blockTime int64
blockHeight int64
difficulty uint64
}
var (
PrivKeyA = "0x6da92a632ab7deb67d38c0f6560bcfed28167998f6496db64c258d5e8393a81b" // 1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4
PrivKeyB = "0x19c069234f9d3e61135fefbeb7791b149cdf6af536f26bebb310d4cd22c3fee4" // 1JRNjdEqp4LJ5fqycUBm9ayCKSeeskgMKR
Nodes = [][]byte{
[]byte("1KSBd17H7ZK8iT37aJztFB22XGwsPTdwE4"),
[]byte("1JRNjdEqp4LJ5fqycUBm9ayCKSeeskgMKR"),
}
)
func TestPokerbull(t *testing.T) {
types.SetTitleOnlyForTest("chain33")
total := 1000 * types.Coin
accountA := types.Account{
Balance: total,
Frozen: 0,
Addr: string(Nodes[0]),
}
accountB := types.Account{
Balance: total,
Frozen: 0,
Addr: string(Nodes[1]),
}
execAddr := dapp.ExecAddress(pkt.PokerBullX)
stateDB, _ := dbm.NewGoMemDB("1", "2", 100)
_, _, kvdb := util.CreateTestDB()
accA := account.NewCoinsAccount()
accA.SetDB(stateDB)
accA.SaveExecAccount(execAddr, &accountA)
accB := account.NewCoinsAccount()
accB.SetDB(stateDB)
accB.SaveExecAccount(execAddr, &accountB)
env := execEnv{
10,
types.GetDappFork(pkt.PokerBullX, "Enable"),
1539918074,
}
// start game
p1 := &pkt.PBGameStart{
Value: 5 * types.Coin,
PlayerNum: 2,
}
createTx, err := types.CallCreateTransaction(pkt.PokerBullX, "Start", p1)
if err != nil {
t.Error("RPC_Default_Process", "err", err)
}
createTx.Execer = pkt.ExecerPokerBull
createTx, err = signTx(createTx, PrivKeyA)
if err != nil {
t.Error("RPC_Default_Process sign", "err", err)
}
exec := newPBGame()
exec.SetStateDB(stateDB)
assert.Equal(t, exec.GetCoinsAccount().LoadExecAccount(string(Nodes[0]), execAddr).GetBalance(), total)
exec.SetLocalDB(kvdb)
exec.SetEnv(env.blockHeight, env.blockTime, env.difficulty)
receipt, err := exec.Exec(createTx, int(1))
assert.Nil(t, err)
assert.NotNil(t, receipt)
t.Log(receipt)
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptData := &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err := exec.ExecLocal(createTx, receiptData, int(1))
assert.Nil(t, err)
assert.NotNil(t, set)
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
gameID := createTx.Hash()
// start game p2
createTx, err = types.CallCreateTransaction(pkt.PokerBullX, "Start", p1)
if err != nil {
t.Error("RPC_Default_Process", "err", err)
}
createTx.Execer = pkt.ExecerPokerBull
createTx, err = signTx(createTx, PrivKeyB)
if err != nil {
t.Error("RPC_Default_Process sign", "err", err)
}
exec.SetEnv(env.blockHeight+1, env.blockTime+1, env.difficulty)
receipt, err = exec.Exec(createTx, int(1))
assert.Nil(t, err)
assert.NotNil(t, receipt)
t.Log(receipt)
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptData = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(createTx, receiptData, int(1))
assert.Nil(t, err)
assert.NotNil(t, set)
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
// continue game
p2 := &pkt.PBGameContinue{
GameId: common.ToHex(gameID),
}
createTx, err = types.CallCreateTransaction(pkt.PokerBullX, "Continue", p2)
if err != nil {
t.Error("RPC_Default_Process", "err", err)
}
createTx.Execer = pkt.ExecerPokerBull
createTx, err = signTx(createTx, PrivKeyA)
if err != nil {
t.Error("RPC_Default_Process sign", "err", err)
}
exec.SetEnv(env.blockHeight+1, env.blockTime+1, env.difficulty)
receipt, err = exec.Exec(createTx, int(1))
assert.Nil(t, err)
assert.NotNil(t, receipt)
t.Log(receipt)
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptData = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(createTx, receiptData, int(1))
assert.Nil(t, err)
assert.NotNil(t, set)
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
// quit game
p3 := &pkt.PBGameQuit{
GameId: common.ToHex(gameID),
}
createTx, err = types.CallCreateTransaction(pkt.PokerBullX, "Quit", p3)
if err != nil {
t.Error("RPC_Default_Process", "err", err)
}
createTx.Execer = pkt.ExecerPokerBull
createTx, err = signTx(createTx, PrivKeyA)
if err != nil {
t.Error("RPC_Default_Process sign", "err", err)
}
exec.SetEnv(env.blockHeight+2, env.blockTime+2, env.difficulty)
receipt, err = exec.Exec(createTx, int(1))
assert.Nil(t, err)
assert.NotNil(t, receipt)
for _, kv := range receipt.KV {
stateDB.Set(kv.Key, kv.Value)
}
receiptData = &types.ReceiptData{Ty: receipt.Ty, Logs: receipt.Logs}
set, err = exec.ExecLocal(createTx, receiptData, int(1))
assert.Nil(t, err)
assert.NotNil(t, set)
for _, kv := range set.KV {
kvdb.Set(kv.Key, kv.Value)
}
// query
res, err := exec.Query(pkt.FuncNameQueryGameByID, types.Encode(&pkt.QueryPBGameInfo{GameId: common.ToHex(gameID)}))
assert.Nil(t, err)
assert.NotNil(t, res)
res, err = exec.Query(pkt.FuncNameQueryGameByAddr, types.Encode(&pkt.QueryPBGameInfo{Addr: string(Nodes[0])}))
assert.Nil(t, err)
assert.NotNil(t, res)
res, err = exec.Query(pkt.FuncNameQueryGameByStatus, types.Encode(&pkt.QueryPBGameInfo{Status: pkt.PBGameActionQuit}))
assert.Nil(t, err)
assert.NotNil(t, res)
res, err = exec.Query(pkt.FuncNameQueryGameByRound, types.Encode(&pkt.QueryPBGameByRound{GameId: common.ToHex(gameID), Round: int32(1)}))
assert.Nil(t, err)
assert.NotNil(t, res)
var gameIDsS []string
gameIDsS = append(gameIDsS, common.ToHex(gameID))
res, err = exec.Query(pkt.FuncNameQueryGameListByIDs, types.Encode(&pkt.QueryPBGameInfos{GameIds: gameIDsS}))
assert.Nil(t, err)
assert.NotNil(t, res)
}
func signTx(tx *types.Transaction, hexPrivKey string) (*types.Transaction, error) {
signType := types.SECP256K1
c, err := crypto.New(types.GetSignName(pkt.PokerBullX, signType))
if err != nil {
return tx, err
}
bytes, err := common.FromHex(hexPrivKey[:])
if err != nil {
return tx, err
}
privKey, err := c.PrivKeyFromBytes(bytes)
if err != nil {
return tx, err
}
tx.Sign(int32(signType), privKey)
return tx, nil
}
......@@ -41,7 +41,9 @@ func TestJRPCChannel(t *testing.T) {
{fn: testStartRawTxCmd},
{fn: testContinueRawTxCmd},
{fn: testQuitRawTxCmd},
{fn: testPlayRawTxCmd},
}
for _, testCase := range testCases {
err := testCase.fn(t, jrpcClient)
assert.Nil(t, err)
......@@ -93,6 +95,17 @@ func testContinueRawTxCmd(t *testing.T, jrpc *jsonclient.JSONClient) error {
return jrpc.Call("Chain33.CreateTransaction", params, &res)
}
func testPlayRawTxCmd(t *testing.T, jrpc *jsonclient.JSONClient) error {
payload := &pty.PBGamePlay{GameId: "123", Round: 1, Value: 5, Address: []string{"a", "b"}}
params := &rpctypes.CreateTxIn{
Execer: types.ExecName(pty.PokerBullX),
ActionName: pty.CreatePlayTx,
Payload: types.MustPBToJSON(payload),
}
var res string
return jrpc.Call("Chain33.CreateTransaction", params, &res)
}
func testQuitRawTxCmd(t *testing.T, jrpc *jsonclient.JSONClient) error {
payload := &pty.PBGameQuit{GameId: "123"}
params := &rpctypes.CreateTxIn{
......
......@@ -10,7 +10,7 @@ import (
rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/types"
ty "github.com/33cn/plugin/plugin/dapp/ticket/types"
context "golang.org/x/net/context"
"golang.org/x/net/context"
)
func bindMiner(param *ty.ReqBindMiner) (*ty.ReplyBindMiner, error) {
......@@ -28,23 +28,24 @@ func bindMiner(param *ty.ReqBindMiner) (*ty.ReplyBindMiner, error) {
// CreateBindMiner 创建绑定挖矿
func (g *channelClient) CreateBindMiner(ctx context.Context, in *ty.ReqBindMiner) (*ty.ReplyBindMiner, error) {
header, err := g.GetLastHeader()
err := address.CheckAddress(in.BindAddr)
if err != nil {
return nil, err
}
if in.Amount%ty.GetTicketMinerParam(header.Height).TicketPrice != 0 || in.Amount < 0 {
return nil, types.ErrAmount
}
err = address.CheckAddress(in.BindAddr)
err = address.CheckAddress(in.OriginAddr)
if err != nil {
return nil, err
}
err = address.CheckAddress(in.OriginAddr)
if in.CheckBalance {
header, err := g.GetLastHeader()
if err != nil {
return nil, err
}
if in.Amount%ty.GetTicketMinerParam(header.Height).TicketPrice != 0 || in.Amount < 0 {
return nil, types.ErrAmount
}
if in.CheckBalance {
getBalance := &types.ReqBalance{Addresses: []string{in.OriginAddr}, Execer: "coins", AssetSymbol: "bty", AssetExec: "coins"}
balances, err := g.GetCoinsAccountDB().GetBalance(g, getBalance)
if err != nil {
......
......@@ -26,6 +26,26 @@ import (
var cfgstring = `
Title="test"
[mempool]
poolCacheSize=102400
minTxFee=100000
maxTxNumPerAccount=100
[exec]
isFree=false
minExecFee=100000
enableStat=false
enableMVCC=false
[wallet]
minFee=100000
driver="leveldb"
dbPath="wallet"
dbCache=16
signType="secp256k1"
minerdisable=false
minerwhitelist=["*"]
[mver.consensus]
fundKeyAddr = "1BQXS6TxaYYG5mADaWij4AxhZZUTpw95a5"
powLimitBits = "0x1f00ffff"
......@@ -43,6 +63,46 @@ ticketWithdrawTime = 10
ticketMinerWaitTime = 2
targetTimespan = 2304
targetTimePerBlock = 16
[mver.consensus.ticket.ForkChainParamV1]
ticketPrice = 3000
[mver.consensus.ticket.ForkChainParamV2]
ticketPrice = 6000
[fork.system]
ForkChainParamV1= 10
ForkChainParamV2= 20
ForkStateDBSet=-1
ForkCheckTxDup=0
ForkBlockHash= 1
ForkMinerTime= 10
ForkTransferExec= 100000
ForkExecKey= 200000
ForkTxGroup= 200000
ForkResetTx0= 200000
ForkWithdraw= 200000
ForkExecRollback= 450000
ForkTxHeight= -1
ForkTxGroupPara= -1
ForkCheckBlockTime=1200000
ForkMultiSignAddress=1298600
ForkBlockCheck=1
ForkLocalDBAccess=0
ForkBase58AddressCheck=1800000
ForkEnableParaRegExec=0
ForkCacheDriver=0
ForkTicketFundAddrV1=-1
[fork.sub.coins]
Enable=0
[fork.sub.manage]
Enable=0
ForkManageExec=100000
[fork.sub.store-kvmvccmavl]
ForkKvmvccmavl=1
`
func newGrpc(api client.QueueProtocolAPI) *channelClient {
......@@ -59,17 +119,18 @@ func TestChannelClient_BindMiner(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("ticket", nil, nil, nil)
head := &types.Header{StateHash: []byte("sdfadasds")}
api.On("GetLastHeader").Return(head, nil)
head := &types.Header{Height: 2, StateHash: []byte("sdfadasds")}
api.On("GetLastHeader").Return(head, nil).Times(4)
var acc = &types.Account{Addr: "1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt", Balance: 100000 * types.Coin}
accv := types.Encode(acc)
storevalue := &types.StoreReplyValue{}
storevalue.Values = append(storevalue.Values, accv)
api.On("StoreGet", mock.Anything).Return(storevalue, nil)
api.On("StoreGet", mock.Anything).Return(storevalue, nil).Twice()
types.SetTitleOnlyForTest("test")
types.InitCfgString(cfgstring)
cfg, _ := types.InitCfgString(cfgstring)
types.Init("test", cfg)
//var addrs = make([]string, 1)
//addrs = append(addrs, "1Jn2qu84Z1SUUosWjySggBS9pKWdAP3tZt")
......@@ -81,6 +142,16 @@ func TestChannelClient_BindMiner(t *testing.T) {
}
_, err := client.CreateBindMiner(context.Background(), in)
assert.Nil(t, err)
in.Amount = 200000 * types.Coin
_, err = client.CreateBindMiner(context.Background(), in)
assert.Equal(t, types.ErrNoBalance, err)
head.Height = 20 //ForkChainParamV2
api.On("GetLastHeader").Return(head, nil).Times(2)
_, err = client.CreateBindMiner(context.Background(), in)
assert.Equal(t, types.ErrAmount, err)
}
func testGetTicketCountOK(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment