Commit 8ed23987 authored by 张振华's avatar 张振华

commit dpos

parent 8bf90244
Title="local"
[log]
# 日志级别,支持debug(dbug)/info/warn/error(eror)/crit
loglevel = "debug"
logConsoleLevel = "info"
# 日志文件名,可带目录,所有生成的日志文件都放到此目录下
logFile = "logs/chain33.log"
# 单个日志文件的最大值(单位:兆)
maxFileSize = 300
# 最多保存的历史日志文件个数
maxBackups = 100
# 最多保存的历史日志消息(单位:天)
maxAge = 28
# 日志文件名是否使用本地事件(否则使用UTC时间)
localTime = true
# 历史日志文件是否压缩(压缩格式为gz)
compress = true
# 是否打印调用源文件和行号
callerFile = false
# 是否打印调用方法
callerFunction = false
[blockchain]
defCacheSize=512
maxFetchBlockNum=128
timeoutSeconds=5
batchBlockNum=128
driver="leveldb"
dbPath="datadir"
dbCache=64
isStrongConsistency=true
singleMode=true
batchsync=false
enableTxQuickIndex=true
[p2p]
seeds=["127.0.0.1:13802"]
enable=true
isSeed=true
serverStart=true
innerSeedEnable=false
useGithub=false
innerBounds=300
msgCacheSize=10240
driver="leveldb"
dbPath="datadir/addrbook"
dbCache=4
grpcLogFile="grpc33.log"
version=199
verMix=199
verMax=199
[rpc]
jrpcBindAddr="localhost:8801"
grpcBindAddr="localhost:8802"
whitelist=["127.0.0.1"]
jrpcFuncWhitelist=["*"]
grpcFuncWhitelist=["*"]
[mempool]
name="timeline"
poolCacheSize=10240
minTxFee=100000
[consensus]
name="tendermint"
minerstart=false
[mver.consensus]
fundKeyAddr = "1BQXS6TxaYYG5mADaWij4AxhZZUTpw95a5"
coinReward = 18
coinDevFund = 12
ticketPrice = 10000
powLimitBits = "0x1f00ffff"
retargetAdjustmentFactor = 4
futureBlockTime = 16
ticketFrozenTime = 5 #5s only for test
ticketWithdrawTime = 10 #10s only for test
ticketMinerWaitTime = 2 #2s only for test
maxTxNumber = 1600 #160
targetTimespan = 2304
targetTimePerBlock = 16
[mver.consensus.ForkChainParamV1]
maxTxNumber = 10000
targetTimespan = 288 #only for test
targetTimePerBlock = 2
[mver.consensus.ForkChainParamV2]
powLimitBits = "0x1f2fffff"
[consensus.sub.dpos]
genesis="14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
genesisBlockTime=1514533394
timeoutCheckConnections=1000
timeoutVoting=3000
timeoutWaitNotify=2000
createEmptyBlocks=false
createEmptyBlocksInterval=0
validatorNodes=["127.0.0.1:46656"]
blockInterval=3
continueBlockNum=12
isValidator=false
[store]
name="kvdb"
driver="leveldb"
dbPath="datadir/mavltree"
dbCache=128
[store.sub.kvdb]
enableMavlPrefix=false
enableMVCC=false
[wallet]
minFee=100000
driver="leveldb"
dbPath="wallet"
dbCache=16
signType="secp256k1"
[wallet.sub.ticket]
minerdisable=false
minerwhitelist=["*"]
[exec]
isFree=false
minExecFee=100000
enableStat=false
enableMVCC=false
alias=["token1:token","token2:token","token3:token"]
saveTokenTxList=false
[exec.sub.cert]
# 是否启用证书验证和签名
enable=false
# 加密文件路径
cryptoPath="authdir/crypto"
# 带证书签名类型,支持"auth_ecdsa", "auth_sm2"
signType="auth_ecdsa"
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"bytes"
"errors"
"fmt"
"reflect"
"runtime/debug"
"sync"
"sync/atomic"
"time"
dpostype "github.com/33cn/plugin/plugin/consensus/dpos/types"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
)
//-----------------------------------------------------------------------------
// Config
const (
proposalHeartbeatIntervalSeconds = 1
continueToVote = 0
voteSuccess = 1
voteFail = 2
)
// Errors define
var (
ErrInvalidVoteSignature = errors.New("Error invalid vote signature")
ErrInvalidVoteReplySignature = errors.New("Error invalid vote reply signature")
ErrInvalidNotifySignature = errors.New("Error invalid notify signature")
)
//-----------------------------------------------------------------------------
var (
msgQueueSize = 1000
)
// internally generated messages which may update the state
type timeoutInfo struct {
Duration time.Duration `json:"duration"`
State int `json:"state"`
}
func (ti *timeoutInfo) String() string {
return fmt.Sprintf("%v", ti.Duration)
}
// ConsensusState handles execution of the consensus algorithm.
// It processes votes and proposals, and upon reaching agreement,
// commits blocks to the chain and executes them against the application.
// The internal state machine receives input from peers, the internal validator, and from a timer.
type ConsensusState struct {
// config details
client *Client
privValidator ttypes.PrivValidator // for signing votes
privValidatorIndex int
// internal state
mtx sync.Mutex
validatorMgr ValidatorMgr // State until height-1.
// state changes may be triggered by msgs from peers,
// msgs from ourself, or by timeouts
peerMsgQueue chan MsgInfo
internalMsgQueue chan MsgInfo
timeoutTicker TimeoutTicker
broadcastChannel chan<- MsgInfo
ourID ID
started uint32 // atomic
stopped uint32 // atomic
Quit chan struct{}
//当前状态
dposState DposState
//所有选票,包括自己的和从网络中接收到的
dposVotes []*dpostype.DPosVote
//当前达成共识的选票
currentVote *dpostype.VoteItem
lastVote *dpostype.VoteItem
myVote *dpostype.DPosVote
lastMyVote *dpostype.DPosVote
notify *dpostype.DPosNotify
lastNotify *dpostype.DPosNotify
//所有选票,包括自己的和从网络中接收到的
cachedVotes []*dpostype.DPosVote
cachedNotify *dpostype.DPosNotify
}
// NewConsensusState returns a new ConsensusState.
func NewConsensusState(client *Client, valMgr ValidatorMgr) *ConsensusState {
cs := &ConsensusState{
client: client,
peerMsgQueue: make(chan MsgInfo, msgQueueSize),
internalMsgQueue: make(chan MsgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
Quit: make(chan struct{}),
dposState: InitStateObj,
dposVotes: nil,
}
cs.updateToValMgr(valMgr)
return cs
}
// SetOurID method
func (cs *ConsensusState) SetOurID(id ID) {
cs.ourID = id
}
// SetBroadcastChannel method
func (cs *ConsensusState) SetBroadcastChannel(broadcastChannel chan<- MsgInfo) {
cs.broadcastChannel = broadcastChannel
}
// IsRunning method
func (cs *ConsensusState) IsRunning() bool {
return atomic.LoadUint32(&cs.started) == 1 && atomic.LoadUint32(&cs.stopped) == 0
}
//----------------------------------------
// String returns a string.
func (cs *ConsensusState) String() string {
// better not to access shared variables
return fmt.Sprintf("ConsensusState") //(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step)
}
// GetState returns a copy of the chain state.
func (cs *ConsensusState) GetValidatorMgr() ValidatorMgr {
cs.mtx.Lock()
defer cs.mtx.Unlock()
return cs.validatorMgr.Copy()
}
// GetValidators returns a copy of the current validators.
func (cs *ConsensusState) GetValidators() ([]*ttypes.Validator) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
return cs.validatorMgr.Validators.Copy().Validators
}
// SetPrivValidator sets the private validator account for signing votes.
func (cs *ConsensusState) SetPrivValidator(priv ttypes.PrivValidator, index int) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.privValidator = priv
cs.privValidatorIndex = index
}
// SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing.
func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.timeoutTicker = timeoutTicker
}
// Start It start first time starts the timeout receive routines.
func (cs *ConsensusState) Start() {
if atomic.CompareAndSwapUint32(&cs.started, 0, 1) {
if atomic.LoadUint32(&cs.stopped) == 1 {
dposlog.Error("ConsensusState already stoped")
}
cs.timeoutTicker.Start()
// now start the receiveRoutine
go cs.receiveRoutine()
// schedule the first round!
cs.scheduleDPosTimeout(time.Second * 3, InitStateType)
}
}
// Stop timer and receive routine
func (cs *ConsensusState) Stop() {
cs.timeoutTicker.Stop()
cs.Quit <- struct{}{}
}
// Attempt to schedule a timeout (by sending timeoutInfo on the tickChan)
func (cs *ConsensusState) scheduleDPosTimeout(duration time.Duration, stateType int) {
cs.timeoutTicker.ScheduleTimeout(timeoutInfo{Duration:duration, State: stateType})
}
// send a msg into the receiveRoutine regarding our own proposal, block part, or vote
func (cs *ConsensusState) sendInternalMessage(mi MsgInfo) {
select {
case cs.internalMsgQueue <- mi:
default:
// NOTE: using the go-routine means our votes can
// be processed out of order.
// TODO: use CList here for strict determinism and
// attempt push to internalMsgQueue in receiveRoutine
dposlog.Info("Internal msg queue is full. Using a go-routine")
go func() { cs.internalMsgQueue <- mi }()
}
}
// Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes ttypes.RoundStepNewHeight.
func (cs *ConsensusState) updateToValMgr(valMgr ValidatorMgr) {
cs.validatorMgr = valMgr
}
//-----------------------------------------
// the main go routines
// receiveRoutine handles messages which may cause state transitions.
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
// It keeps the RoundState and is the only thing that updates it.
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
// ConsensusState must be locked before any internal state is updated.
func (cs *ConsensusState) receiveRoutine() {
defer func() {
if r := recover(); r != nil {
dposlog.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack()))
}
}()
for {
var mi MsgInfo
select {
case mi = <-cs.peerMsgQueue:
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
case mi = <-cs.internalMsgQueue:
// handles proposals, block parts, votes
cs.handleMsg(mi)
case ti := <-cs.timeoutTicker.Chan(): // tockChan:
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti)
case <-cs.Quit:
// NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but its ok because
// priv_val tracks LastSig
return
}
}
}
// state transitions on complete-proposal, 2/3-any, 2/3-one
func (cs *ConsensusState) handleMsg(mi MsgInfo) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
var err error
msg, peerID, peerIP := mi.Msg, string(mi.PeerID), mi.PeerIP
switch msg := msg.(type) {
case *dpostype.DPosVote:
cs.dposState.recvVote(cs, msg)
case *dpostype.DPosNotify:
cs.dposState.recvNotify(cs, msg)
case *dpostype.DPosVoteReply:
cs.dposState.recvVoteReply(cs, msg)
default:
dposlog.Error("Unknown msg type", msg.String(), "peerid", peerID, "peerip", peerIP)
}
if err != nil {
dposlog.Error("Error with msg", "type", reflect.TypeOf(msg), "peerid", peerID, "peerip", peerIP, "err", err, "msg", msg)
}
}
func (cs *ConsensusState) handleTimeout(ti timeoutInfo) {
dposlog.Debug("Received tock", "timeout", ti.Duration, "state", StateTypeMapping[ti.State])
// the timeout will now cause a state transition
cs.mtx.Lock()
defer cs.mtx.Unlock()
//由具体的状态来处理超时消息
cs.dposState.timeOut(cs)
}
// IsProposer method
func (cs *ConsensusState) IsProposer() bool {
if cs.currentVote != nil {
return bytes.Equal(cs.currentVote.VotedNodeAddress, cs.privValidator.GetAddress())
}
return false
}
func (cs *ConsensusState) SetState(state DposState){
cs.dposState = state
}
func (cs *ConsensusState) SaveVote() {
if cs.lastVote == nil {
cs.lastVote = cs.currentVote
} else if cs.currentVote != nil && !bytes.Equal(cs.currentVote.VoteId, cs.lastVote.VoteId) {
cs.lastVote = cs.currentVote
}
}
func (cs *ConsensusState) SetCurrentVote(vote * dpostype.VoteItem) {
cs.currentVote = vote
}
func (cs *ConsensusState) SaveMyVote() {
if cs.lastMyVote == nil {
cs.lastMyVote = cs.myVote
} else if cs.myVote != nil && !bytes.Equal(cs.myVote.Signature, cs.lastMyVote.Signature) {
cs.lastMyVote = cs.myVote
}
}
func (cs *ConsensusState) SetMyVote(vote * dpostype.DPosVote){
cs.myVote = vote
}
func (cs *ConsensusState) SaveNotify() {
if cs.lastNotify == nil {
cs.lastNotify = cs.notify
} else if cs.notify != nil && !bytes.Equal(cs.notify.Signature, cs.lastNotify.Signature) {
cs.lastNotify = cs.notify
}
}
func (cs *ConsensusState) SetNotify(notify * dpostype.DPosNotify){
if cs.notify != nil && !bytes.Equal(cs.lastNotify.Signature, notify.Signature) {
cs.lastNotify = cs.notify
}
cs.notify = notify
}
func (cs *ConsensusState) CacheNotify(notify * dpostype.DPosNotify){
cs.cachedNotify = notify
}
func (cs *ConsensusState) ClearCachedNotify(){
cs.cachedNotify = nil
}
func (cs *ConsensusState) AddVotes(vote * dpostype.DPosVote){
repeatFlag := false
addrExistFlag := false
index := -1
if cs.lastVote != nil && vote.VoteItem.PeriodStart < cs.lastVote.PeriodStop {
dposlog.Info("Old vote, discard it", "vote.PeriodStart", vote.VoteItem.PeriodStart, "last vote.PeriodStop", cs.lastVote.PeriodStop)
return
}
for i := 0; i < len(cs.dposVotes); i++ {
if bytes.Equal(cs.dposVotes[i].Signature, vote.Signature) {
repeatFlag = true
break
} else if bytes.Equal(cs.dposVotes[i].VoterNodeAddress, vote.VoterNodeAddress) {
addrExistFlag = true
index = i
break
}
}
//有重复投票,则不需要处理
if repeatFlag {
return
}
//投票不重复,如果地址也不重复,则直接加入;如果地址重复了,则替换老的投票
if !addrExistFlag {
cs.dposVotes = append(cs.dposVotes, vote)
} else if vote.VoteTimestamp > cs.dposVotes[index].VoteTimestamp {
cs.dposVotes[index] = vote
}
}
func (cs *ConsensusState) CacheVotes(vote * dpostype.DPosVote){
repeatFlag := false
addrExistFlag := false
index := -1
for i := 0; i < len(cs.cachedVotes); i++ {
if bytes.Equal(cs.cachedVotes[i].Signature, vote.Signature) {
repeatFlag = true
break
} else if bytes.Equal(cs.cachedVotes[i].VoterNodeAddress, vote.VoterNodeAddress) {
addrExistFlag = true
index = i
break
}
}
//有重复投票,则不需要处理
if repeatFlag {
return
}
//投票不重复,如果地址也不重复,则直接加入;如果地址重复了,则替换老的投票
if !addrExistFlag {
cs.cachedVotes = append(cs.cachedVotes, vote)
} else if vote.VoteTimestamp > cs.cachedVotes[index].VoteTimestamp {
/*
if index == len(cs.cachedVotes) - 1 {
cs.cachedVotes = append(cs.cachedVotes, vote)
}else {
cs.cachedVotes = append(cs.cachedVotes[:index], cs.dposVotes[(index + 1):]...)
cs.cachedVotes = append(cs.cachedVotes, vote)
}
*/
cs.cachedVotes[index] = vote
}
}
func (cs *ConsensusState) CheckVotes()(ty int, vote * dpostype.VoteItem){
major32 := int(dposDelegateNum * 2 / 3)
//总的票数还不够2/3,先不做决定
if len(cs.dposVotes) < major32 {
return continueToVote, nil
}
voteStat := map[string] int {}
for i := 0; i < len(cs.dposVotes); i++ {
key := string(cs.dposVotes[i].VoteItem.VoteId)
if _, ok := voteStat[key]; ok {
voteStat[key]++
} else {
voteStat[key] = 1
}
}
key := ""
value := 0
for k, v := range voteStat {
if v > value {
value = v
key = k
}
}
//如果一个节点的投票数已经过2/3,则返回最终票数超过2/3的选票
if value >= major32 {
for i := 0; i < len(cs.dposVotes); i++ {
if key == string(cs.dposVotes[i].VoteItem.VoteId) {
return voteSuccess, cs.dposVotes[i].VoteItem
}
}
} else if (value + (int(dposDelegateNum) - len(cs.dposVotes))) < major32{
//得票最多的节点,即使后续所有票都选它,也不满足2/3多数,不能达成共识。
return voteFail, nil
}
return continueToVote, nil
}
func (cs *ConsensusState) ClearVotes(){
cs.dposVotes = nil
cs.currentVote = nil
cs.myVote = nil
}
func (cs *ConsensusState) ClearCachedVotes() {
cs.cachedVotes = nil
}
func (cs *ConsensusState)VerifyVote(vote * dpostype.DPosVote) bool{
// Check validator
index, val := cs.validatorMgr.Validators.GetByAddress(vote.VoterNodeAddress)
if index == -1 && val == nil {
dposlog.Info("The voter is not a legal validator, so discard this vote", "vote", vote.String())
return false
}
// Verify signature
pubkey, err := dpostype.ConsensusCrypto.PubKeyFromBytes(val.PubKey)
if err != nil {
dposlog.Error("Error pubkey from bytes", "err", err)
return false
}
voteTmp := &dpostype.Vote{DPosVote: vote}
if err := voteTmp.Verify(cs.validatorMgr.ChainID, pubkey); err != nil {
dposlog.Error("Verify vote signature failed", "err", err)
return false
}
return true
}
func (cs *ConsensusState)VerifyNotify(notify * dpostype.DPosNotify) bool{
// Check validator
index, val := cs.validatorMgr.Validators.GetByAddress(notify.NotifyNodeAddress)
if index == -1 && val == nil {
dposlog.Info("The notifier is not a legal validator, so discard this notify", "notify", notify.String())
return false
}
// Verify signature
pubkey, err := dpostype.ConsensusCrypto.PubKeyFromBytes(val.PubKey)
if err != nil {
dposlog.Error("Error pubkey from bytes", "err", err)
return false
}
notifyTmp := &dpostype.Notify{DPosNotify: notify}
if err := notifyTmp.Verify(cs.validatorMgr.ChainID, pubkey); err != nil {
dposlog.Error("Verify vote signature failed", "err", err)
return false
}
return true
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"sync"
"time"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/common/merkle"
"github.com/33cn/chain33/queue"
drivers "github.com/33cn/chain33/system/consensus"
cty "github.com/33cn/chain33/system/dapp/coins/types"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
)
const tendermintVersion = "0.1.0"
var (
dposlog = log15.New("module", "dpos")
genesis string
genesisBlockTime int64
timeoutCheckConnections int32 = 1000
timeoutVoting int32 = 3000
timeoutWaitNotify int32 = 2000
createEmptyBlocks = false
createEmptyBlocksInterval int32 // second
validatorNodes = []string{"127.0.0.1:46656"}
isValidator = false
dposDelegateNum int64 = 3 //委托节点个数,从配置读取,以后可以根据投票结果来定
dposBlockInterval int64 = 3 //出块间隔,当前按3s
dposContinueBlockNum int64 = 6 //一个委托节点当选后,一次性持续出块数量
dposCycle = int64(dposDelegateNum * dposBlockInterval * dposContinueBlockNum)
dposPeriod = int64(dposBlockInterval * dposContinueBlockNum)
zeroHash [32]byte
)
func init() {
drivers.Reg("dpos", New)
drivers.QueryData.Register("dpos", &Client{})
}
// Client Tendermint implementation
type Client struct {
//config
*drivers.BaseClient
genesisDoc *ttypes.GenesisDoc // initial validator set
privValidator ttypes.PrivValidator
privKey crypto.PrivKey // local node's p2p key
pubKey string
csState *ConsensusState
crypto crypto.Crypto
node *Node
stopC chan struct{}
isDelegator bool
blockTime int64
once sync.Once
}
type subConfig struct {
Genesis string `json:"genesis"`
GenesisBlockTime int64 `json:"genesisBlockTime"`
TimeoutCheckConnections int32 `json:"timeoutCheckConnections"`
TimeoutVoting int32 `json:"timeoutVoting"`
TimeoutWaitNotify int32 `json:"timeoutWaitNotify"`
CreateEmptyBlocks bool `json:"createEmptyBlocks"`
CreateEmptyBlocksInterval int32 `json:"createEmptyBlocksInterval"`
ValidatorNodes []string `json:"validatorNodes"`
BlockInterval int64 `json:"blockInterval"`
ContinueBlockNum int64 `json:"continueBlockNum"`
IsValidator bool `json:"isValidator"`
}
func (client *Client) applyConfig(sub []byte) {
var subcfg subConfig
if sub != nil {
types.MustDecode(sub, &subcfg)
}
if subcfg.Genesis != "" {
genesis = subcfg.Genesis
}
if subcfg.GenesisBlockTime > 0 {
genesisBlockTime = subcfg.GenesisBlockTime
}
if subcfg.TimeoutCheckConnections > 0 {
timeoutCheckConnections = subcfg.TimeoutCheckConnections
}
if subcfg.TimeoutVoting > 0 {
timeoutVoting = subcfg.TimeoutVoting
}
if subcfg.TimeoutWaitNotify > 0 {
timeoutWaitNotify = subcfg.TimeoutWaitNotify
}
createEmptyBlocks = subcfg.CreateEmptyBlocks
if subcfg.CreateEmptyBlocksInterval > 0 {
createEmptyBlocksInterval = subcfg.CreateEmptyBlocksInterval
}
if len(subcfg.ValidatorNodes) > 0 {
validatorNodes = subcfg.ValidatorNodes
dposDelegateNum = int64(len(subcfg.ValidatorNodes))
}
if subcfg.BlockInterval > 0 {
dposBlockInterval = subcfg.BlockInterval
}
if subcfg.ContinueBlockNum > 0 {
dposContinueBlockNum = subcfg.ContinueBlockNum
}
dposCycle = dposDelegateNum * dposBlockInterval * dposContinueBlockNum
dposPeriod = dposBlockInterval * dposContinueBlockNum
if subcfg.CreateEmptyBlocks {
createEmptyBlocks = true
}
if subcfg.IsValidator {
isValidator = true
}
}
// New ...
func New(cfg *types.Consensus, sub []byte) queue.Module {
dposlog.Info("Start to create dpos client")
//init rand
ttypes.Init()
genDoc, err := ttypes.GenesisDocFromFile("./genesis.json")
if err != nil {
dposlog.Error("NewDPosClient", "msg", "GenesisDocFromFile failded", "error", err)
//return nil
}
cr, err := crypto.New(types.GetSignName("", types.ED25519))
if err != nil {
dposlog.Error("NewDPosClient", "err", err)
return nil
}
ttypes.ConsensusCrypto = cr
priv, err := cr.GenKey()
if err != nil {
dposlog.Error("NewDPosClient", "GenKey err", err)
return nil
}
privValidator := ttypes.LoadOrGenPrivValidatorFS("./priv_validator.json")
if privValidator == nil {
dposlog.Error("NewDPosClient create priv_validator file failed")
//return nil
}
ttypes.InitMessageMap()
pubkey := privValidator.GetPubKey().KeyString()
c := drivers.NewBaseClient(cfg)
client := &Client{
BaseClient: c,
genesisDoc: genDoc,
privValidator: privValidator,
privKey: priv,
pubKey: pubkey,
crypto: cr,
stopC: make(chan struct{}, 1),
isDelegator: false,
}
c.SetChild(client)
client.applyConfig(sub)
return client
}
// PrivValidator returns the Node's PrivValidator.
// XXX: for convenience only!
func (client *Client) PrivValidator() ttypes.PrivValidator {
return client.privValidator
}
// GenesisDoc returns the Node's GenesisDoc.
func (client *Client) GenesisDoc() *ttypes.GenesisDoc {
return client.genesisDoc
}
// Close TODO:may need optimize
func (client *Client) Close() {
client.node.Stop()
client.stopC <- struct{}{}
dposlog.Info("consensus dpos closed")
}
// SetQueueClient ...
func (client *Client) SetQueueClient(q queue.Client) {
client.InitClient(q, func() {
//call init block
//client.InitBlock()
})
go client.EventLoop()
go client.StartConsensus()
}
// DebugCatchup define whether catch up now
const DebugCatchup = false
// StartConsensus a routine that make the consensus start
func (client *Client) StartConsensus() {
//进入共识前先同步到最大高度
hint := time.NewTicker(5 * time.Second)
beg := time.Now()
OuterLoop:
for !DebugCatchup {
select {
case <-hint.C:
dposlog.Info("Still catching up max height......", "cost", time.Since(beg))
default:
if client.IsCaughtUp() {
dposlog.Info("This node has caught up max height")
break OuterLoop
}
time.Sleep(time.Second)
}
}
hint.Stop()
if !isValidator {
dposlog.Info("This node is not a validator,does not join the consensus, just syncs blocks from validators")
client.InitBlock()
return
}
var valMgr ValidatorMgr
valMgrTmp, err := MakeGenesisValidatorMgr(client.genesisDoc)
if err != nil {
dposlog.Error("StartConsensus", "msg", "MakeGenesisValidatorMgr failded", "error", err)
return
}
valMgr = valMgrTmp.Copy()
//todo 对于动态选举或者其他原因导致代理节点发生变化等情况,在后续增加处理 zzh
dposlog.Debug("Load Validator Manager finish", "state", valMgr)
dposlog.Info("StartConsensus", "validators", valMgr.Validators)
// Log whether this node is a delegator or an observer
if valMgr.Validators.HasAddress(client.privValidator.GetAddress()) {
dposlog.Info("This node is a delegator")
client.isDelegator = true
} else {
dposlog.Info("This node is not a delegator")
}
// Make ConsensusReactor
csState := NewConsensusState(client, valMgr)
client.csState = csState
csState.SetPrivValidator(client.privValidator, client.ValidatorIndex())
// Create & add listener
protocol, listeningAddress := "tcp", "0.0.0.0:36656"
node := NewNode(validatorNodes, protocol, listeningAddress, client.privKey, valMgr.ChainID, tendermintVersion, csState)
client.node = node
// 对于受托节点,才需要初始化区块,启动共识相关程序等,后续支持投票要做成动态切换的。
if client.isDelegator {
client.InitBlock()
node.Start()
}
//go client.CreateBlock()
}
// GetGenesisBlockTime ...
func (client *Client) GetGenesisBlockTime() int64 {
return genesisBlockTime
}
// CreateGenesisTx ...
func (client *Client) CreateGenesisTx() (ret []*types.Transaction) {
var tx types.Transaction
tx.Execer = []byte("coins")
tx.To = genesis
//gen payload
g := &cty.CoinsAction_Genesis{}
g.Genesis = &types.AssetsGenesis{}
g.Genesis.Amount = 1e8 * types.Coin
tx.Payload = types.Encode(&cty.CoinsAction{Value: g, Ty: cty.CoinsActionGenesis})
ret = append(ret, &tx)
return
}
// CheckBlock 暂不检查任何的交易
func (client *Client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
return nil
}
// ProcEvent ...
func (client *Client) ProcEvent(msg *queue.Message) bool {
return false
}
// CreateBlock a routine monitor whether some transactions available and tell client by available channel
func (client *Client) CreateBlock() {
lastBlock := client.GetCurrentBlock()
txs := client.RequestTx(int(types.GetP(lastBlock.Height+1).MaxTxNumber), nil)
if len(txs) == 0 {
block := client.GetCurrentBlock()
if createEmptyBlocks {
emptyBlock := &types.Block{}
emptyBlock.StateHash = block.StateHash
emptyBlock.ParentHash = block.Hash()
emptyBlock.Height = block.Height + 1
emptyBlock.Txs = nil
emptyBlock.TxHash = zeroHash[:]
emptyBlock.BlockTime = client.blockTime
err := client.WriteBlock(lastBlock.StateHash, emptyBlock)
//判断有没有交易是被删除的,这类交易要从mempool 中删除
if err != nil {
return
}
} else {
dposlog.Info("Ignore to create new Block for no tx in mempool", "Height", block.Height + 1)
}
return
}
//check dup
txs = client.CheckTxDup(txs, client.GetCurrentHeight())
var newblock types.Block
newblock.ParentHash = lastBlock.Hash()
newblock.Height = lastBlock.Height + 1
client.AddTxsToBlock(&newblock, txs)
//
newblock.Difficulty = types.GetP(0).PowLimitBits
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
newblock.BlockTime = client.blockTime
err := client.WriteBlock(lastBlock.StateHash, &newblock)
//判断有没有交易是被删除的,这类交易要从mempool 中删除
if err != nil {
return
}
}
// StopC stop client
func (client *Client) StopC() <-chan struct{} {
return client.stopC
}
// CheckTxDup check transactions that duplicate
func (client *Client) CheckTxDup(txs []*types.Transaction, height int64) (transactions []*types.Transaction) {
cacheTxs := types.TxsToCache(txs)
var err error
cacheTxs, err = util.CheckTxDup(client.GetQueueClient(), cacheTxs, height)
if err != nil {
return txs
}
return types.CacheToTxs(cacheTxs)
}
func (client *Client) SetBlockTime(blockTime int64) {
client.blockTime = blockTime
}
func (client *Client) ValidatorIndex() int{
if client.isDelegator {
index, _ := client.csState.validatorMgr.Validators.GetByAddress(client.privValidator.GetAddress())
return index
}
return -1
}
\ No newline at end of file
{"genesis_time":"0001-01-01T00:00:00Z","chain_id":"test-chain-Ep9EcD","validators":[{"pub_key":{"type":"ed25519","data":"220ACBE680DF2473A0CB48987A00FCC1812F106A7390BE6B8E2D31122C992A19"},"name":""}],"app_hash":""}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"encoding/hex"
"fmt"
"math"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/33cn/chain33/common/crypto"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
)
const (
numBufferedConnections = 10
maxNumPeers = 50
tryListenSeconds = 5
handshakeTimeout = 20 // * time.Second,
maxSendQueueSize = 1024
defaultSendTimeout = 60 * time.Second
//MaxMsgPacketPayloadSize define
MaxMsgPacketPayloadSize = 10 * 1024 * 1024
defaultDialTimeout = 3 * time.Second
dialRandomizerIntervalMilliseconds = 3000
// repeatedly try to reconnect for a few minutes
// ie. 5 * 20 = 100s
reconnectAttempts = 20
reconnectInterval = 5 * time.Second
// then move into exponential backoff mode for ~1day
// ie. 3**10 = 16hrs
reconnectBackOffAttempts = 10
reconnectBackOffBaseSeconds = 3
minReadBufferSize = 1024
minWriteBufferSize = 65536
)
// Parallel method
func Parallel(tasks ...func()) {
var wg sync.WaitGroup
wg.Add(len(tasks))
for _, task := range tasks {
go func(task func()) {
task()
wg.Done()
}(task)
}
wg.Wait()
}
// GenAddressByPubKey method
func GenAddressByPubKey(pubkey crypto.PubKey) []byte {
//must add 3 bytes ahead to make compatibly
typeAddr := append([]byte{byte(0x01), byte(0x01), byte(0x20)}, pubkey.Bytes()...)
return crypto.Ripemd160(typeAddr)
}
// IP2IPPort struct
type IP2IPPort struct {
mutex sync.RWMutex
mapList map[string]string
}
// NewMutexMap method
func NewMutexMap() *IP2IPPort {
return &IP2IPPort{
mapList: make(map[string]string),
}
}
// Has method
func (ipp *IP2IPPort) Has(ip string) bool {
ipp.mutex.RLock()
defer ipp.mutex.RUnlock()
_, ok := ipp.mapList[ip]
return ok
}
// Set method
func (ipp *IP2IPPort) Set(ip string, ipport string) {
ipp.mutex.Lock()
defer ipp.mutex.Unlock()
ipp.mapList[ip] = ipport
}
// Delete method
func (ipp *IP2IPPort) Delete(ip string) {
ipp.mutex.Lock()
defer ipp.mutex.Unlock()
delete(ipp.mapList, ip)
}
// NodeInfo struct
type NodeInfo struct {
ID ID `json:"id"`
Network string `json:"network"`
Version string `json:"version"`
IP string `json:"ip,omitempty"`
}
// Node struct
type Node struct {
listener net.Listener
connections chan net.Conn
privKey crypto.PrivKey
Network string
Version string
ID ID
IP string //get ip from connect to ourself
localIPs map[string]net.IP
peerSet *PeerSet
dialing *IP2IPPort
reconnecting *IP2IPPort
seeds []string
protocol string
lAddr string
state *ConsensusState
broadcastChannel chan MsgInfo
started uint32 // atomic
stopped uint32 // atomic
quit chan struct{}
}
// NewNode method
func NewNode(seeds []string, protocol string, lAddr string, privKey crypto.PrivKey, network string, version string, state *ConsensusState) *Node {
address := GenAddressByPubKey(privKey.PubKey())
node := &Node{
peerSet: NewPeerSet(),
seeds: seeds,
protocol: protocol,
lAddr: lAddr,
connections: make(chan net.Conn, numBufferedConnections),
privKey: privKey,
Network: network,
Version: version,
ID: ID(hex.EncodeToString(address)),
dialing: NewMutexMap(),
reconnecting: NewMutexMap(),
broadcastChannel: make(chan MsgInfo, maxSendQueueSize),
state: state,
localIPs: make(map[string]net.IP),
}
state.SetOurID(node.ID)
state.SetBroadcastChannel(node.broadcastChannel)
localIPs := getNaiveExternalAddress(true)
if len(localIPs) > 0 {
for _, item := range localIPs {
node.localIPs[item.String()] = item
}
}
return node
}
// Start node
func (node *Node) Start() {
if atomic.CompareAndSwapUint32(&node.started, 0, 1) {
// Create listener
var listener net.Listener
var err error
for i := 0; i < tryListenSeconds; i++ {
listener, err = net.Listen(node.protocol, node.lAddr)
if err == nil {
break
} else if i < tryListenSeconds-1 {
time.Sleep(time.Second * 1)
}
}
if err != nil {
panic(err)
}
node.listener = listener
// Actual listener local IP & port
listenerIP, listenerPort := splitHostPort(listener.Addr().String())
dposlog.Info("Local listener", "ip", listenerIP, "port", listenerPort)
go node.listenRoutine()
for i := 0; i < len(node.seeds); i++ {
go func(i int) {
addr := node.seeds[i]
ip, _ := splitHostPort(addr)
_, ok := node.localIPs[ip]
if ok {
dposlog.Info("find our ip ", "ourip", ip)
node.IP = ip
return
}
randomSleep(0)
err := node.DialPeerWithAddress(addr)
if err != nil {
dposlog.Debug("Error dialing peer", "err", err)
}
}(i)
}
go node.StartConsensusRoutine()
go node.BroadcastRoutine()
//zzh go node.evidenceBroadcastRoutine()
}
}
// DialPeerWithAddress ...
func (node *Node) DialPeerWithAddress(addr string) error {
ip, _ := splitHostPort(addr)
node.dialing.Set(ip, addr)
defer node.dialing.Delete(ip)
return node.addOutboundPeerWithConfig(addr)
}
func (node *Node) addOutboundPeerWithConfig(addr string) error {
dposlog.Info("Dialing peer", "address", addr)
peerConn, err := newOutboundPeerConn(addr, node.privKey, node.StopPeerForError, node.state)
if err != nil {
go node.reconnectToPeer(addr)
return err
}
if err := node.addPeer(peerConn); err != nil {
peerConn.CloseConn()
return err
}
return nil
}
// Stop ...
func (node *Node) Stop() {
atomic.CompareAndSwapUint32(&node.stopped, 0, 1)
err := node.listener.Close()
if err != nil {
dposlog.Error("Close listener failed", "err", err)
}
if node.quit != nil {
close(node.quit)
}
// Stop peers
for _, peer := range node.peerSet.List() {
peer.Stop()
node.peerSet.Remove(peer)
}
//stop consensus
node.state.Stop()
}
// IsRunning ...
func (node *Node) IsRunning() bool {
return atomic.LoadUint32(&node.started) == 1 && atomic.LoadUint32(&node.stopped) == 0
}
func (node *Node) listenRoutine() {
for {
conn, err := node.listener.Accept()
if !node.IsRunning() {
break // Go to cleanup
}
// listener wasn't stopped,
// yet we encountered an error.
if err != nil {
panic(err)
}
go node.connectComming(conn)
}
// Cleanup
close(node.connections)
for range node.connections {
// Drain
}
}
// StartConsensusRoutine if peers reached the threshold start consensus routine
func (node *Node) StartConsensusRoutine() {
for {
//TODO:the peer count need be optimized
if node.peerSet.Size() >= 0 {
node.state.Start()
break
}
time.Sleep(1 * time.Second)
}
}
// BroadcastRoutine receive to broadcast
func (node *Node) BroadcastRoutine() {
for {
msg, ok := <-node.broadcastChannel
if !ok {
dposlog.Debug("broadcastChannel closed")
return
}
node.Broadcast(msg)
}
}
func (node *Node) connectComming(inConn net.Conn) {
maxPeers := maxNumPeers
if maxPeers <= node.peerSet.Size() {
dposlog.Debug("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", node.peerSet.Size(), "max", maxPeers)
return
}
// New inbound connection!
err := node.addInboundPeer(inConn)
if err != nil {
dposlog.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
return
}
}
func (node *Node) stopAndRemovePeer(peer Peer, reason interface{}) {
node.peerSet.Remove(peer)
peer.Stop()
}
// StopPeerForError called if error occurred
func (node *Node) StopPeerForError(peer Peer, reason interface{}) {
dposlog.Error("Stopping peer for error", "peer", peer, "err", reason)
addr, err := peer.RemoteAddr()
node.stopAndRemovePeer(peer, reason)
if peer.IsPersistent() {
if err == nil && addr != nil {
go node.reconnectToPeer(addr.String())
}
}
}
func (node *Node) addInboundPeer(conn net.Conn) error {
peerConn, err := newInboundPeerConn(conn, node.privKey, node.StopPeerForError, node.state)
if err != nil {
if er := conn.Close(); er != nil {
dposlog.Error("addInboundPeer close conn failed", "er", er)
}
return err
}
if err = node.addPeer(peerConn); err != nil {
peerConn.CloseConn()
return err
}
return nil
}
// addPeer checks the given peer's validity, performs a handshake, and adds the
// peer to the switch and to all registered reactors.
// NOTE: This performs a blocking handshake before the peer is added.
// NOTE: If error is returned, caller is responsible for calling peer.CloseConn()
func (node *Node) addPeer(pc *peerConn) error {
addr := pc.conn.RemoteAddr()
if err := node.FilterConnByAddr(addr); err != nil {
return err
}
remoteIP, rErr := pc.RemoteIP()
nodeinfo := NodeInfo{
ID: node.ID,
Network: node.Network,
Version: node.Version,
}
// Exchange NodeInfo on the conn
peerNodeInfo, err := pc.HandshakeTimeout(nodeinfo, handshakeTimeout*time.Second)
if err != nil {
return err
}
peerID := peerNodeInfo.ID
// ensure connection key matches self reported key
connID := pc.ID()
if peerID != connID {
return fmt.Errorf(
"nodeInfo.ID() (%v) doesn't match conn.ID() (%v)",
peerID,
connID,
)
}
// Avoid self
if node.ID == peerID {
return fmt.Errorf("Connect to self: %v", addr)
}
// Avoid duplicate
if node.peerSet.Has(peerID) {
return fmt.Errorf("Duplicate peer ID %v", peerID)
}
// Check for duplicate connection or peer info IP.
if rErr == nil && node.peerSet.HasIP(remoteIP) {
return fmt.Errorf("Duplicate peer IP %v", remoteIP)
} else if rErr != nil {
return fmt.Errorf("get remote ip failed:%v", rErr)
}
// Filter peer against ID white list
//if err := node.FilterConnByID(peerID); err != nil {
//return err
//}
// Check version, chain id
if err := node.CompatibleWith(peerNodeInfo); err != nil {
return err
}
dposlog.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo)
// All good. Start peer
if node.IsRunning() {
pc.SetTransferChannel(node.state.peerMsgQueue)
if err = node.startInitPeer(pc); err != nil {
return err
}
}
// Add the peer to .peers.
// We start it first so that a peer in the list is safe to Stop.
// It should not err since we already checked peers.Has().
if err := node.peerSet.Add(pc); err != nil {
return err
}
//node.metrics.Peers.Add(float64(1))
dposlog.Info("Added peer", "peer", pc.ip)
return nil
}
// Broadcast to peers in set
func (node *Node) Broadcast(msg MsgInfo) chan bool {
successChan := make(chan bool, len(node.peerSet.List()))
dposlog.Debug("Broadcast", "msgtype", msg.TypeID)
var wg sync.WaitGroup
for _, peer := range node.peerSet.List() {
wg.Add(1)
go func(peer Peer) {
defer wg.Done()
success := peer.Send(msg)
successChan <- success
}(peer)
}
go func() {
wg.Wait()
close(successChan)
}()
return successChan
}
func (node *Node) startInitPeer(peer *peerConn) error {
err := peer.Start() // spawn send/recv routines
if err != nil {
// Should never happen
dposlog.Error("Error starting peer", "peer", peer, "err", err)
return err
}
return nil
}
// FilterConnByAddr TODO:can make fileter by addr
func (node *Node) FilterConnByAddr(addr net.Addr) error {
return nil
}
// CompatibleWith one node by nodeInfo
func (node *Node) CompatibleWith(other NodeInfo) error {
iMajor, iMinor, _, iErr := splitVersion(node.Version)
oMajor, oMinor, _, oErr := splitVersion(other.Version)
// if our own version number is not formatted right, we messed up
if iErr != nil {
return iErr
}
// version number must be formatted correctly ("x.x.x")
if oErr != nil {
return oErr
}
// major version must match
if iMajor != oMajor {
return fmt.Errorf("Peer is on a different major version. Got %v, expected %v", oMajor, iMajor)
}
// minor version can differ
if iMinor != oMinor {
// ok
}
// nodes must be on the same network
if node.Network != other.Network {
return fmt.Errorf("Peer is on a different network. Got %v, expected %v", other.Network, node.Network)
}
return nil
}
func (node *Node) reconnectToPeer(addr string) {
host, _ := splitHostPort(addr)
if node.reconnecting.Has(host) {
return
}
node.reconnecting.Set(host, addr)
defer node.reconnecting.Delete(host)
start := time.Now()
dposlog.Info("Reconnecting to peer", "addr", addr)
for i := 0; i < reconnectAttempts; i++ {
if !node.IsRunning() {
return
}
ips, err := net.LookupIP(host)
if err != nil {
dposlog.Info("LookupIP failed", "host", host)
continue
}
if node.peerSet.HasIP(ips[0]) {
dposlog.Info("Reconnecting to peer exit, already connect to the peer", "peer", host)
return
}
err = node.DialPeerWithAddress(addr)
if err == nil {
return // success
}
dposlog.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
// sleep a set amount
randomSleep(reconnectInterval)
continue
}
dposlog.Error("Failed to reconnect to peer. Beginning exponential backoff",
"addr", addr, "elapsed", time.Since(start))
for i := 0; i < reconnectBackOffAttempts; i++ {
if !node.IsRunning() {
return
}
// sleep an exponentially increasing amount
sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
time.Sleep(time.Duration(sleepIntervalSeconds) * time.Second)
err := node.DialPeerWithAddress(addr)
if err == nil {
return // success
}
dposlog.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
}
dposlog.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
}
//---------------------------------------------------------------------
func randomSleep(interval time.Duration) {
r := time.Duration(ttypes.RandInt63n(dialRandomizerIntervalMilliseconds)) * time.Millisecond
time.Sleep(r + interval)
}
func isIpv6(ip net.IP) bool {
v4 := ip.To4()
if v4 != nil {
return false
}
ipString := ip.String()
// Extra check just to be sure it's IPv6
return (strings.Contains(ipString, ":") && !strings.Contains(ipString, "."))
}
func getNaiveExternalAddress(defaultToIPv4 bool) []net.IP {
var ips []net.IP
addrs, err := net.InterfaceAddrs()
if err != nil {
panic(fmt.Sprintf("Could not fetch interface addresses: %v", err))
}
for _, a := range addrs {
ipnet, ok := a.(*net.IPNet)
if !ok {
continue
}
if defaultToIPv4 || !isIpv6(ipnet.IP) {
v4 := ipnet.IP.To4()
if v4 == nil || v4[0] == 127 {
// loopback
continue
}
} else if ipnet.IP.IsLoopback() {
// IPv6, check for loopback
continue
}
ips = append(ips, ipnet.IP)
}
return ips
}
func splitVersion(version string) (string, string, string, error) {
spl := strings.Split(version, ".")
if len(spl) != 3 {
return "", "", "", fmt.Errorf("Invalid version format %v", version)
}
return spl[0], spl[1], spl[2], nil
}
func splitHostPort(addr string) (host string, port int) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
panic(err)
}
port, err = strconv.Atoi(portStr)
if err != nil {
panic(err)
}
return host, port
}
func dial(addr string) (net.Conn, error) {
conn, err := net.DialTimeout("tcp", addr, defaultDialTimeout)
if err != nil {
return nil, err
}
return conn, nil
}
func newOutboundPeerConn(addr string, ourNodePrivKey crypto.PrivKey, onPeerError func(Peer, interface{}), state *ConsensusState) (*peerConn, error) {
conn, err := dial(addr)
if err != nil {
return &peerConn{}, fmt.Errorf("Error creating peer:%v", err)
}
pc, err := newPeerConn(conn, true, true, ourNodePrivKey, onPeerError, state)
if err != nil {
if cerr := conn.Close(); cerr != nil {
return &peerConn{}, fmt.Errorf("newPeerConn failed:%v, connection close failed:%v", err, cerr)
}
return &peerConn{}, err
}
return pc, nil
}
func newInboundPeerConn(
conn net.Conn,
ourNodePrivKey crypto.PrivKey,
onPeerError func(Peer, interface{}),
state *ConsensusState,
) (*peerConn, error) {
// TODO: issue PoW challenge
return newPeerConn(conn, false, false, ourNodePrivKey, onPeerError, state)
}
func newPeerConn(
rawConn net.Conn,
outbound, persistent bool,
ourNodePrivKey crypto.PrivKey,
onPeerError func(Peer, interface{}),
state *ConsensusState,
) (pc *peerConn, err error) {
conn := rawConn
// Set deadline for secret handshake
dl := time.Now().Add(handshakeTimeout * time.Second)
if err := conn.SetDeadline(dl); err != nil {
return pc, fmt.Errorf("Error setting deadline while encrypting connection:%v", err)
}
// Encrypt connection
conn, err = MakeSecretConnection(conn, ourNodePrivKey)
if err != nil {
return pc, fmt.Errorf("Error creating peer:%v", err)
}
// Only the information we already have
return &peerConn{
outbound: outbound,
persistent: persistent,
conn: conn,
onPeerError: onPeerError,
myState: state,
}, nil
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"bufio"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net"
"reflect"
"runtime/debug"
"sync"
"sync/atomic"
"time"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
// ID is a hex-encoded crypto.Address
type ID string
// Messages in channels are chopped into smaller msgPackets for multiplexing.
type msgPacket struct {
TypeID byte
Bytes []byte
}
// MsgInfo struct
type MsgInfo struct {
TypeID byte
Msg proto.Message
PeerID ID
PeerIP string
}
// Peer interface
type Peer interface {
ID() ID
RemoteIP() (net.IP, error) // remote IP of the connection
RemoteAddr() (net.Addr, error)
IsOutbound() bool
IsPersistent() bool
Send(msg MsgInfo) bool
TrySend(msg MsgInfo) bool
Stop()
SetTransferChannel(chan MsgInfo)
//Set(string, interface{})
//Get(string) interface{}
}
type peerConn struct {
outbound bool
conn net.Conn // source connection
bufReader *bufio.Reader
bufWriter *bufio.Writer
persistent bool
ip net.IP
id ID
sendQueue chan MsgInfo
sendQueueSize int32
pongChannel chan struct{}
started uint32 //atomic
stopped uint32 // atomic
quit chan struct{}
waitQuit sync.WaitGroup
transferChannel chan MsgInfo
sendBuffer []byte
onPeerError func(Peer, interface{})
myState *ConsensusState
}
// PeerSet struct
type PeerSet struct {
mtx sync.Mutex
lookup map[ID]*peerSetItem
list []Peer
}
type peerSetItem struct {
peer Peer
index int
}
// NewPeerSet method
func NewPeerSet() *PeerSet {
return &PeerSet{
lookup: make(map[ID]*peerSetItem),
list: make([]Peer, 0, 256),
}
}
// Add adds the peer to the PeerSet.
// It returns an error carrying the reason, if the peer is already present.
func (ps *PeerSet) Add(peer Peer) error {
ps.mtx.Lock()
defer ps.mtx.Unlock()
if ps.lookup[peer.ID()] != nil {
return fmt.Errorf("Duplicate peer ID %v", peer.ID())
}
index := len(ps.list)
// Appending is safe even with other goroutines
// iterating over the ps.list slice.
ps.list = append(ps.list, peer)
ps.lookup[peer.ID()] = &peerSetItem{peer, index}
return nil
}
// Has returns true if the set contains the peer referred to by this
// peerKey, otherwise false.
func (ps *PeerSet) Has(peerKey ID) bool {
ps.mtx.Lock()
_, ok := ps.lookup[peerKey]
ps.mtx.Unlock()
return ok
}
// HasIP returns true if the set contains the peer referred to by this IP
// address, otherwise false.
func (ps *PeerSet) HasIP(peerIP net.IP) bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.hasIP(peerIP)
}
// hasIP does not acquire a lock so it can be used in public methods which
// already lock.
func (ps *PeerSet) hasIP(peerIP net.IP) bool {
for _, item := range ps.lookup {
if ip, err := item.peer.RemoteIP(); err == nil && ip.Equal(peerIP) {
return true
}
}
return false
}
// Size of list
func (ps *PeerSet) Size() int {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return len(ps.list)
}
// List returns the threadsafe list of peers.
func (ps *PeerSet) List() []Peer {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.list
}
// Remove discards peer by its Key, if the peer was previously memoized.
func (ps *PeerSet) Remove(peer Peer) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
item := ps.lookup[peer.ID()]
if item == nil {
return
}
index := item.index
// Create a new copy of the list but with one less item.
// (we must copy because we'll be mutating the list).
newList := make([]Peer, len(ps.list)-1)
copy(newList, ps.list)
// If it's the last peer, that's an easy special case.
if index == len(ps.list)-1 {
ps.list = newList
delete(ps.lookup, peer.ID())
return
}
// Replace the popped item with the last item in the old list.
lastPeer := ps.list[len(ps.list)-1]
lastPeerKey := lastPeer.ID()
lastPeerItem := ps.lookup[lastPeerKey]
newList[index] = lastPeer
lastPeerItem.index = index
ps.list = newList
delete(ps.lookup, peer.ID())
}
//-------------------------peer connection--------------------------------
func (pc *peerConn) ID() ID {
if len(pc.id) != 0 {
return pc.id
}
address := GenAddressByPubKey(pc.conn.(*SecretConnection).RemotePubKey())
pc.id = ID(hex.EncodeToString(address))
return pc.id
}
func (pc *peerConn) RemoteIP() (net.IP, error) {
if pc.ip != nil && len(pc.ip) > 0 {
return pc.ip, nil
}
// In test cases a conn could not be present at all or be an in-memory
// implementation where we want to return a fake ip.
if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" {
return nil, errors.New("connect is nil or just pipe")
}
host, _, err := net.SplitHostPort(pc.conn.RemoteAddr().String())
if err != nil {
panic(err)
}
ips, err := net.LookupIP(host)
if err != nil {
panic(err)
}
pc.ip = ips[0]
return pc.ip, nil
}
func (pc *peerConn) RemoteAddr() (net.Addr, error) {
if pc.conn == nil || pc.conn.RemoteAddr().String() == "pipe" {
return nil, errors.New("connect is nil or just pipe")
}
return pc.conn.RemoteAddr(), nil
}
func (pc *peerConn) SetTransferChannel(transferChannel chan MsgInfo) {
pc.transferChannel = transferChannel
}
func (pc *peerConn) CloseConn() {
err := pc.conn.Close() // nolint: errcheck
if err != nil {
dposlog.Error("peerConn CloseConn failed", "err", err)
}
}
func (pc *peerConn) HandshakeTimeout(
ourNodeInfo NodeInfo,
timeout time.Duration,
) (peerNodeInfo NodeInfo, err error) {
peerNodeInfo = NodeInfo{}
// Set deadline for handshake so we don't block forever on conn.ReadFull
if err := pc.conn.SetDeadline(time.Now().Add(timeout)); err != nil {
return peerNodeInfo, errors.Wrap(err, "Error setting deadline")
}
var err1 error
var err2 error
Parallel(
func() {
info, err1 := json.Marshal(ourNodeInfo)
if err1 != nil {
dposlog.Error("Peer handshake Marshal ourNodeInfo failed", "err", err1)
return
}
frame := make([]byte, 4)
binary.BigEndian.PutUint32(frame, uint32(len(info)))
_, err1 = pc.conn.Write(frame)
if err1 != nil {
dposlog.Error("Peer handshake write info size failed", "err", err1)
return
}
_, err1 = pc.conn.Write(info[:])
if err1 != nil {
dposlog.Error("Peer handshake write info failed", "err", err1)
return
}
},
func() {
readBuffer := make([]byte, 4)
_, err2 = io.ReadFull(pc.conn, readBuffer[:])
if err2 != nil {
dposlog.Error("Peer handshake read info size failed", "err", err1)
return
}
len := binary.BigEndian.Uint32(readBuffer)
readBuffer = make([]byte, len)
_, err2 = io.ReadFull(pc.conn, readBuffer[:])
if err2 != nil {
dposlog.Error("Peer handshake read info failed", "err", err1)
return
}
err2 = json.Unmarshal(readBuffer, &peerNodeInfo)
if err2 != nil {
dposlog.Error("Peer handshake Unmarshal failed", "err", err1)
return
}
dposlog.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
},
)
if err1 != nil {
return peerNodeInfo, errors.Wrap(err1, "Error during handshake/write")
}
if err2 != nil {
return peerNodeInfo, errors.Wrap(err2, "Error during handshake/read")
}
// Remove deadline
if err := pc.conn.SetDeadline(time.Time{}); err != nil {
return peerNodeInfo, errors.Wrap(err, "Error removing deadline")
}
return peerNodeInfo, nil
}
func (pc *peerConn) IsOutbound() bool {
return pc.outbound
}
func (pc *peerConn) IsPersistent() bool {
return pc.persistent
}
func (pc *peerConn) Send(msg MsgInfo) bool {
if !pc.IsRunning() {
return false
}
select {
case pc.sendQueue <- msg:
atomic.AddInt32(&pc.sendQueueSize, 1)
return true
case <-time.After(defaultSendTimeout):
dposlog.Error("send msg timeout", "peerip", msg.PeerIP, "msg", msg.Msg)
return false
}
}
func (pc *peerConn) TrySend(msg MsgInfo) bool {
if !pc.IsRunning() {
return false
}
select {
case pc.sendQueue <- msg:
atomic.AddInt32(&pc.sendQueueSize, 1)
return true
default:
return false
}
}
func (pc *peerConn) IsRunning() bool {
return atomic.LoadUint32(&pc.started) == 1 && atomic.LoadUint32(&pc.stopped) == 0
}
func (pc *peerConn) Start() error {
if atomic.CompareAndSwapUint32(&pc.started, 0, 1) {
if atomic.LoadUint32(&pc.stopped) == 1 {
dposlog.Error("peerConn already stoped can not start", "peerIP", pc.ip.String())
return nil
}
pc.bufReader = bufio.NewReaderSize(pc.conn, minReadBufferSize)
pc.bufWriter = bufio.NewWriterSize(pc.conn, minWriteBufferSize)
pc.pongChannel = make(chan struct{})
pc.sendQueue = make(chan MsgInfo, maxSendQueueSize)
pc.sendBuffer = make([]byte, 0, MaxMsgPacketPayloadSize)
pc.quit = make(chan struct{})
pc.waitQuit.Add(5) //sendRoutine, updateStateRoutine,gossipDataRoutine,gossipVotesRoutine,queryMaj23Routine
go pc.sendRoutine()
go pc.recvRoutine()
}
return nil
}
func (pc *peerConn) Stop() {
if atomic.CompareAndSwapUint32(&pc.stopped, 0, 1) {
if pc.quit != nil {
close(pc.quit)
dposlog.Info("peerConn stop quit wait", "peerIP", pc.ip.String())
pc.waitQuit.Wait()
dposlog.Info("peerConn stop quit wait finish", "peerIP", pc.ip.String())
pc.quit = nil
}
close(pc.sendQueue)
pc.transferChannel = nil
pc.CloseConn()
}
}
// Catch panics, usually caused by remote disconnects.
func (pc *peerConn) _recover() {
if r := recover(); r != nil {
stack := debug.Stack()
err := StackError{r, stack}
pc.stopForError(err)
}
}
func (pc *peerConn) stopForError(r interface{}) {
dposlog.Error("peerConn recovered panic", "error", r, "peer", pc.ip.String())
if pc.onPeerError != nil {
pc.onPeerError(pc, r)
}
pc.Stop()
}
func (pc *peerConn) sendRoutine() {
defer pc._recover()
FOR_LOOP:
for {
select {
case <-pc.quit:
pc.waitQuit.Done()
break FOR_LOOP
case msg := <-pc.sendQueue:
bytes, err := proto.Marshal(msg.Msg)
if err != nil {
dposlog.Error("peerConn sendroutine marshal data failed", "error", err)
pc.stopForError(err)
break FOR_LOOP
}
len := len(bytes)
bytelen := make([]byte, 4)
binary.BigEndian.PutUint32(bytelen, uint32(len))
pc.sendBuffer = pc.sendBuffer[:0]
pc.sendBuffer = append(pc.sendBuffer, msg.TypeID)
pc.sendBuffer = append(pc.sendBuffer, bytelen...)
pc.sendBuffer = append(pc.sendBuffer, bytes...)
if len+5 > MaxMsgPacketPayloadSize {
pc.sendBuffer = append(pc.sendBuffer, bytes[MaxMsgPacketPayloadSize-5:]...)
}
_, err = pc.bufWriter.Write(pc.sendBuffer[:len+5])
if err != nil {
dposlog.Error("peerConn sendroutine write data failed", "error", err)
pc.stopForError(err)
break FOR_LOOP
}
err = pc.bufWriter.Flush()
if err != nil {
dposlog.Error("peerConn sendroutine flush buffer failed", "error", err)
pc.stopForError(err)
break FOR_LOOP
}
case _, ok := <-pc.pongChannel:
if ok {
dposlog.Debug("Send Pong")
var pong [5]byte
pong[0] = ttypes.PacketTypePong
_, err := pc.bufWriter.Write(pong[:])
if err != nil {
dposlog.Error("peerConn sendroutine write pong failed", "error", err)
pc.stopForError(err)
break FOR_LOOP
}
} else {
pc.pongChannel = nil
}
}
}
}
func (pc *peerConn) recvRoutine() {
defer pc._recover()
FOR_LOOP:
for {
//typeID+msgLen+msg
var buf [5]byte
_, err := io.ReadFull(pc.bufReader, buf[:])
if err != nil {
dposlog.Error("Connection failed @ recvRoutine (reading byte)", "conn", pc, "err", err)
pc.stopForError(err)
break FOR_LOOP
}
pkt := msgPacket{}
pkt.TypeID = buf[0]
len := binary.BigEndian.Uint32(buf[1:])
if len > 0 {
buf2 := make([]byte, len)
_, err = io.ReadFull(pc.bufReader, buf2)
if err != nil {
dposlog.Error("Connection failed @ recvRoutine", "conn", pc, "err", err)
pc.stopForError(err)
panic(fmt.Sprintf("peerConn recvRoutine packetTypeMsg failed :%v", err))
}
pkt.Bytes = buf2
}
if pkt.TypeID == ttypes.PacketTypePong {
dposlog.Debug("Receive Pong")
} else if pkt.TypeID == ttypes.PacketTypePing {
dposlog.Debug("Receive Ping")
pc.pongChannel <- struct{}{}
} else {
if v, ok := ttypes.MsgMap[pkt.TypeID]; ok {
realMsg := reflect.New(v).Interface()
err := proto.Unmarshal(pkt.Bytes, realMsg.(proto.Message))
if err != nil {
dposlog.Error("peerConn recvRoutine Unmarshal data failed", "err", err)
continue
}
if pc.transferChannel != nil && (pkt.TypeID == ttypes.VoteID || pkt.TypeID == ttypes.VoteReplyID || pkt.TypeID == ttypes.NotifyID) {
pc.transferChannel <- MsgInfo{pkt.TypeID, realMsg.(proto.Message), pc.ID(), pc.ip.String()}
}
} else {
err := fmt.Errorf("Unknown message type %v", pkt.TypeID)
dposlog.Error("Connection failed @ recvRoutine", "conn", pc, "err", err)
pc.stopForError(err)
break FOR_LOOP
}
}
}
close(pc.pongChannel)
for range pc.pongChannel {
// Drain
}
}
// StackError struct
type StackError struct {
Err interface{}
Stack []byte
}
func (se StackError) String() string {
return fmt.Sprintf("Error: %v\nStack: %s", se.Err, se.Stack)
}
func (se StackError) Error() string {
return se.String()
}
{"address":"02A13174B92727C4902DB099E51A3339F48BD45E","pub_key":{"type":"ed25519","data":"220ACBE680DF2473A0CB48987A00FCC1812F106A7390BE6B8E2D31122C992A19"},"priv_key":{"type":"ed25519","data":"B3DC4C0725884EBB7264B92F1D8D37584A64ADE1799D997EC64B4FE3973E08DE220ACBE680DF2473A0CB48987A00FCC1812F106A7390BE6B8E2D31122C992A19"}}
\ No newline at end of file
all:
sh ./create_protobuf.sh
#!/bin/sh
protoc --go_out=plugins=grpc:../types ./*.proto --proto_path=. --proto_path="../types/"
syntax = "proto3";
package types;
message VoteItem {
int32 votedNodeIndex = 1; //被投票的节点索引
bytes votedNodeAddress = 2; //被投票的节点地址
int64 cycleStart = 3; //大周期起始时间
int64 cycleStop = 4; //大周期终止时间
int64 periodStart = 5; //新节点负责出块的起始时间
int64 periodStop = 6; //新节点负责出块的终止时间
int64 height = 7; //新节点负责出块的起始高度
bytes voteId = 8; //选票ID
}
//DPosVote Dpos共识的节点投票,为达成共识用。
message DPosVote {
VoteItem voteItem = 1;
int64 voteTimestamp = 2; //发起投票的时间
int32 voterNodeIndex = 3; //投票节点索引
bytes voterNodeAddress = 4; //投票节点地址
bytes signature = 5; //投票者签名
}
message DPosVoteReply {
DPosVote vote = 1;
}
//DPosNotify Dpos委托节点出块周期结束时,通知其他节点进行高度确认及新节点投票。
message DPosNotify {
VoteItem vote = 1;
int64 heightStop = 2; //新节点负责出块的结束高度
bytes hashStop = 3; //新节点负责出块的结束hash
int64 notifyTimestamp = 4; //发起通知的时间
int32 notifyNodeIndex = 5; //通知节点的索引
bytes notifyNodeAddress= 6; //通知节点的地址
bytes signature = 7; //通知节点的签名
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package tendermint Uses nacl's secret_box to encrypt a net.Conn.
// It is (meant to be) an implementation of the STS protocol.
// Note we do not (yet) assume that a remote peer's pubkey
// is known ahead of time, and thus we are technically
// still vulnerable to MITM. (TODO!)
// See docs/sts-final.pdf for more info
package dpos
import (
"bytes"
crand "crypto/rand"
"crypto/sha256"
"encoding/binary"
"errors"
"io"
"net"
"time"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/plugin/plugin/consensus/dpos/types"
"golang.org/x/crypto/nacl/box"
"golang.org/x/crypto/nacl/secretbox"
"golang.org/x/crypto/ripemd160"
)
// 2 + 1024 == 1026 total frame size
const (
dataLenSize = 2 // uint16 to describe the length, is <= dataMaxSize
dataMaxSize = 1024
totalFrameSize = dataMaxSize + dataLenSize
sealedFrameSize = totalFrameSize + secretbox.Overhead
authSigMsgSize = (32) + (64)
) // fixed size (length prefixed) byte arrays
// SecretConnection Implements net.Conn
type SecretConnection struct {
conn io.ReadWriteCloser
recvBuffer []byte
recvNonce *[24]byte
sendNonce *[24]byte
remPubKey crypto.PubKey
shrSecret *[32]byte // shared secret
}
// MakeSecretConnection Performs handshake and returns a new authenticated SecretConnection.
// Returns nil if error in handshake.
// Caller should call conn.Close()
// See docs/sts-final.pdf for more information.
func MakeSecretConnection(conn io.ReadWriteCloser, locPrivKey crypto.PrivKey) (*SecretConnection, error) {
locPubKey := locPrivKey.PubKey()
// Generate ephemeral keys for perfect forward secrecy.
locEphPub, locEphPriv := genEphKeys()
// Write local ephemeral pubkey and receive one too.
// NOTE: every 32-byte string is accepted as a Curve25519 public key
// (see DJB's Curve25519 paper: http://cr.yp.to/ecdh/curve25519-20060209.pdf)
remEphPub, err := shareEphPubKey(conn, locEphPub)
if err != nil {
return nil, err
}
// Compute common shared secret.
shrSecret := computeSharedSecret(remEphPub, locEphPriv)
// Sort by lexical order.
loEphPub, hiEphPub := sort32(locEphPub, remEphPub)
// Check if the local ephemeral public key
// was the least, lexicographically sorted.
locIsLeast := bytes.Equal(locEphPub[:], loEphPub[:])
// Generate nonces to use for secretbox.
recvNonce, sendNonce := genNonces(loEphPub, hiEphPub, locIsLeast)
// Generate common challenge to sign.
challenge := genChallenge(loEphPub, hiEphPub)
// Construct SecretConnection.
sc := &SecretConnection{
conn: conn,
recvBuffer: nil,
recvNonce: recvNonce,
sendNonce: sendNonce,
shrSecret: shrSecret,
}
// Sign the challenge bytes for authentication.
locSignature := signChallenge(challenge, locPrivKey)
// Share (in secret) each other's pubkey & challenge signature
authSigMsg, err := shareAuthSignature(sc, locPubKey, locSignature)
if err != nil {
return nil, err
}
remPubKey, remSignature := authSigMsg.Key, authSigMsg.Sig
if !remPubKey.VerifyBytes(challenge[:], remSignature) {
return nil, errors.New("Challenge verification failed")
}
// We've authorized.
sc.remPubKey = remPubKey
return sc, nil
}
// RemotePubKey Returns authenticated remote pubkey
func (sc *SecretConnection) RemotePubKey() crypto.PubKey {
return sc.remPubKey
}
// Writes encrypted frames of `sealedFrameSize`
// CONTRACT: data smaller than dataMaxSize is read atomically.
func (sc *SecretConnection) Write(data []byte) (n int, err error) {
for 0 < len(data) {
var frame = make([]byte, totalFrameSize)
var chunk []byte
if dataMaxSize < len(data) {
chunk = data[:dataMaxSize]
data = data[dataMaxSize:]
} else {
chunk = data
data = nil
}
chunkLength := len(chunk)
binary.BigEndian.PutUint16(frame, uint16(chunkLength))
copy(frame[dataLenSize:], chunk)
// encrypt the frame
var sealedFrame = make([]byte, sealedFrameSize)
secretbox.Seal(sealedFrame[:0], frame, sc.sendNonce, sc.shrSecret)
// fmt.Printf("secretbox.Seal(sealed:%X,sendNonce:%X,shrSecret:%X\n", sealedFrame, sc.sendNonce, sc.shrSecret)
incr2Nonce(sc.sendNonce)
// end encryption
_, err := sc.conn.Write(sealedFrame)
if err != nil {
return n, err
}
n += len(chunk)
}
return
}
// CONTRACT: data smaller than dataMaxSize is read atomically.
func (sc *SecretConnection) Read(data []byte) (n int, err error) {
if 0 < len(sc.recvBuffer) {
count := copy(data, sc.recvBuffer)
sc.recvBuffer = sc.recvBuffer[count:]
return
}
sealedFrame := make([]byte, sealedFrameSize)
_, err = io.ReadFull(sc.conn, sealedFrame)
if err != nil {
return
}
// decrypt the frame
var frame = make([]byte, totalFrameSize)
// fmt.Printf("secretbox.Open(sealed:%X,recvNonce:%X,shrSecret:%X\n", sealedFrame, sc.recvNonce, sc.shrSecret)
_, ok := secretbox.Open(frame[:0], sealedFrame, sc.recvNonce, sc.shrSecret)
if !ok {
return n, errors.New("Failed to decrypt SecretConnection")
}
incr2Nonce(sc.recvNonce)
// end decryption
var chunkLength = binary.BigEndian.Uint16(frame) // read the first two bytes
if chunkLength > dataMaxSize {
return 0, errors.New("chunkLength is greater than dataMaxSize")
}
var chunk = frame[dataLenSize : dataLenSize+chunkLength]
n = copy(data, chunk)
sc.recvBuffer = chunk[n:]
return
}
// Close Implements net.Conn
func (sc *SecretConnection) Close() error { return sc.conn.Close() }
// LocalAddr ...
func (sc *SecretConnection) LocalAddr() net.Addr { return sc.conn.(net.Conn).LocalAddr() }
// RemoteAddr ...
func (sc *SecretConnection) RemoteAddr() net.Addr { return sc.conn.(net.Conn).RemoteAddr() }
// SetDeadline ...
func (sc *SecretConnection) SetDeadline(t time.Time) error { return sc.conn.(net.Conn).SetDeadline(t) }
// SetReadDeadline ...
func (sc *SecretConnection) SetReadDeadline(t time.Time) error {
return sc.conn.(net.Conn).SetReadDeadline(t)
}
// SetWriteDeadline ...
func (sc *SecretConnection) SetWriteDeadline(t time.Time) error {
return sc.conn.(net.Conn).SetWriteDeadline(t)
}
func genEphKeys() (ephPub, ephPriv *[32]byte) {
var err error
ephPub, ephPriv, err = box.GenerateKey(crand.Reader)
if err != nil {
types.PanicCrisis("Could not generate ephemeral keypairs")
}
return
}
func shareEphPubKey(conn io.ReadWriter, locEphPub *[32]byte) (remEphPub *[32]byte, err error) {
var err1, err2 error
Parallel(
func() {
_, err1 = conn.Write(locEphPub[:])
},
func() {
remEphPub = new([32]byte)
_, err2 = io.ReadFull(conn, remEphPub[:])
},
)
if err1 != nil {
return nil, err1
}
if err2 != nil {
return nil, err2
}
return remEphPub, nil
}
func computeSharedSecret(remPubKey, locPrivKey *[32]byte) (shrSecret *[32]byte) {
shrSecret = new([32]byte)
box.Precompute(shrSecret, remPubKey, locPrivKey)
return
}
func sort32(foo, bar *[32]byte) (lo, hi *[32]byte) {
if bytes.Compare(foo[:], bar[:]) < 0 {
lo = foo
hi = bar
} else {
lo = bar
hi = foo
}
return
}
func genNonces(loPubKey, hiPubKey *[32]byte, locIsLo bool) (recvNonce, sendNonce *[24]byte) {
nonce1 := hash24(append(loPubKey[:], hiPubKey[:]...))
nonce2 := new([24]byte)
copy(nonce2[:], nonce1[:])
nonce2[len(nonce2)-1] ^= 0x01
if locIsLo {
recvNonce = nonce1
sendNonce = nonce2
} else {
recvNonce = nonce2
sendNonce = nonce1
}
return
}
func genChallenge(loPubKey, hiPubKey *[32]byte) (challenge *[32]byte) {
return hash32(append(loPubKey[:], hiPubKey[:]...))
}
func signChallenge(challenge *[32]byte, locPrivKey crypto.PrivKey) (signature crypto.Signature) {
signature = locPrivKey.Sign(challenge[:])
return
}
type authSigMessage struct {
Key crypto.PubKey
Sig crypto.Signature
}
func shareAuthSignature(sc io.ReadWriter, pubKey crypto.PubKey, signature crypto.Signature) (*authSigMessage, error) {
var recvMsg authSigMessage
var err1, err2 error
Parallel(
func() {
msgByte := make([]byte, len(pubKey.Bytes())+len(signature.Bytes()))
copy(msgByte, pubKey.Bytes())
copy(msgByte[len(pubKey.Bytes()):], signature.Bytes())
_, err1 = sc.Write(msgByte)
},
func() {
readBuffer := make([]byte, authSigMsgSize)
_, err2 = io.ReadFull(sc, readBuffer)
if err2 != nil {
return
}
//n := int(0) // not used.
//recvMsg = wire.ReadBinary(authSigMessage{}, bytes.NewBuffer(readBuffer), authSigMsgSize, &n, &err2).(authSigMessage)
//secret.Info("shareAuthSignature", "readBuffer", readBuffer)
recvMsg.Key, err2 = types.ConsensusCrypto.PubKeyFromBytes(readBuffer[:32])
if err2 != nil {
return
}
recvMsg.Sig, err2 = types.ConsensusCrypto.SignatureFromBytes(readBuffer[32:])
if err2 != nil {
return
}
})
if err1 != nil {
return nil, err1
}
if err2 != nil {
return nil, err2
}
return &recvMsg, nil
}
//--------------------------------------------------------------------------------
// sha256
func hash32(input []byte) (res *[32]byte) {
hasher := sha256.New()
_, err := hasher.Write(input) // nolint: errcheck, gas
if err != nil {
panic(err)
}
resSlice := hasher.Sum(nil)
res = new([32]byte)
copy(res[:], resSlice)
return
}
// We only fill in the first 20 bytes with ripemd160
func hash24(input []byte) (res *[24]byte) {
hasher := ripemd160.New()
_, err := hasher.Write(input) // nolint: errcheck, gas
if err != nil {
panic(err)
}
resSlice := hasher.Sum(nil)
res = new([24]byte)
copy(res[:], resSlice)
return
}
// increment nonce big-endian by 2 with wraparound.
func incr2Nonce(nonce *[24]byte) {
incrNonce(nonce)
incrNonce(nonce)
}
// increment nonce big-endian by 1 with wraparound.
func incrNonce(nonce *[24]byte) {
for i := 23; 0 <= i; i-- {
nonce[i]++
if nonce[i] != 0 {
return
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"bytes"
"encoding/json"
"github.com/outbrain/golib/math"
"time"
"github.com/33cn/chain33/common/crypto"
dpostype "github.com/33cn/plugin/plugin/consensus/dpos/types"
"github.com/33cn/chain33/common"
)
var (
InitStateType = 1
VotingStateType = 2
VotedStateType = 3
WaitNotifyStateType = 4
StateTypeMapping = map[int] string { InitStateType: "InitState", VotingStateType: "VotingState", VotedStateType: "VotedState", WaitNotifyStateType: "WaitNotifyState"}
)
type DPosTask struct {
nodeId int64
cycleStart int64
cycleStop int64
periodStart int64
periodStop int64
blockStart int64
blockStop int64
}
func DecideTaskByTime(now int64) (task DPosTask){
task.nodeId = now % dposCycle / dposPeriod
task.cycleStart = now - now % dposCycle
task.cycleStop = task.cycleStart + dposCycle - 1
task.periodStart = task.cycleStart + task.nodeId * dposBlockInterval * dposContinueBlockNum
task.periodStop = task.periodStart + dposPeriod - 1
task.blockStart = task.periodStart + now % dposCycle % dposPeriod / dposBlockInterval * dposBlockInterval
task.blockStop = task.blockStart + dposBlockInterval - 1
return task
}
// DposState is the base class of dpos state machine, it defines some interfaces.
type DposState interface {
timeOut(cs *ConsensusState)
sendVote(cs *ConsensusState, vote * dpostype.DPosVote)
recvVote(cs *ConsensusState, vote * dpostype.DPosVote)
sendVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply)
recvVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply)
sendNotify(cs *ConsensusState, notify * dpostype.DPosNotify)
recvNotify(cs *ConsensusState, notify * dpostype.DPosNotify)
}
// InitState is the initial state of dpos state machine
type InitState struct {
}
func (init *InitState)timeOut(cs *ConsensusState) {
//if available noes < 2/3, don't change the state to voting.
connections := cs.client.node.peerSet.Size()
validators := cs.validatorMgr.Validators.Size()
if connections == 0 || connections < (validators * 2 / 3 -1) {
dposlog.Error("InitState timeout but available nodes less than 2/3,waiting for more connections", "connections", connections, "validators", validators)
cs.ClearVotes()
//设定超时时间,超时后再检查链接数量
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
} else {
//获得当前高度
height := cs.client.GetCurrentHeight()
now := time.Now().Unix()
if cs.lastMyVote != nil && math.AbsInt64(now - cs.lastMyVote.VoteItem.PeriodStop) <= 1 {
now += 2
}
//计算当前时间,属于哪一个周期,应该哪一个节点出块,应该出块的高度
task := DecideTaskByTime(now)
addr, validator := cs.validatorMgr.Validators.GetByIndex(int(task.nodeId))
if addr == nil && validator == nil {
dposlog.Error("Address and Validator is nil", "node index", task.nodeId, "now", now, "cycle", dposCycle, "period", dposPeriod)
//cs.SetState(InitStateObj)
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
return
}
//生成vote, 对于vote进行签名
voteItem := &dpostype.VoteItem{
VotedNodeAddress: addr,
VotedNodeIndex: int32(task.nodeId),
CycleStart: task.cycleStart,
CycleStop: task.cycleStop,
PeriodStart: task.periodStart,
PeriodStop: task.periodStop,
Height: height + 1,
}
encode, err := json.Marshal(voteItem)
if err != nil {
panic("Marshal vote failed.")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
return
}
voteItem.VoteId = crypto.Ripemd160(encode)
index := -1
for i := 0; i < cs.validatorMgr.Validators.Size(); i ++ {
if bytes.Compare(cs.validatorMgr.Validators.Validators[i].Address, cs.privValidator.GetAddress()) == 0 {
index = i
break
}
}
if index == -1 {
panic("This node's address is not exist in Validators.")
}
vote := &dpostype.Vote{DPosVote: &dpostype.DPosVote{
VoteItem: voteItem,
VoteTimestamp: now,
VoterNodeAddress: cs.privValidator.GetAddress(),
VoterNodeIndex: int32(index),
},
}
if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil {
dposlog.Error("SignVote failed", "vote", vote.String())
//cs.SetState(InitStateObj)
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
return
}
vote2 := *vote.DPosVote
cs.AddVotes(&vote2)
cs.SetMyVote(&vote2)
dposlog.Info("Available nodes equal or more than 2/3,change state to VotingState", "connections", connections, "validators", validators)
cs.SetState(VotingStateObj)
dposlog.Info("Change state.", "from", "InitState", "to", "VotingState")
//通过node发送p2p消息到其他节点
cs.dposState.sendVote(cs, vote.DPosVote)
cs.scheduleDPosTimeout(time.Duration(timeoutVoting) * time.Millisecond, VotingStateType)
//处理之前缓存的投票信息
for i := 0; i < len(cs.cachedVotes); i++ {
cs.dposState.recvVote(cs, cs.cachedVotes[i])
}
cs.ClearCachedVotes()
}
}
func (init *InitState)sendVote(cs *ConsensusState, vote * dpostype.DPosVote) {
dposlog.Info("InitState does not support sendVote,so do nothing")
}
func (init *InitState)recvVote(cs *ConsensusState, vote * dpostype.DPosVote) {
dposlog.Info("InitState recvVote ,add it and will handle it later.")
//cs.AddVotes(vote)
cs.CacheVotes(vote)
}
func (init *InitState)sendVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply){
dposlog.Info("InitState don't support sendVoteReply,so do nothing")
}
func (init *InitState)recvVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply){
dposlog.Info("InitState recv Vote reply,ignore it.")
}
func (init *InitState)sendNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
dposlog.Info("InitState does not support sendNotify,so do nothing")
}
func (init *InitState)recvNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
dposlog.Info("InitState recvNotify")
//zzh:需要增加对Notify的处理,可以考虑记录已经确认过的出快记录
cs.SetNotify(notify)
}
// VotingState is the voting state of dpos state machine until timeout or get an agreement by votes.
type VotingState struct {
}
func (voting *VotingState)timeOut(cs *ConsensusState) {
dposlog.Info("VotingState timeout but don't get an agreement. change state to InitState")
//清理掉之前的选票记录,从初始状态重新开始
cs.ClearVotes()
cs.ClearCachedVotes()
cs.SetState(InitStateObj)
dposlog.Info("Change state because of timeOut.", "from", "VotingState", "to", "InitState")
//由于连接多数情况下正常,快速触发InitState的超时处理
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
}
func (voting *VotingState)sendVote(cs *ConsensusState,vote * dpostype.DPosVote) {
cs.broadcastChannel <- MsgInfo{TypeID: dpostype.VoteID, Msg: vote, PeerID: cs.ourID, PeerIP: ""}
}
func (voting *VotingState)recvVote(cs *ConsensusState,vote * dpostype.DPosVote) {
dposlog.Info("VotingState get a vote", "VotedNodeIndex", vote.VoteItem.VotedNodeIndex,
"VotedNodeAddress", common.ToHex(vote.VoteItem.VotedNodeAddress),
"CycleStart", vote.VoteItem.CycleStart,
"CycleStop", vote.VoteItem.CycleStop,
"PeriodStart", vote.VoteItem.PeriodStart,
"periodStop", vote.VoteItem.PeriodStop,
"Height", vote.VoteItem.Height,
"VoteId", common.ToHex(vote.VoteItem.VoteId),
"VoteTimestamp", vote.VoteTimestamp,
"VoterNodeIndex", vote.VoterNodeIndex,
"VoterNodeAddress", common.ToHex(vote.VoterNodeAddress),
"Signature", common.ToHex(vote.Signature),
"localNodeIndex", cs.client.ValidatorIndex(),"now", time.Now().Unix())
if !cs.VerifyVote(vote) {
dposlog.Info("VotingState verify vote failed")
return
}
cs.AddVotes(vote)
result, voteItem := cs.CheckVotes()
if result == voteSuccess {
dposlog.Info("VotingState get 2/3 result", "final vote:", voteItem.String())
dposlog.Info("VotingState change state to VotedState")
//切换状态
cs.SetState(VotedStateObj)
dposlog.Info("Change state because of check votes successfully.", "from", "VotingState", "to", "VotedState")
cs.SetCurrentVote(voteItem)
//1s后检查是否出块,是否需要重新投票
cs.scheduleDPosTimeout(time.Millisecond * 500, VotedStateType)
} else if result == continueToVote {
dposlog.Info("VotingState get a vote, but don't get an agreement,waiting for new votes...")
} else {
dposlog.Info("VotingState get a vote, but don't get an agreement,vote fail,abort voting")
//清理掉之前的选票记录,从初始状态重新开始
cs.ClearVotes()
cs.SetState(InitStateObj)
dposlog.Info("Change state because of vote failed.", "from", "VotingState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
}
}
func (voting *VotingState)sendVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply){
dposlog.Info("VotingState don't support sendVoteReply,so do nothing")
//cs.broadcastChannel <- MsgInfo{TypeID: dpostype.VoteReplyID, Msg: reply, PeerID: cs.ourID, PeerIP: ""}
}
func (voting *VotingState)recvVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply){
dposlog.Info("VotingState recv Vote reply")
voting.recvVote(cs, reply.Vote)
}
func (voting *VotingState)sendNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
dposlog.Info("VotingState does not support sendNotify,so do nothing")
}
func (voting *VotingState)recvNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
dposlog.Info("VotingState does not support recvNotify,so do nothing")
}
// VotingState is the voting state of dpos state machine until timeout or get an agreement by votes.
type VotedState struct {
}
func (voted *VotedState)timeOut(cs *ConsensusState) {
now := time.Now().Unix()
block := cs.client.GetCurrentBlock()
task := DecideTaskByTime(now)
if bytes.Equal(cs.privValidator.GetAddress(), cs.currentVote.VotedNodeAddress) {
//当前节点为出块节点
if now >= cs.currentVote.PeriodStop {
//当前时间超过了节点切换时间,需要进行重新投票
dposlog.Info("VotedState timeOut over periodStop.", "periodStop", cs.currentVote.PeriodStop)
//当前时间超过了节点切换时间,需要进行重新投票
notify := &dpostype.Notify{
DPosNotify: &dpostype.DPosNotify{
Vote: cs.currentVote,
HeightStop: block.Height,
HashStop: block.Hash(),
NotifyTimestamp: now,
NotifyNodeAddress: cs.privValidator.GetAddress(),
NotifyNodeIndex: int32(cs.privValidatorIndex),
},
}
dposlog.Info("Send notify.", "HeightStop", notify.HeightStop, "HashStop", common.ToHex(notify.HashStop))
if err := cs.privValidator.SignNotify(cs.validatorMgr.ChainID, notify); err != nil {
dposlog.Error("SignNotify failed", "notify", notify.String())
cs.SaveVote()
cs.SaveMyVote()
cs.ClearVotes()
cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return
}
cs.SaveVote()
cs.SaveMyVote()
cs.SaveNotify()
notify2 := *notify
cs.SetNotify(notify2.DPosNotify)
cs.dposState.sendNotify(cs, notify.DPosNotify)
cs.ClearVotes()
cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return
} else {
//如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。
if block.Height + 1 < cs.currentVote.Height{
dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType)
return
}
//当前时间未到节点切换时间,则继续进行出块判断
if block.BlockTime >= task.blockStop {
//已出块,或者时间落后了。
dposlog.Info("VotedState timeOut but block already is generated.", "blocktime", block.BlockTime, "blockStop", task.blockStop, "now", now)
cs.scheduleDPosTimeout(time.Second * 1, VotedStateType)
return
} else if block.BlockTime < task.blockStart {
//本出块周期尚未出块,则进行出块
if task.blockStop-now <= 1 {
dposlog.Info("Create new block.", "height", block.Height + 1)
cs.client.SetBlockTime(task.blockStop)
cs.client.CreateBlock()
cs.scheduleDPosTimeout(time.Millisecond * 500, VotedStateType)
return
} else {
dposlog.Info("Wait time to create block near blockStop.", "waittime", task.blockStop-now-1)
//cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop - now - 1), VotedStateType)
cs.scheduleDPosTimeout(time.Millisecond * 500, VotedStateType)
return
}
} else {
//本周期已经出块
dposlog.Info("Wait to next block cycle.", "waittime", task.blockStop-now+1)
//cs.scheduleDPosTimeout(time.Second * time.Duration(task.blockStop-now+1), VotedStateType)
cs.scheduleDPosTimeout(time.Millisecond * 500, VotedStateType)
return
}
}
} else{
dposlog.Info("This node is not current owner.", "current owner index", cs.currentVote.VotedNodeIndex, "this node index", cs.client.ValidatorIndex())
//非当前出块节点,如果到了切换出块节点的时间,则进行状态切换,进行投票
if now >= cs.currentVote.PeriodStop {
//当前时间超过了节点切换时间,需要进行重新投票
cs.SaveVote()
cs.SaveMyVote()
cs.ClearVotes()
cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "WaitNotifyState")
cs.scheduleDPosTimeout(time.Duration(timeoutWaitNotify) * time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify)
}
return
} else {
//设置超时时间
dposlog.Info("wait until change state.", "waittime", cs.currentVote.PeriodStop - now + 1)
cs.scheduleDPosTimeout(time.Second * time.Duration(cs.currentVote.PeriodStop - now + 1), VotedStateType)
return
}
}
}
func (voted *VotedState)sendVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply) {
dposlog.Info("VotedState sendVoteReply")
cs.broadcastChannel <- MsgInfo{TypeID: dpostype.VoteReplyID, Msg: reply, PeerID: cs.ourID, PeerIP: ""}
}
func (voted *VotedState)recvVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply) {
dposlog.Info("VotedState recv Vote reply", "from index", reply.Vote.VoterNodeIndex, "local index", cs.privValidatorIndex)
cs.AddVotes(reply.Vote)
}
func (voted *VotedState)sendVote(cs *ConsensusState,vote * dpostype.DPosVote) {
dposlog.Info("VotedState does not support sendVote,so do nothing")
}
func (voted *VotedState)recvVote(cs *ConsensusState,vote * dpostype.DPosVote) {
dposlog.Info("VotedState recv Vote, will reply it", "from index", vote.VoterNodeIndex, "local index", cs.privValidatorIndex)
if cs.currentVote.PeriodStart >= vote.VoteItem.PeriodStart {
vote2 := *cs.myVote
reply := &dpostype.DPosVoteReply{Vote: &vote2}
cs.dposState.sendVoteReply(cs, reply)
} else {
dposlog.Info("VotedState recv future Vote, will cache it")
cs.CacheVotes(vote)
}
}
func (voted *VotedState)sendNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
cs.broadcastChannel <- MsgInfo{TypeID: dpostype.NotifyID, Msg: notify, PeerID: cs.ourID, PeerIP: ""}
}
func (voted *VotedState)recvNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
dposlog.Info("VotedState recvNotify")
if bytes.Equal(cs.privValidator.GetAddress(), cs.currentVote.VotedNodeAddress) {
dposlog.Info("ignore recvNotify because this node is the owner now.")
return
} else {
cs.CacheNotify(notify)
cs.SaveVote()
cs.SaveMyVote()
cs.ClearVotes()
cs.SetState(WaitNotifyStateObj)
dposlog.Info("Change state because of recv notify.", "from", "VotedState", "to", "WaitNotifyState")
cs.scheduleDPosTimeout(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify)
}
return
}
}
// VotingState is the voting state of dpos state machine until timeout or get an agreement by votes.
type WaitNofifyState struct {
}
func (wait *WaitNofifyState)timeOut(cs *ConsensusState) {
//cs.clearVotes()
cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections) * time.Millisecond, InitStateType)
}
func (wait *WaitNofifyState)sendVote(cs *ConsensusState,vote * dpostype.DPosVote) {
dposlog.Info("WaitNofifyState does not support sendVote,so do nothing")
}
func (wait *WaitNofifyState)recvVote(cs *ConsensusState,vote * dpostype.DPosVote) {
dposlog.Info("WaitNofifyState recvVote,store it.")
//对于vote进行保存,在后续状态中进行处理。 保存的选票有先后,同一个节点发来的最新的选票被保存。
//if !cs.VerifyVote(vote) {
// dposlog.Info("VotingState verify vote failed", "vote", vote.String())
// return
//}
//cs.AddVotes(vote)
cs.CacheVotes(vote)
}
func (wait *WaitNofifyState)sendVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply) {
dposlog.Info("WaitNofifyState does not support sendVoteReply,so do nothing")
}
func (wait *WaitNofifyState)recvVoteReply(cs *ConsensusState, reply * dpostype.DPosVoteReply){
dposlog.Info("WaitNofifyState recv Vote reply,ignore it.")
}
func (wait *WaitNofifyState)sendNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
dposlog.Info("WaitNofifyState does not support sendNotify,so do nothing")
}
func (wait *WaitNofifyState)recvNotify(cs *ConsensusState, notify * dpostype.DPosNotify) {
dposlog.Info("WaitNofifyState recvNotify")
//记录Notify,校验区块,标记不可逆区块高度
if !cs.VerifyNotify(notify) {
dposlog.Info("VotedState verify notify failed")
return
}
block := cs.client.GetCurrentBlock()
if (block.Height > notify.HeightStop){
dposlog.Info("Local block height is advanced than notify, discard it.", "localheight", block.Height, "notify", notify.String())
return
} else if block.Height == notify.HeightStop && bytes.Equal(block.Hash(), notify.HashStop){
dposlog.Info("Local block height is sync with notify", "notify", notify.String())
} else {
dposlog.Info("Local block height is not sync with notify", "localheight", cs.client.GetCurrentHeight(), "notify", notify.String())
hint := time.NewTicker(3 * time.Second)
beg := time.Now()
catchupFlag := false
OuterLoop:
for !catchupFlag {
select {
case <-hint.C:
dposlog.Info("Still catching up max height......", "Height", cs.client.GetCurrentHeight(),"notifyHeight", notify.HeightStop, "cost", time.Since(beg))
if cs.client.IsCaughtUp() && cs.client.GetCurrentHeight() >= notify.HeightStop {
dposlog.Info("This node has caught up max height", "Height", cs.client.GetCurrentHeight(), "isHashSame", bytes.Equal(block.Hash(), notify.HashStop))
break OuterLoop
}
default:
if cs.client.IsCaughtUp() && cs.client.GetCurrentHeight() >= notify.HeightStop {
dposlog.Info("This node has caught up max height", "Height", cs.client.GetCurrentHeight())
break OuterLoop
}
time.Sleep(time.Second)
}
}
hint.Stop()
}
cs.ClearCachedNotify()
cs.SaveNotify()
cs.SetNotify(notify)
//cs.clearVotes()
cs.SetState(InitStateObj)
dposlog.Info("Change state because recv notify.", "from", "WaitNofifyState", "to", "InitState")
cs.dposState.timeOut(cs)
//cs.scheduleDPosTimeout(time.Second * 1, InitStateType)
}
var InitStateObj = & InitState{}
var VotingStateObj = & VotingState{}
var VotedStateObj = & VotedState{}
var WaitNotifyStateObj = & WaitNofifyState{}
\ No newline at end of file
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"fmt"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"math/rand"
"testing"
"time"
_ "github.com/33cn/chain33/system"
_ "github.com/33cn/plugin/plugin/dapp/init"
_ "github.com/33cn/plugin/plugin/store/init"
)
var (
random *rand.Rand
loopCount = 10
conn *grpc.ClientConn
c types.Chain33Client
)
func init() {
setParams(3, 3, 6)
}
func setParams(delegateNum int64, blockInterval int64, continueBlockNum int64) {
dposDelegateNum = delegateNum //委托节点个数,从配置读取,以后可以根据投票结果来定
dposBlockInterval = blockInterval //出块间隔,当前按3s
dposContinueBlockNum = continueBlockNum //一个委托节点当选后,一次性持续出块数量
dposCycle = int64(dposDelegateNum * dposBlockInterval * dposContinueBlockNum)
dposPeriod = int64(dposBlockInterval * dposContinueBlockNum)
}
func printTask(now int64, task *DPosTask){
fmt.Printf("now:%v|cycleStart:%v|cycleStop:%v|periodStart:%v|periodStop:%v|blockStart:%v|blockStop:%v|nodeId:%v\n",
now,
task.cycleStart,
task.cycleStop,
task.periodStart,
task.periodStop,
task.blockStart,
task.blockStop,
task.nodeId)
}
func assertTask(task *DPosTask, t*testing.T){
assert.Equal(t, true, task.nodeId >= 0 && task.nodeId < dposDelegateNum)
assert.Equal(t, true, task.cycleStart <= task.periodStart && task.periodStart <= task.blockStart && task.blockStop <= task.periodStop && task.periodStop <= task.cycleStop)
}
func TestDecideTaskByTime(t *testing.T) {
now := time.Now().Unix()
task := DecideTaskByTime(now)
printTask(now, &task)
assertTask(&task, t)
setParams(2, 1, 6)
now = time.Now().Unix()
task = DecideTaskByTime(now)
printTask(now, &task)
assertTask(&task, t)
setParams(21, 1, 12)
now = time.Now().Unix()
task = DecideTaskByTime(now)
printTask(now, &task)
assertTask(&task, t)
setParams(21, 2, 12)
now = time.Now().Unix()
task = DecideTaskByTime(now)
printTask(now, &task)
assertTask(&task, t)
setParams(2, 3, 12)
for i := 0; i < 120; i++ {
now = time.Now().Unix()
task = DecideTaskByTime(now)
printTask(now, &task)
assertTask(&task, t)
time.Sleep(time.Second * 1)
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"time"
)
var (
tickTockBufferSize = 10
)
// TimeoutTicker is a timer that schedules timeouts
// conditional on the height/round/step in the timeoutInfo.
// The timeoutInfo.Duration may be non-positive.
type TimeoutTicker interface {
Start()
Stop()
Chan() <-chan timeoutInfo // on which to receive a timeout
ScheduleTimeout(ti timeoutInfo) // reset the timer
}
// timeoutTicker wraps time.Timer,
// scheduling timeouts only for greater height/round/step
// than what it's already seen.
// Timeouts are scheduled along the tickChan,
// and fired on the tockChan.
type timeoutTicker struct {
timer *time.Timer
tickChan chan timeoutInfo // for scheduling timeouts
tockChan chan timeoutInfo // for notifying about them
}
// NewTimeoutTicker returns a new TimeoutTicker.
func NewTimeoutTicker() TimeoutTicker {
tt := &timeoutTicker{
timer: time.NewTimer(0),
tickChan: make(chan timeoutInfo, tickTockBufferSize),
tockChan: make(chan timeoutInfo, tickTockBufferSize),
}
tt.stopTimer() // don't want to fire until the first scheduled timeout
return tt
}
// OnStart implements cmn.Service. It starts the timeout routine.
func (t *timeoutTicker) Start() {
go t.timeoutRoutine()
}
// OnStop implements cmn.Service. It stops the timeout routine.
func (t *timeoutTicker) Stop() {
t.stopTimer()
}
// Chan returns a channel on which timeouts are sent.
func (t *timeoutTicker) Chan() <-chan timeoutInfo {
return t.tockChan
}
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is always available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti
}
//-------------------------------------------------------------
// stop the timer and drain if necessary
func (t *timeoutTicker) stopTimer() {
// Stop() returns false if it was already fired or was stopped
if !t.timer.Stop() {
select {
case <-t.timer.C:
default:
dposlog.Debug("Timer already stopped")
}
}
}
// send on tickChan to start a new timer.
// timers are interupted and replaced by new ticks from later steps
// timeouts of 0 on the tickChan will be immediately relayed to the tockChan
func (t *timeoutTicker) timeoutRoutine() {
dposlog.Debug("Starting timeout routine")
var ti timeoutInfo
for {
select {
case newti := <-t.tickChan:
dposlog.Debug("Received tick", "old_ti", ti, "new_ti", newti)
// stop the last timer
t.stopTimer()
// update timeoutInfo and reset timer
// NOTE time.Timer allows duration to be non-positive
ti = newti
t.timer.Reset(ti.Duration)
dposlog.Debug("Scheduled timeout", "dur", ti.Duration)
case <-t.timer.C:
dposlog.Info("Timed out", "dur", ti.Duration, "state", StateTypeMapping[ti.State])
// go routine here guarantees timeoutRoutine doesn't block.
// Determinism comes from playback in the receiveRoutine.
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
// and managing the timeouts ourselves with a millisecond ticker
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package types
import (
"encoding/json"
"io/ioutil"
"time"
"github.com/pkg/errors"
)
//------------------------------------------------------------
// core types for a genesis definition
// GenesisValidator is an initial validator.
type GenesisValidator struct {
PubKey KeyText `json:"pub_key"`
Name string `json:"name"`
}
// GenesisDoc defines the initial conditions for a tendermint blockchain, in particular its validator set.
type GenesisDoc struct {
GenesisTime time.Time `json:"genesis_time"`
ChainID string `json:"chain_id"`
Validators []GenesisValidator `json:"validators"`
AppHash []byte `json:"app_hash"`
AppOptions interface{} `json:"app_options,omitempty"`
}
// SaveAs is a utility method for saving GenensisDoc as a JSON file.
func (genDoc *GenesisDoc) SaveAs(file string) error {
genDocBytes, err := json.Marshal(genDoc)
if err != nil {
return err
}
return WriteFile(file, genDocBytes, 0644)
}
// ValidatorHash returns the hash of the validator set contained in the GenesisDoc
func (genDoc *GenesisDoc) ValidatorHash() []byte {
vals := make([]*Validator, len(genDoc.Validators))
for i, v := range genDoc.Validators {
if len(v.PubKey.Data) == 0 {
panic(Fmt("ValidatorHash pubkey of validator[%v] in gendoc is empty", i))
}
pubkey, err := PubKeyFromString(v.PubKey.Data)
if err != nil {
panic(Fmt("ValidatorHash PubKeyFromBytes failed:%v", err))
}
vals[i] = NewValidator(pubkey)
}
vset := NewValidatorSet(vals)
return vset.Hash()
}
// ValidateAndComplete checks that all necessary fields are present
// and fills in defaults for optional fields left empty
func (genDoc *GenesisDoc) ValidateAndComplete() error {
if genDoc.ChainID == "" {
return errors.Errorf("Genesis doc must include non-empty chain_id")
}
if len(genDoc.Validators) == 0 {
return errors.Errorf("The genesis file must have at least one validator")
}
if genDoc.GenesisTime.IsZero() {
genDoc.GenesisTime = time.Now()
}
return nil
}
//------------------------------------------------------------
// Make genesis state from file
// GenesisDocFromJSON unmarshalls JSON data into a GenesisDoc.
func GenesisDocFromJSON(jsonBlob []byte) (*GenesisDoc, error) {
genDoc := GenesisDoc{}
err := json.Unmarshal(jsonBlob, &genDoc)
if err != nil {
return nil, err
}
if err := genDoc.ValidateAndComplete(); err != nil {
return nil, err
}
return &genDoc, err
}
// GenesisDocFromFile reads JSON data from a file and unmarshalls it into a GenesisDoc.
func GenesisDocFromFile(genDocFile string) (*GenesisDoc, error) {
jsonBlob, err := ioutil.ReadFile(genDocFile)
if err != nil {
return nil, errors.Wrap(err, "Couldn't read GenesisDoc file")
}
genDoc, err := GenesisDocFromJSON(jsonBlob)
if err != nil {
return nil, errors.Wrap(err, Fmt("Error reading GenesisDoc at %v", genDocFile))
}
return genDoc, nil
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package types
import (
"time"
"reflect"
)
var (
// MsgMap define
MsgMap map[byte]reflect.Type
)
// step and message id define
const (
VoteID = byte(0x06)
VoteReplyID = byte(0x07)
NotifyID = byte(0x08)
PacketTypePing = byte(0xff)
PacketTypePong = byte(0xfe)
)
// InitMessageMap ...
func InitMessageMap() {
MsgMap = map[byte]reflect.Type{
VoteID: reflect.TypeOf(DPosVote{}),
VoteReplyID: reflect.TypeOf(DPosVoteReply{}),
NotifyID: reflect.TypeOf(DPosNotify{}),
}
}
//---------------------Canonical json-----------------------------------
// CanonicalJSONVote ...
type CanonicalJSONVoteItem struct {
VotedNodeIndex int32 `json:"votedNodeIndex,omitempty"`
VotedNodeAddress []byte `json:"votedNodeAddress,omitempty"`
CycleStart int64 `json:"cycleStart,omitempty"`
CycleStop int64 `json:"cycleStop,omitempty"`
PeriodStart int64 `json:"periodStart,omitempty"`
PeriodStop int64 `json:"periodStop,omitempty"`
Height int64 `json:"height,omitempty"`
VoteId []byte `json:"voteId,omitempty"`
}
type CanonicalJSONVote struct {
VoteItem *CanonicalJSONVoteItem `json:"vote,omitempty"`
VoteTimestamp int64 `json:"voteTimestamp,omitempty"`
VoterNodeIndex int32 `json:"voterNodeIndex,omitempty"`
VoterNodeAddress []byte `json:"voterNodeAddress,omitempty"`
}
// CanonicalJSONOnceVote ...
type CanonicalJSONOnceVote struct {
ChainID string `json:"chain_id"`
Vote CanonicalJSONVote `json:"vote"`
}
// CanonicalVote ...
func CanonicalVote(vote *Vote) CanonicalJSONVote {
return CanonicalJSONVote{
VoteItem: &CanonicalJSONVoteItem{
VotedNodeIndex: vote.VoteItem.VotedNodeIndex,
VotedNodeAddress: vote.VoteItem.VotedNodeAddress,
CycleStart: vote.VoteItem.CycleStart,
CycleStop: vote.VoteItem.CycleStop,
PeriodStart: vote.VoteItem.PeriodStart,
PeriodStop: vote.VoteItem.PeriodStop,
Height: vote.VoteItem.Height,
VoteId: vote.VoteItem.VoteId,
},
VoteTimestamp: vote.VoteTimestamp,
VoterNodeIndex: vote.VoterNodeIndex,
VoterNodeAddress: vote.VoterNodeAddress,
}
}
type CanonicalJSONNotify struct {
VoteItem *CanonicalJSONVoteItem `json:"vote,omitempty"`
HeightStop int64 `json:"heightStop,omitempty"`
NotifyTimestamp int64 `json:"notifyTimestamp,omitempty"`
}
// CanonicalJSONOnceVote ...
type CanonicalJSONOnceNotify struct {
ChainID string `json:"chain_id"`
Notify CanonicalJSONNotify `json:"vote"`
}
// CanonicalVote ...
func CanonicalNotify(notify *Notify) CanonicalJSONNotify {
return CanonicalJSONNotify{
VoteItem: &CanonicalJSONVoteItem{
VotedNodeIndex: notify.Vote.VotedNodeIndex,
VotedNodeAddress: notify.Vote.VotedNodeAddress,
CycleStart: notify.Vote.CycleStart,
CycleStop: notify.Vote.CycleStop,
PeriodStart: notify.Vote.PeriodStart,
PeriodStop: notify.Vote.PeriodStop,
Height: notify.Vote.Height,
VoteId: notify.Vote.VoteId,
},
HeightStop: notify.HeightStop,
NotifyTimestamp: notify.NotifyTimestamp,
}
}
// CanonicalTime ...
func CanonicalTime(t time.Time) string {
// note that sending time over go-wire resets it to
// local time, we need to force UTC here, so the
// signatures match
return t.UTC().Format(timeFormat)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package types
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"io/ioutil"
"os"
"sync"
"github.com/33cn/chain33/common/crypto"
)
// KeyText ...
type KeyText struct {
Kind string `json:"type"`
Data string `json:"data"`
}
// PrivValidator defines the functionality of a local Tendermint validator
// that signs votes, proposals, and heartbeats, and never double signs.
type PrivValidator interface {
GetAddress() []byte // redundant since .PubKey().Address()
GetPubKey() crypto.PubKey
SignVote(chainID string, vote *Vote) error
SignNotify(chainID string, notify *Notify) error
}
// PrivValidatorFS implements PrivValidator using data persisted to disk
// to prevent double signing. The Signer itself can be mutated to use
// something besides the default, for instance a hardware signer.
type PrivValidatorFS struct {
Address string `json:"address"`
PubKey KeyText `json:"pub_key"`
LastSignature *KeyText `json:"last_signature,omitempty"` // so we dont lose signatures
LastSignBytes string `json:"last_signbytes,omitempty"` // so we dont lose signatures
// PrivKey should be empty if a Signer other than the default is being used.
PrivKey KeyText `json:"priv_key"`
}
// PrivValidatorImp ...
type PrivValidatorImp struct {
Address []byte
PubKey crypto.PubKey
LastSignature crypto.Signature
LastSignBytes []byte
// PrivKey should be empty if a Signer other than the default is being used.
PrivKey crypto.PrivKey
Signer `json:"-"`
// For persistence.
// Overloaded for testing.
filePath string
mtx sync.Mutex
}
// Signer is an interface that defines how to sign messages.
// It is the caller's duty to verify the msg before calling Sign,
// eg. to avoid double signing.
// Currently, the only callers are SignVote, SignProposal, and SignHeartbeat.
type Signer interface {
Sign(msg []byte) (crypto.Signature, error)
}
// DefaultSigner implements Signer.
// It uses a standard, unencrypted crypto.PrivKey.
type DefaultSigner struct {
PrivKey crypto.PrivKey `json:"priv_key"`
}
// NewDefaultSigner returns an instance of DefaultSigner.
func NewDefaultSigner(priv crypto.PrivKey) *DefaultSigner {
return &DefaultSigner{
PrivKey: priv,
}
}
// Sign implements Signer. It signs the byte slice with a private key.
func (ds *DefaultSigner) Sign(msg []byte) (crypto.Signature, error) {
return ds.PrivKey.Sign(msg), nil
}
// GetAddress returns the address of the validator.
// Implements PrivValidator.
func (pv *PrivValidatorImp) GetAddress() []byte {
return pv.Address
}
// GetPubKey returns the public key of the validator.
// Implements PrivValidator.
func (pv *PrivValidatorImp) GetPubKey() crypto.PubKey {
return pv.PubKey
}
// GenAddressByPubKey ...
func GenAddressByPubKey(pubkey crypto.PubKey) []byte {
//must add 3 bytes ahead to make compatibly
typeAddr := append([]byte{byte(0x01), byte(0x01), byte(0x20)}, pubkey.Bytes()...)
return crypto.Ripemd160(typeAddr)
}
// PubKeyFromString ...
func PubKeyFromString(pubkeystring string) (crypto.PubKey, error) {
pub, err := hex.DecodeString(pubkeystring)
if err != nil {
return nil, errors.New(Fmt("PubKeyFromString:DecodeString:%v failed,err:%v", pubkeystring, err))
}
pubkey, err := ConsensusCrypto.PubKeyFromBytes(pub)
if err != nil {
return nil, errors.New(Fmt("PubKeyFromString:PubKeyFromBytes:%v failed,err:%v", pub, err))
}
return pubkey, nil
}
// SignatureFromString ...
func SignatureFromString(sigString string) (crypto.Signature, error) {
sigbyte, err := hex.DecodeString(sigString)
if err != nil {
return nil, errors.New(Fmt("PubKeyFromString:DecodeString:%v failed,err:%v", sigString, err))
}
sig, err := ConsensusCrypto.SignatureFromBytes(sigbyte)
if err != nil {
return nil, errors.New(Fmt("PubKeyFromString:SignatureFromBytes:%v failed,err:%v", sigbyte, err))
}
return sig, nil
}
// GenPrivValidatorImp generates a new validator with randomly generated private key
// and sets the filePath, but does not call Save().
func GenPrivValidatorImp(filePath string) *PrivValidatorImp {
privKey, err := ConsensusCrypto.GenKey()
if err != nil {
panic(Fmt("GenPrivValidatorImp: GenKey failed:%v", err))
}
return &PrivValidatorImp{
Address: GenAddressByPubKey(privKey.PubKey()),
PubKey: privKey.PubKey(),
PrivKey: privKey,
Signer: NewDefaultSigner(privKey),
filePath: filePath,
}
}
// LoadPrivValidatorFS loads a PrivValidatorImp from the filePath.
func LoadPrivValidatorFS(filePath string) *PrivValidatorImp {
return LoadPrivValidatorFSWithSigner(filePath, func(privVal PrivValidator) Signer {
return NewDefaultSigner(privVal.(*PrivValidatorImp).PrivKey)
})
}
// LoadOrGenPrivValidatorFS loads a PrivValidatorFS from the given filePath
// or else generates a new one and saves it to the filePath.
func LoadOrGenPrivValidatorFS(filePath string) *PrivValidatorImp {
var privVal *PrivValidatorImp
if _, err := os.Stat(filePath); err == nil {
privVal = LoadPrivValidatorFS(filePath)
} else {
privVal = GenPrivValidatorImp(filePath)
privVal.Save()
}
return privVal
}
// LoadPrivValidatorFSWithSigner loads a PrivValidatorFS with a custom
// signer object. The PrivValidatorFS handles double signing prevention by persisting
// data to the filePath, while the Signer handles the signing.
// If the filePath does not exist, the PrivValidatorFS must be created manually and saved.
func LoadPrivValidatorFSWithSigner(filePath string, signerFunc func(PrivValidator) Signer) *PrivValidatorImp {
privValJSONBytes, err := ioutil.ReadFile(filePath)
if err != nil {
Exit(err.Error())
}
privVal := &PrivValidatorFS{}
err = json.Unmarshal(privValJSONBytes, &privVal)
if err != nil {
Exit(Fmt("Error reading PrivValidator from %v: %v\n", filePath, err))
}
if len(privVal.PubKey.Data) == 0 {
Exit("Error PrivValidator pubkey is empty\n")
}
if len(privVal.PrivKey.Data) == 0 {
Exit("Error PrivValidator privkey is empty\n")
}
addr, err := hex.DecodeString(privVal.Address)
if err != nil {
Exit(Fmt("Error PrivValidator DecodeString failed:%v\n", err))
}
privValImp := &PrivValidatorImp{
Address: addr,
}
tmp, err := hex.DecodeString(privVal.PrivKey.Data)
if err != nil {
Exit(Fmt("Error DecodeString PrivKey data failed: %v\n", err))
}
privKey, err := ConsensusCrypto.PrivKeyFromBytes(tmp)
if err != nil {
Exit(Fmt("Error PrivKeyFromBytes failed: %v\n", err))
}
privValImp.PrivKey = privKey
pubKey, err := PubKeyFromString(privVal.PubKey.Data)
if err != nil {
Exit(Fmt("Error PubKeyFromBytes failed: %v\n", err))
}
privValImp.PubKey = pubKey
if len(privVal.LastSignBytes) != 0 {
tmp, err = hex.DecodeString(privVal.LastSignBytes)
if err != nil {
Exit(Fmt("Error DecodeString LastSignBytes data failed: %v\n", err))
}
privValImp.LastSignBytes = tmp
}
if privVal.LastSignature != nil {
signature, err := SignatureFromString(privVal.LastSignature.Data)
if err != nil {
Exit(Fmt("Error SignatureFromBytes failed: %v\n", err))
}
privValImp.LastSignature = signature
} else {
privValImp.LastSignature = nil
}
privValImp.filePath = filePath
privValImp.Signer = signerFunc(privValImp)
return privValImp
}
// Save persists the PrivValidatorFS to disk.
func (pv *PrivValidatorImp) Save() {
pv.mtx.Lock()
defer pv.mtx.Unlock()
pv.save()
}
func (pv *PrivValidatorImp) save() {
if pv.filePath == "" {
PanicSanity("Cannot save PrivValidator: filePath not set")
}
addr := Fmt("%X", pv.Address[:])
privValFS := &PrivValidatorFS{
Address: addr,
LastSignature: nil,
}
privValFS.PrivKey = KeyText{Kind: "ed25519", Data: Fmt("%X", pv.PrivKey.Bytes()[:])}
privValFS.PubKey = KeyText{Kind: "ed25519", Data: pv.PubKey.KeyString()}
if len(pv.LastSignBytes) != 0 {
tmp := Fmt("%X", pv.LastSignBytes[:])
privValFS.LastSignBytes = tmp
}
if pv.LastSignature != nil {
sig := Fmt("%X", pv.LastSignature.Bytes()[:])
privValFS.LastSignature = &KeyText{Kind: "ed25519", Data: sig}
}
jsonBytes, err := json.Marshal(privValFS)
if err != nil {
// `@; BOOM!!!
PanicCrisis(err)
}
err = WriteFileAtomic(pv.filePath, jsonBytes, 0600)
if err != nil {
// `@; BOOM!!!
PanicCrisis(err)
}
}
// Reset resets all fields in the PrivValidatorFS.
// NOTE: Unsafe!
func (pv *PrivValidatorImp) Reset() {
pv.LastSignature = nil
pv.LastSignBytes = nil
pv.Save()
}
// SignVote signs a canonical representation of the vote, along with the
// chainID. Implements PrivValidator.
func (pv *PrivValidatorImp) SignVote(chainID string, vote *Vote) error {
pv.mtx.Lock()
defer pv.mtx.Unlock()
signBytes := SignBytes(chainID, vote)
signature, err := pv.Sign(signBytes)
if err != nil {
return errors.New(Fmt("Error signing vote: %v", err))
}
vote.Signature = signature.Bytes()
return nil
}
// SignVote signs a canonical representation of the vote, along with the
// chainID. Implements PrivValidator.
func (pv *PrivValidatorImp) SignNotify(chainID string, notify *Notify) error {
pv.mtx.Lock()
defer pv.mtx.Unlock()
signBytes := SignBytes(chainID, notify)
signature, err := pv.Sign(signBytes)
if err != nil {
return errors.New(Fmt("Error signing vote: %v", err))
}
notify.Signature = signature.Bytes()
return nil
}
// Persist height/round/step and signature
func (pv *PrivValidatorImp) saveSigned(signBytes []byte, sig crypto.Signature) {
pv.LastSignature = sig
pv.LastSignBytes = signBytes
pv.save()
}
// String returns a string representation of the PrivValidatorImp.
func (pv *PrivValidatorImp) String() string {
return Fmt("PrivValidator{%v}", pv.GetAddress())
}
// PrivValidatorsByAddress ...
type PrivValidatorsByAddress []*PrivValidatorImp
func (pvs PrivValidatorsByAddress) Len() int {
return len(pvs)
}
func (pvs PrivValidatorsByAddress) Less(i, j int) bool {
return bytes.Compare(pvs[i].GetAddress(), pvs[j].GetAddress()) == -1
}
func (pvs PrivValidatorsByAddress) Swap(i, j int) {
it := pvs[i]
pvs[i] = pvs[j]
pvs[j] = it
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package types
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/log/log15"
"io"
"time"
)
// error defines
var (
ErrNotifyInvalidValidatorAddress = errors.New("Invalid validator address for notify")
ErrNotifyInvalidValidatorIndex = errors.New("Invalid validator index for notify")
ErrNotifyInvalidSignature = errors.New("Invalid notify signature")
ErrVoteInvalidValidatorIndex = errors.New("Invalid validator index for vote")
ErrVoteInvalidValidatorAddress = errors.New("Invalid validator address for vote")
ErrVoteInvalidSignature = errors.New("Invalid vote signature")
ErrVoteNil = errors.New("Nil vote")
votelog = log15.New("module", "tendermint-vote")
ConsensusCrypto crypto.Crypto
)
// Signable is an interface for all signable things.
// It typically removes signatures before serializing.
type Signable interface {
WriteSignBytes(chainID string, w io.Writer, n *int, err *error)
}
// SignBytes is a convenience method for getting the bytes to sign of a Signable.
func SignBytes(chainID string, o Signable) []byte {
buf, n, err := new(bytes.Buffer), new(int), new(error)
o.WriteSignBytes(chainID, buf, n, err)
if *err != nil {
PanicCrisis(err)
}
return buf.Bytes()
}
// Vote Represents a vote from validators for consensus.
type Vote struct {
*DPosVote
}
// WriteSignBytes ...
func (vote *Vote) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) {
if *err != nil {
return
}
canonical := CanonicalJSONOnceVote{
chainID,
CanonicalVote(vote),
}
byteVote, e := json.Marshal(&canonical)
if e != nil {
*err = e
votelog.Error("vote WriteSignBytes marshal failed", "err", e)
return
}
number, writeErr := w.Write(byteVote)
*n = number
*err = writeErr
}
// Copy ...
func (vote *Vote) Copy() *Vote {
voteCopy := *vote
return &voteCopy
}
func (vote *Vote) String() string {
if vote == nil {
return "nil-Vote"
}
return fmt.Sprintf("Vote{VotedNodeIndex:%v, VotedNodeAddr:%X,Cycle[%v,%v],Period[%v,%v],StartHeight:%v,VoteId:%X,VoteTimeStamp:%v,VoteNodeIndex:%v,VoteNodeAddr:%X,Sig:%X}",
vote.VoteItem.VotedNodeIndex,
Fingerprint(vote.VoteItem.VotedNodeAddress),
vote.VoteItem.CycleStart,
vote.VoteItem.CycleStop,
vote.VoteItem.PeriodStart,
vote.VoteItem.PeriodStop,
vote.VoteItem.Height,
Fingerprint(vote.VoteItem.VoteId),
CanonicalTime(time.Unix(0, vote.VoteTimestamp)),
vote.VoterNodeIndex,
Fingerprint(vote.VoterNodeAddress),
Fingerprint(vote.Signature),
)
}
// Verify ...
func (vote *Vote) Verify(chainID string, pubKey crypto.PubKey) error {
addr := GenAddressByPubKey(pubKey)
if !bytes.Equal(addr, vote.VoterNodeAddress) {
return ErrVoteInvalidValidatorAddress
}
sig, err := ConsensusCrypto.SignatureFromBytes(vote.Signature)
if err != nil {
votelog.Error("vote Verify failed", "err", err)
return err
}
if !pubKey.VerifyBytes(SignBytes(chainID, vote), sig) {
return ErrVoteInvalidSignature
}
return nil
}
// Hash ...
func (vote *Vote) Hash() []byte {
if vote == nil {
//votelog.Error("vote hash is nil")
return nil
}
bytes, err := json.Marshal(vote)
if err != nil {
votelog.Error("vote hash marshal failed", "err", err)
return nil
}
return crypto.Ripemd160(bytes)
}
// Vote Represents a vote from validators for consensus.
type Notify struct {
*DPosNotify
}
// WriteSignBytes ...
func (notify *Notify) WriteSignBytes(chainID string, w io.Writer, n *int, err *error) {
if *err != nil {
return
}
canonical := CanonicalJSONOnceNotify{
chainID,
CanonicalNotify(notify),
}
byteVote, e := json.Marshal(&canonical)
if e != nil {
*err = e
votelog.Error("vote WriteSignBytes marshal failed", "err", e)
return
}
number, writeErr := w.Write(byteVote)
*n = number
*err = writeErr
}
// Copy ...
func (notify *Notify) Copy() *Notify {
notifyCopy := *notify
return &notifyCopy
}
func (notify *Notify) String() string {
if notify == nil {
return "nil-notify"
}
return fmt.Sprintf("Notify{VotedNodeIndex:%v, VotedNodeAddr:%X,Cycle[%v,%v],Period[%v,%v],StartHeight:%v,VoteId:%X,NotifyTimeStamp:%v,HeightStop:%v,NotifyNodeIndex:%v,NotifyNodeAddr:%X,Sig:%X}",
notify.Vote.VotedNodeIndex,
Fingerprint(notify.Vote.VotedNodeAddress),
notify.Vote.CycleStart,
notify.Vote.CycleStop,
notify.Vote.PeriodStart,
notify.Vote.PeriodStop,
notify.Vote.Height,
Fingerprint(notify.Vote.VoteId),
CanonicalTime(time.Unix(0, notify.NotifyTimestamp)),
notify.HeightStop,
notify.NotifyNodeIndex,
Fingerprint(notify.NotifyNodeAddress),
Fingerprint(notify.Signature),
)
}
// Verify ...
func (notify *Notify) Verify(chainID string, pubKey crypto.PubKey) error {
addr := GenAddressByPubKey(pubKey)
if !bytes.Equal(addr, notify.NotifyNodeAddress) {
return ErrNotifyInvalidValidatorAddress
}
sig, err := ConsensusCrypto.SignatureFromBytes(notify.Signature)
if err != nil {
votelog.Error("Notify Verify failed", "err", err)
return err
}
if !pubKey.VerifyBytes(SignBytes(chainID, notify), sig) {
return ErrNotifyInvalidSignature
}
return nil
}
// Hash ...
func (notify *Notify) Hash() []byte {
if notify == nil {
//votelog.Error("vote hash is nil")
return nil
}
bytes, err := json.Marshal(notify)
if err != nil {
votelog.Error("vote hash marshal failed", "err", err)
return nil
}
return crypto.Ripemd160(bytes)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package types
import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"
)
const (
// RFC3339Millis ...
RFC3339Millis = "2006-01-02T15:04:05.000Z" // forced microseconds
timeFormat = RFC3339Millis
)
var (
randgen *rand.Rand
randMux sync.Mutex
// Fmt ...
Fmt = fmt.Sprintf
)
// Init ...
func Init() {
if randgen == nil {
randMux.Lock()
randgen = rand.New(rand.NewSource(time.Now().UnixNano()))
randMux.Unlock()
}
}
// WriteFile ...
func WriteFile(filePath string, contents []byte, mode os.FileMode) error {
return ioutil.WriteFile(filePath, contents, mode)
}
// WriteFileAtomic ...
func WriteFileAtomic(filePath string, newBytes []byte, mode os.FileMode) error {
dir := filepath.Dir(filePath)
f, err := ioutil.TempFile(dir, "")
if err != nil {
return err
}
_, err = f.Write(newBytes)
if err == nil {
err = f.Sync()
}
if closeErr := f.Close(); err == nil {
err = closeErr
}
if permErr := os.Chmod(f.Name(), mode); err == nil {
err = permErr
}
if err == nil {
err = os.Rename(f.Name(), filePath)
}
// any err should result in full cleanup
if err != nil {
if er := os.Remove(f.Name()); er != nil {
fmt.Printf("WriteFileAtomic Remove failed:%v", er)
}
}
return err
}
// Tempfile ...
func Tempfile(prefix string) (*os.File, string) {
file, err := ioutil.TempFile("", prefix)
if err != nil {
panic(err)
}
return file, file.Name()
}
// Fingerprint ...
func Fingerprint(slice []byte) []byte {
fingerprint := make([]byte, 6)
copy(fingerprint, slice)
return fingerprint
}
// Kill ...
func Kill() error {
p, err := os.FindProcess(os.Getpid())
if err != nil {
return err
}
return p.Signal(syscall.SIGTERM)
}
// Exit ...
func Exit(s string) {
fmt.Printf(s + "\n")
os.Exit(1)
}
// Parallel ...
func Parallel(tasks ...func()) {
var wg sync.WaitGroup
wg.Add(len(tasks))
for _, task := range tasks {
go func(task func()) {
task()
wg.Done()
}(task)
}
wg.Wait()
}
// Percent represents a percentage in increments of 1/1000th of a percent.
type Percent uint32
// Float ...
func (p Percent) Float() float64 {
return float64(p) * 1e-3
}
func (p Percent) String() string {
var buf [12]byte
b := strconv.AppendUint(buf[:0], uint64(p)/1000, 10)
n := len(b)
b = strconv.AppendUint(b, 1000+uint64(p)%1000, 10)
b[n] = '.'
return string(append(b, '%'))
}
// MinInt ...
func MinInt(a, b int) int {
if a < b {
return a
}
return b
}
// MaxInt ...
func MaxInt(a, b int) int {
if a > b {
return a
}
return b
}
// RandIntn ...
func RandIntn(n int) int {
if n <= 0 {
panic("invalid argument to Intn")
}
if n <= 1<<31-1 {
randMux.Lock()
i32 := randgen.Int31n(int32(n))
randMux.Unlock()
return int(i32)
}
randMux.Lock()
i64 := randgen.Int63n(int64(n))
randMux.Unlock()
return int(i64)
}
// RandUint32 ...
func RandUint32() uint32 {
randMux.Lock()
u32 := randgen.Uint32()
randMux.Unlock()
return u32
}
// RandInt63n ...
func RandInt63n(n int64) int64 {
randMux.Lock()
i64 := randgen.Int63n(n)
randMux.Unlock()
return i64
}
// PanicSanity ...
func PanicSanity(v interface{}) {
panic(Fmt("Panicked on a Sanity Check: %v", v))
}
// PanicCrisis ...
func PanicCrisis(v interface{}) {
panic(Fmt("Panicked on a Crisis: %v", v))
}
// PanicQ ...
func PanicQ(v interface{}) {
panic(Fmt("Panicked questionably: %v", v))
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package types
import (
"bytes"
"sort"
"strings"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/common/merkle"
)
var validatorsetlog = log15.New("module", "dpos-val")
// Validator ...
type Validator struct {
Address []byte `json:"address"`
PubKey []byte `json:"pub_key"`
}
// NewValidator ...
func NewValidator(pubKey crypto.PubKey) *Validator {
return &Validator{
Address: GenAddressByPubKey(pubKey),
PubKey: pubKey.Bytes(),
}
}
// Copy Creates a new copy of the validator so we can mutate accum.
// Panics if the validator is nil.
func (v *Validator) Copy() *Validator {
vCopy := *v
return &vCopy
}
func (v *Validator) String() string {
if v == nil {
return "nil-Validator"
}
return Fmt("Validator{%v %v}",
v.Address,
v.PubKey)
}
// Hash computes the unique ID of a validator with a given voting power.
// It excludes the Accum value, which changes with every round.
func (v *Validator) Hash() []byte {
hashBytes := v.Address
hashBytes = append(hashBytes, v.PubKey...)
return crypto.Ripemd160(hashBytes)
}
// ValidatorSet represent a set of *Validator at a given height.
// The validators can be fetched by address or index.
// The index is in order of .Address, so the indices are fixed
// for all rounds of a given blockchain height.
// On the other hand, the .AccumPower of each validator and
// the designated .GetProposer() of a set changes every round,
// upon calling .IncrementAccum().
// NOTE: Not goroutine-safe.
// NOTE: All get/set to validators should copy the value for safety.
// TODO: consider validator Accum overflow
type ValidatorSet struct {
// NOTE: persisted via reflect, must be exported.
Validators []*Validator `json:"validators"`
}
// NewValidatorSet ...
func NewValidatorSet(vals []*Validator) *ValidatorSet {
validators := make([]*Validator, len(vals))
for i, val := range vals {
validators[i] = val.Copy()
}
sort.Sort(ValidatorsByAddress(validators))
vs := &ValidatorSet{
Validators: validators,
}
return vs
}
// Copy ...
func (valSet *ValidatorSet) Copy() *ValidatorSet {
validators := make([]*Validator, len(valSet.Validators))
for i, val := range valSet.Validators {
// NOTE: must copy, since IncrementAccum updates in place.
validators[i] = val.Copy()
}
return &ValidatorSet{
Validators: validators,
}
}
// HasAddress ...
func (valSet *ValidatorSet) HasAddress(address []byte) bool {
idx := sort.Search(len(valSet.Validators), func(i int) bool {
return bytes.Compare(address, valSet.Validators[i].Address) <= 0
})
return idx != len(valSet.Validators) && bytes.Equal(valSet.Validators[idx].Address, address)
}
// GetByAddress ...
func (valSet *ValidatorSet) GetByAddress(address []byte) (index int, val *Validator) {
idx := sort.Search(len(valSet.Validators), func(i int) bool {
return bytes.Compare(address, valSet.Validators[i].Address) <= 0
})
if idx != len(valSet.Validators) && bytes.Equal(valSet.Validators[idx].Address, address) {
return idx, valSet.Validators[idx].Copy()
}
return -1, nil
}
// GetByIndex returns the validator by index.
// It returns nil values if index < 0 or
// index >= len(ValidatorSet.Validators)
func (valSet *ValidatorSet) GetByIndex(index int) (address []byte, val *Validator) {
if index < 0 || index >= len(valSet.Validators) {
return nil, nil
}
val = valSet.Validators[index]
return val.Address, val.Copy()
}
// Size ...
func (valSet *ValidatorSet) Size() int {
return len(valSet.Validators)
}
// Hash ...
func (valSet *ValidatorSet) Hash() []byte {
if len(valSet.Validators) == 0 {
return nil
}
hashables := make([][]byte, len(valSet.Validators))
for i, val := range valSet.Validators {
hashables[i] = val.Hash()
}
return merkle.GetMerkleRoot(hashables)
}
// Add ...
func (valSet *ValidatorSet) Add(val *Validator) (added bool) {
val = val.Copy()
idx := sort.Search(len(valSet.Validators), func(i int) bool {
return bytes.Compare(val.Address, valSet.Validators[i].Address) <= 0
})
if idx == len(valSet.Validators) {
valSet.Validators = append(valSet.Validators, val)
return true
} else if bytes.Equal(valSet.Validators[idx].Address, val.Address) {
return false
} else {
newValidators := make([]*Validator, len(valSet.Validators)+1)
copy(newValidators[:idx], valSet.Validators[:idx])
newValidators[idx] = val
copy(newValidators[idx+1:], valSet.Validators[idx:])
valSet.Validators = newValidators
return true
}
}
// Update ...
func (valSet *ValidatorSet) Update(val *Validator) (updated bool) {
index, sameVal := valSet.GetByAddress(val.Address)
if sameVal == nil {
return false
}
valSet.Validators[index] = val.Copy()
return true
}
// Remove ...
func (valSet *ValidatorSet) Remove(address []byte) (val *Validator, removed bool) {
idx := sort.Search(len(valSet.Validators), func(i int) bool {
return bytes.Compare(address, valSet.Validators[i].Address) <= 0
})
if idx == len(valSet.Validators) || !bytes.Equal(valSet.Validators[idx].Address, address) {
return nil, false
}
removedVal := valSet.Validators[idx]
newValidators := valSet.Validators[:idx]
if idx+1 < len(valSet.Validators) {
newValidators = append(newValidators, valSet.Validators[idx+1:]...)
}
valSet.Validators = newValidators
return removedVal, true
}
// Iterate ...
func (valSet *ValidatorSet) Iterate(fn func(index int, val *Validator) bool) {
for i, val := range valSet.Validators {
stop := fn(i, val.Copy())
if stop {
break
}
}
}
func (valSet *ValidatorSet) String() string {
return valSet.StringIndented("")
}
// StringIndented ...
func (valSet *ValidatorSet) StringIndented(indent string) string {
if valSet == nil {
return "nil-ValidatorSet"
}
valStrings := []string{}
valSet.Iterate(func(index int, val *Validator) bool {
valStrings = append(valStrings, val.String())
return false
})
return Fmt(`ValidatorSet{
%s Validators:
%s %v
%s}`,
indent,
indent, strings.Join(valStrings, "\n"+indent+" "),
indent)
}
// Implements sort for sorting validators by address.
// ValidatorsByAddress ...
type ValidatorsByAddress []*Validator
func (vs ValidatorsByAddress) Len() int {
return len(vs)
}
func (vs ValidatorsByAddress) Less(i, j int) bool {
return bytes.Compare(vs[i].Address, vs[j].Address) == -1
}
func (vs ValidatorsByAddress) Swap(i, j int) {
it := vs[i]
vs[i] = vs[j]
vs[j] = it
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package dpos
import (
"bytes"
"encoding/json"
"fmt"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
"math/rand"
)
const fee = 1e6
var (
r *rand.Rand
)
// State is a short description of the latest committed block of the Tendermint consensus.
// It keeps all information necessary to validate new blocks,
// including the last validator set and the consensus params.
// All fields are exposed so the struct can be easily serialized,
// but none of them should be mutated directly.
// Instead, use state.Copy() or state.NextState(...).
// NOTE: not goroutine-safe.
type ValidatorMgr struct {
// Immutable
ChainID string
// Validators are persisted to the database separately every time they change,
// so we can query for historical validator sets.
// Note that if s.LastBlockHeight causes a valset change,
// we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1
Validators *ttypes.ValidatorSet
// The latest AppHash we've received from calling abci.Commit()
AppHash []byte
}
// Copy makes a copy of the State for mutating.
func (s ValidatorMgr) Copy() ValidatorMgr {
return ValidatorMgr{
ChainID: s.ChainID,
Validators: s.Validators.Copy(),
AppHash: s.AppHash,
}
}
// Equals returns true if the States are identical.
func (s ValidatorMgr) Equals(s2 ValidatorMgr) bool {
return bytes.Equal(s.Bytes(), s2.Bytes())
}
// Bytes serializes the State using go-wire.
func (s ValidatorMgr) Bytes() []byte {
sbytes, err := json.Marshal(s)
if err != nil {
fmt.Printf("Error reading GenesisDoc: %v", err)
return nil
}
return sbytes
}
// IsEmpty returns true if the State is equal to the empty State.
func (s ValidatorMgr) IsEmpty() bool {
return s.Validators == nil // XXX can't compare to Empty
}
// GetValidators returns the last and current validator sets.
func (s ValidatorMgr) GetValidators() (current *ttypes.ValidatorSet) {
return s.Validators
}
// MakeGenesisState creates state from ttypes.GenesisDoc.
func MakeGenesisValidatorMgr(genDoc *ttypes.GenesisDoc) (ValidatorMgr, error) {
err := genDoc.ValidateAndComplete()
if err != nil {
return ValidatorMgr{}, fmt.Errorf("Error in genesis file: %v", err)
}
// Make validators slice
validators := make([]*ttypes.Validator, len(genDoc.Validators))
for i, val := range genDoc.Validators {
pubKey, err := ttypes.PubKeyFromString(val.PubKey.Data)
if err != nil {
return ValidatorMgr{}, fmt.Errorf("Error validate[%v] in genesis file: %v", i, err)
}
// Make validator
validators[i] = &ttypes.Validator{
Address: ttypes.GenAddressByPubKey(pubKey),
PubKey: pubKey.Bytes(),
}
}
return ValidatorMgr{
ChainID: genDoc.ChainID,
Validators: ttypes.NewValidatorSet(validators),
AppHash: genDoc.AppHash,
}, nil
}
package init package init
import ( import (
_ "github.com/33cn/plugin/plugin/consensus/dpos" //auto gen
_ "github.com/33cn/plugin/plugin/consensus/para" //auto gen _ "github.com/33cn/plugin/plugin/consensus/para" //auto gen
_ "github.com/33cn/plugin/plugin/consensus/pbft" //auto gen _ "github.com/33cn/plugin/plugin/consensus/pbft" //auto gen
_ "github.com/33cn/plugin/plugin/consensus/raft" //auto gen _ "github.com/33cn/plugin/plugin/consensus/raft" //auto gen
......
...@@ -36,7 +36,7 @@ var r *rand.Rand ...@@ -36,7 +36,7 @@ var r *rand.Rand
func createConn(ip string) { func createConn(ip string) {
var err error var err error
url := ip + ":8802" url := ip + ":9802"
fmt.Println("grpc url:", url) fmt.Println("grpc url:", url)
conn, err = grpc.Dial(url, grpc.WithInsecure()) conn, err = grpc.Dial(url, grpc.WithInsecure())
if err != nil { if err != nil {
......
...@@ -139,7 +139,7 @@ func Put(ip string, size string, privkey string) { ...@@ -139,7 +139,7 @@ func Put(ip string, size string, privkey string) {
fmt.Fprintln(os.Stderr, err) fmt.Fprintln(os.Stderr, err)
return return
} }
url := "http://" + ip + ":8801" url := "http://" + ip + ":9801"
if privkey == "" { if privkey == "" {
_, priv := genaddress() _, priv := genaddress()
privkey = common.ToHex(priv.Bytes()) privkey = common.ToHex(priv.Bytes())
......
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// source: guess.proto // source: dpos_msg.proto
package types package types
...@@ -2059,7 +2059,7 @@ func init() { ...@@ -2059,7 +2059,7 @@ func init() {
proto.RegisterType((*GuessGameRecords)(nil), "types.GuessGameRecords") proto.RegisterType((*GuessGameRecords)(nil), "types.GuessGameRecords")
} }
func init() { proto.RegisterFile("guess.proto", fileDescriptor_7574406c5d3430e8) } func init() { proto.RegisterFile("dpos_msg.proto", fileDescriptor_7574406c5d3430e8) }
var fileDescriptor_7574406c5d3430e8 = []byte{ var fileDescriptor_7574406c5d3430e8 = []byte{
// 1367 bytes of a gzipped FileDescriptorProto // 1367 bytes of a gzipped FileDescriptorProto
...@@ -2362,5 +2362,5 @@ var _Guess_serviceDesc = grpc.ServiceDesc{ ...@@ -2362,5 +2362,5 @@ var _Guess_serviceDesc = grpc.ServiceDesc{
}, },
}, },
Streams: []grpc.StreamDesc{}, Streams: []grpc.StreamDesc{},
Metadata: "guess.proto", Metadata: "dpos_msg.proto",
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment