Commit 69366e2b authored by 张振华's avatar 张振华

update

parent 7f71d2a5
......@@ -211,7 +211,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
}
// PrivValidator returns the Node's PrivValidator.
// XXX: for convenience only!
func (client *Client) PrivValidator() ttypes.PrivValidator {
return client.privValidator
}
......@@ -291,20 +290,24 @@ OuterLoop:
dposlog.Info("QueryCandidators success but no enough candidators", "dposDelegateNum", dposDelegateNum, "candidatorNum", len(cands))
} else {
validators := make([]*ttypes.Validator, dposDelegateNum)
nodes := make([]string, dposDelegateNum)
for i, val := range cands {
// Make validator
validators[i] = &ttypes.Validator{
Address: address.PubKeyToAddress(val.Pubkey).Hash160[:],
PubKey: val.Pubkey,
}
nodes[i] = val.Ip + ":" + dposPort
}
valMgr.Validators = ttypes.NewValidatorSet(validators)
dposlog.Info("QueryCandidators success and update validator set", "old validators", valMgrTmp.Validators.String(), "new validators", valMgr.Validators.String())
dposlog.Info("QueryCandidators success and update validator set", "old validators", printValidators(valMgrTmp.Validators), "new validators", printValidators(valMgr.Validators))
dposlog.Info("QueryCandidators success and update validator node ips", "old validator ips", printNodeIPs(validatorNodes), "new validators ips", printNodeIPs(nodes))
validatorNodes = nodes
}
}
}
dposlog.Info("StartConsensus", "validators", valMgr.Validators)
dposlog.Info("StartConsensus", "validators", printValidators(valMgr.Validators))
// Log whether this node is a delegator or an observer
if valMgr.Validators.HasAddress(client.privValidator.GetAddress()) {
dposlog.Info("This node is a delegator")
......@@ -337,6 +340,26 @@ OuterLoop:
//go client.MonitorCandidators()
}
func printValidators(set *ttypes.ValidatorSet) string{
result := "Validators:["
for _, v := range set.Validators {
result = fmt.Sprintf("%s%s,", result, hex.EncodeToString(v.PubKey))
}
result += "]"
return result
}
func printNodeIPs(ips []string) string{
result := "nodeIPs:["
for _, v := range ips {
result = fmt.Sprintf("%s%s,", result, v)
}
result += "]"
return result
}
// GetGenesisBlockTime ...
func (client *Client) GetGenesisBlockTime() int64 {
return genesisBlockTime
......@@ -409,54 +432,6 @@ func (client *Client) CreateBlock() {
}
}
// CreateBlock a routine monitor whether some transactions available and tell client by available channel
/*
func (client *Client) CreateBlockWithPriorTxs(priorTxs []*types.Transaction) {
lastBlock := client.GetCurrentBlock()
txs := client.RequestTx(int(types.GetP(lastBlock.Height + 1).MaxTxNumber), nil)
if len(priorTxs) > 0 {
txs = append(txs, priorTxs...)
}
if len(txs) == 0 {
block := client.GetCurrentBlock()
if createEmptyBlocks {
emptyBlock := &types.Block{}
emptyBlock.StateHash = block.StateHash
emptyBlock.ParentHash = block.Hash()
emptyBlock.Height = block.Height + 1
emptyBlock.Txs = nil
emptyBlock.TxHash = zeroHash[:]
emptyBlock.BlockTime = client.blockTime
err := client.WriteBlock(lastBlock.StateHash, emptyBlock)
//判断有没有交易是被删除的,这类交易要从mempool 中删除
if err != nil {
return
}
} else {
dposlog.Info("Ignore to create new Block for no tx in mempool", "Height", block.Height+1)
}
return
}
//check dup
txs = client.CheckTxDup(txs, client.GetCurrentHeight())
var newblock types.Block
newblock.ParentHash = lastBlock.Hash()
newblock.Height = lastBlock.Height + 1
client.AddTxsToBlock(&newblock, txs)
//
newblock.Difficulty = types.GetP(0).PowLimitBits
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
newblock.BlockTime = client.blockTime
err := client.WriteBlock(lastBlock.StateHash, &newblock)
//判断有没有交易是被删除的,这类交易要从mempool 中删除
if err != nil {
return
}
}
*/
// StopC stop client
func (client *Client) StopC() <-chan struct{} {
return client.stopC
......@@ -539,46 +514,6 @@ func (client *Client)QueryCandidators()([]*dty.Candidator, error) {
return cands, nil
}
/*
func (client *Client)QueryCandidators()([]*dty.Candidator, error) {
var params rpctypes.Query4Jrpc
params.Execer = dty.DPosX
req := &dty.CandidatorQuery{
TopN: int32(dposDelegateNum),
}
params.FuncName = dty.FuncNameQueryCandidatorByTopN
params.Payload = types.MustPBToJSON(req)
var res dty.CandidatorReply
ctx := jsonrpc.NewRPCCtx(rpcAddr, "Chain33.Query", params, &res)
result, err := ctx.RunResult()
if err != nil {
fmt.Fprintln(os.Stderr, err)
return nil, err
}
res = *result.(*dty.CandidatorReply)
var cands []*dty.Candidator
for _, val := range res.GetCandidators() {
bPubkey, err := hex.DecodeString(val.Pubkey)
if err != nil {
return nil, err
}
cand := &dty.Candidator{
Pubkey: bPubkey,
Address: val.Address,
Ip: val.Ip,
Votes: val.Votes,
Status: val.Status,
}
cands = append(cands, cand)
}
return cands, nil
}
*/
func (client *Client)MonitorCandidators() {
ticker := time.NewTicker(30 * time.Second)
for {
......@@ -639,31 +574,6 @@ func (client *Client)isValidatorSetSame(v1, v2 *ttypes.ValidatorSet) bool {
return true
}
/*
func (client *Client)QueryCycleBoundaryInfo(cycle int64)(*dty.DposCBInfo, error) {
var params rpctypes.Query4Jrpc
params.Execer = dty.DPosX
req := &dty.DposCBQuery{
Ty: dty.QueryCBInfoByCycle,
Cycle: cycle,
}
params.FuncName = dty.FuncNameQueryCBInfoByCycle
params.Payload = types.MustPBToJSON(req)
var res dty.DposCBReply
ctx := jsonrpc.NewRPCCtx(rpcAddr, "Chain33.Query", params, &res)
result, err := ctx.RunResult()
if err != nil {
fmt.Fprintln(os.Stderr, err)
return nil, err
}
res = *result.(*dty.DposCBReply)
return res.CbInfo, nil
}
*/
func (client *Client)CreateRecordCBTx(info *dty.DposCBInfo)(tx*types.Transaction, err error) {
var action dty.DposVoteAction
action.Value = &dty.DposVoteAction_RecordCB{
......@@ -743,7 +653,7 @@ func (client *Client)QueryVrfInfos(pubkeys [][]byte, cycle int64)([]*dty.VrfInfo
res := msg.GetData().(types.Message).(*dty.DposVrfReply)
if len(res.Vrf) > 0 {
dposlog.Info("DposVrfQuerys ok") //, "info", fmt.Sprintf("Cycle:%d,pubkey:%s,Height:%d,M:%s,R:%s,P:%s", res.Vrf[0].Cycle, res.Vrf[0].Pubkey, res.Vrf[0].Height, res.Vrf[0].M, res.Vrf[0].R, res.Vrf[0].P))
dposlog.Info("DposVrfQuerys ok")
} else {
dposlog.Info("DposVrfQuerys ok,but no info")
}
......@@ -787,78 +697,3 @@ func (client *Client)QueryVrfInfos(pubkeys [][]byte, cycle int64)([]*dty.VrfInfo
return infos, nil
}
/*
func (client *Client)QueryVrfInfo(pubkeys []byte, cycle int64)(*dty.VrfInfo, error) {
req := &dty.DposVrfQuery{
Cycle: cycle,
Ty: dty.QueryVrfByCycleForPubkeys,
}
req.Pubkeys = append(req.Pubkeys, hex.EncodeToString(pubkeys))
param, err := proto.Marshal(req)
if err != nil {
dposlog.Error("Marshal DposVrfQuery failed", "err", err)
return nil, err
}
msg := client.GetQueueClient().NewMessage("execs", types.EventBlockChainQuery,
&types.ChainExecutor{
Driver: dty.DPosX,
FuncName: dty.FuncNameQueryVrfByCycleForPubkeys,
StateHash: zeroHash[:],
Param:param,
})
err = client.GetQueueClient().Send(msg, true)
if err != nil {
dposlog.Error("send DposVrfQuery to dpos exec failed", "err", err)
return nil, err
}
msg, err = client.GetQueueClient().Wait(msg)
if err != nil {
dposlog.Error("send DposVrfQuery wait failed", "err", err)
return nil, err
}
res := msg.GetData().(types.Message).(*dty.DposVrfReply)
if len(res.Vrf) > 0 {
dposlog.Info("DposVrfQuery ok", "info", fmt.Sprintf("Cycle:%d,pubkey:%s,Height:%d,M:%s,R:%s,P:%s", res.Vrf[0].Cycle, res.Vrf[0].Pubkey, res.Vrf[0].Height, res.Vrf[0].M, res.Vrf[0].R, res.Vrf[0].P))
} else {
dposlog.Info("DposVrfQuery ok,but no info")
}
vrf := res.Vrf[0]
bPubkey, err := hex.DecodeString(vrf.Pubkey)
if err != nil {
bPubkey = nil
}
bM, err := hex.DecodeString(vrf.M)
if err != nil {
bM = nil
}
bR, err := hex.DecodeString(vrf.R)
if err != nil {
bR = nil
}
bP, err := hex.DecodeString(vrf.P)
if err != nil {
bP = nil
}
info := &dty.VrfInfo{
Index: vrf.Index,
Pubkey: bPubkey,
Cycle: vrf.Cycle,
Height: vrf.Height,
Time: vrf.Time,
M: bM,
R: bR,
P: bP,
}
return info, nil
}
*/
\ No newline at end of file
......@@ -14,6 +14,7 @@ import (
"sync"
"sync/atomic"
"time"
"errors"
"github.com/33cn/chain33/common/crypto"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
......@@ -489,6 +490,20 @@ func (node *Node) startInitPeer(peer *peerConn) error {
// FilterConnByAddr TODO:can make fileter by addr
func (node *Node) FilterConnByAddr(addr net.Addr) error {
ip, _ := splitHostPort(addr.String())
legalIP := false
for _, v := range node.seeds {
host := strings.Split(v, ":")
if ip == host[0] {
legalIP = true
break
}
}
if !legalIP {
return errors.New(fmt.Sprintf("%s is not legal seeds ip", ip))
}
return nil
}
......
......@@ -8,10 +8,12 @@ import (
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"math"
"strings"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
dpostype "github.com/33cn/plugin/plugin/consensus/dpos/types"
......@@ -200,6 +202,38 @@ func recvCBInfo(cs *ConsensusState, info *dpostype.DPosCBInfo) {
cs.UpdateCBInfo(newInfo)
}
func printNotify(notify *dpostype.DPosNotify) string {
if notify.Vote.LastCBInfo != nil {
return fmt.Sprintf("vote:[VotedNodeIndex:%d, VotedNodeAddr:%s,Cycle:%d,CycleStart:%d,CycleStop:%d,PeriodStart:%d,PeriodStop:%d,Height:%d,VoteID:%s,CBInfo[Cycle:%d,StopHeight:%d,StopHash:%s],ShuffleType:%d,ValidatorSize:%d,VrfValidatorSize:%d,NoVrfValidatorSize:%d];HeightStop:%d,HashStop:%s,NotifyTimestamp:%d,NotifyNodeIndex:%d,NotifyNodeAddrress:%s,Sig:%s",
notify.Vote.VotedNodeIndex, hex.EncodeToString(notify.Vote.VotedNodeAddress), notify.Vote.Cycle, notify.Vote.CycleStart, notify.Vote.CycleStop, notify.Vote.PeriodStart, notify.Vote.PeriodStop, notify.Vote.Height, hex.EncodeToString(notify.Vote.VoteID),
notify.Vote.LastCBInfo.Cycle, notify.Vote.LastCBInfo.StopHeight, notify.Vote.LastCBInfo.StopHash, notify.Vote.ShuffleType, len(notify.Vote.Validators), len(notify.Vote.VrfValidators), len(notify.Vote.NoVrfValidators),
notify.HeightStop, hex.EncodeToString(notify.HashStop), notify.NotifyTimestamp, notify.NotifyNodeIndex, hex.EncodeToString(notify.NotifyNodeAddress), hex.EncodeToString(notify.Signature))
} else {
return fmt.Sprintf("vote:[VotedNodeIndex:%d, VotedNodeAddr:%s,Cycle:%d,CycleStart:%d,CycleStop:%d,PeriodStart:%d,PeriodStop:%d,Height:%d,VoteID:%s,CBInfo[],ShuffleType:%d,ValidatorSize:%d,VrfValidatorSize:%d,NoVrfValidatorSize:%d];HeightStop:%d,HashStop:%s,NotifyTimestamp:%d,NotifyNodeIndex:%d,NotifyNodeAddrress:%s,Sig:%s",
notify.Vote.VotedNodeIndex, hex.EncodeToString(notify.Vote.VotedNodeAddress), notify.Vote.Cycle, notify.Vote.CycleStart, notify.Vote.CycleStop, notify.Vote.PeriodStart, notify.Vote.PeriodStop, notify.Vote.Height, hex.EncodeToString(notify.Vote.VoteID),
notify.Vote.ShuffleType, len(notify.Vote.Validators), len(notify.Vote.VrfValidators), len(notify.Vote.NoVrfValidators),
notify.HeightStop, hex.EncodeToString(notify.HashStop), notify.NotifyTimestamp, notify.NotifyNodeIndex, hex.EncodeToString(notify.NotifyNodeAddress), hex.EncodeToString(notify.Signature))
}
}
func printVote(vote *dpostype.DPosVote) string {
return fmt.Sprintf("vote:[VotedNodeIndex:%d,VotedNodeAddress:%s,Cycle:%d,CycleStart:%d,CycleStop:%d,PeriodStart:%d,PeriodStop:%d,Height:%d,VoteID:%s,VoteTimestamp:%d,VoterNodeIndex:%d,VoterNodeAddress:%s,Sig:%s]",
vote.VoteItem.VotedNodeIndex, common.ToHex(vote.VoteItem.VotedNodeAddress), vote.VoteItem.Cycle, vote.VoteItem.CycleStart, vote.VoteItem.CycleStop, vote.VoteItem.PeriodStart, vote.VoteItem.PeriodStop, vote.VoteItem.Height,
hex.EncodeToString(vote.VoteItem.VoteID), vote.VoteTimestamp, vote.VoterNodeIndex, hex.EncodeToString(vote.VoterNodeAddress), hex.EncodeToString(vote.Signature))
}
func printVoteItem(voteItem *dpostype.VoteItem) string {
if voteItem.LastCBInfo != nil {
return fmt.Sprintf("vote:[VotedNodeIndex:%d, VotedNodeAddr:%s,Cycle:%d,CycleStart:%d,CycleStop:%d,PeriodStart:%d,PeriodStop:%d,Height:%d,VoteID:%s,CBInfo[Cycle:%d,StopHeight:%d,StopHash:%s],ShuffleType:%d,ValidatorSize:%d,VrfValidatorSize:%d,NoVrfValidatorSize:%d]",
voteItem.VotedNodeIndex, hex.EncodeToString(voteItem.VotedNodeAddress), voteItem.Cycle, voteItem.CycleStart, voteItem.CycleStop, voteItem.PeriodStart, voteItem.PeriodStop, voteItem.Height, hex.EncodeToString(voteItem.VoteID),
voteItem.LastCBInfo.Cycle, voteItem.LastCBInfo.StopHeight, voteItem.LastCBInfo.StopHash, voteItem.ShuffleType, len(voteItem.Validators), len(voteItem.VrfValidators), len(voteItem.NoVrfValidators))
} else {
return fmt.Sprintf("vote:[VotedNodeIndex:%d, VotedNodeAddr:%s,Cycle:%d,CycleStart:%d,CycleStop:%d,PeriodStart:%d,PeriodStop:%d,Height:%d,VoteID:%s,CBInfo[],ShuffleType:%d,ValidatorSize:%d,VrfValidatorSize:%d,NoVrfValidatorSize:%d]",
voteItem.VotedNodeIndex, hex.EncodeToString(voteItem.VotedNodeAddress), voteItem.Cycle, voteItem.CycleStart, voteItem.CycleStop, voteItem.PeriodStart, voteItem.PeriodStop, voteItem.Height, hex.EncodeToString(voteItem.VoteID),
voteItem.ShuffleType, len(voteItem.Validators), len(voteItem.VrfValidators), len(voteItem.NoVrfValidators))
}
}
// State is the base class of dpos state machine, it defines some interfaces.
type State interface {
timeOut(cs *ConsensusState)
......@@ -247,6 +281,7 @@ func (init *InitState) timeOut(cs *ConsensusState) {
cs.SetState(VotingStateObj)
dposlog.Info("Change state.", "from", "InitState", "to", "VotingState")
//通过node发送p2p消息到其他节点
dposlog.Info("VotingState send a vote", "vote info", printVote(vote.DPosVote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix())
cs.dposState.sendVote(cs, vote.DPosVote)
cs.scheduleDPosTimeout(time.Duration(timeoutVoting)*time.Millisecond, VotingStateType)
......@@ -312,20 +347,7 @@ func (voting *VotingState) sendVote(cs *ConsensusState, vote *dpostype.DPosVote)
}
func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote) {
dposlog.Info("VotingState get a vote", "VotedNodeIndex", vote.VoteItem.VotedNodeIndex,
"VotedNodeAddress", common.ToHex(vote.VoteItem.VotedNodeAddress),
"Cycle", vote.VoteItem.Cycle,
"CycleStart", vote.VoteItem.CycleStart,
"CycleStop", vote.VoteItem.CycleStop,
"PeriodStart", vote.VoteItem.PeriodStart,
"periodStop", vote.VoteItem.PeriodStop,
"Height", vote.VoteItem.Height,
"VoteID", common.ToHex(vote.VoteItem.VoteID),
"VoteTimestamp", vote.VoteTimestamp,
"VoterNodeIndex", vote.VoterNodeIndex,
"VoterNodeAddress", common.ToHex(vote.VoterNodeAddress),
"Signature", common.ToHex(vote.Signature),
"localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix())
dposlog.Info("VotingState get a vote", "vote info", printVote(vote), "localNodeIndex", cs.client.ValidatorIndex(), "now", time.Now().Unix())
if !cs.VerifyVote(vote) {
dposlog.Info("VotingState verify vote failed")
......@@ -337,7 +359,7 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
result, voteItem := cs.CheckVotes()
if result == voteSuccess {
dposlog.Info("VotingState get 2/3 result", "final vote:", voteItem.String())
dposlog.Info("VotingState get 2/3 result", "final vote:", printVoteItem(voteItem))
dposlog.Info("VotingState change state to VotedState")
//切换状态
cs.SetState(VotedStateObj)
......@@ -523,7 +545,6 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.scheduleDPosTimeout(time.Duration(timeoutWaitNotify)*time.Millisecond, WaitNotifyStateType)
if cs.cachedNotify != nil {
cs.dposState.recvNotify(cs, cs.cachedNotify)
}
return
}
......@@ -638,12 +659,12 @@ func (wait *WaitNofifyState) recvNotify(cs *ConsensusState, notify *dpostype.DPo
block := cs.client.GetCurrentBlock()
if block.Height > notify.HeightStop {
dposlog.Info("Local block height is advanced than notify, discard it.", "localheight", block.Height, "notify", notify.String())
dposlog.Info("Local block height is advanced than notify, discard it.", "localheight", block.Height, "notify", printNotify(notify))
return
} else if block.Height == notify.HeightStop && bytes.Equal(block.Hash(), notify.HashStop) {
dposlog.Info("Local block height is sync with notify", "notify", notify.String())
dposlog.Info("Local block height is sync with notify", "notify", printNotify(notify))
} else {
dposlog.Info("Local block height is not sync with notify", "localheight", cs.client.GetCurrentHeight(), "notify", notify.String())
dposlog.Info("Local block height is not sync with notify", "localheight", cs.client.GetCurrentHeight(), "notify", printNotify(notify))
hint := time.NewTicker(3 * time.Second)
beg := time.Now()
catchupFlag := false
......
......@@ -238,43 +238,6 @@ func queryVrfByCycleForTopN(kvdb db.KVDB, req *dty.DposVrfQuery) (types.Message,
}
vrfs := queryVrfByCycleAndPubkeys(kvdb, pubkeys, req.Cycle)
/*
VrfRPTable := dty.NewDposVrfRPTable(kvdb)
query := VrfRPTable.GetQuery(kvdb)
var tempCands [] *dty.JsonCandidator
var vrfs [] *dty.VrfInfo
for i := 0; i < len(res.Candidators); i ++ {
rows, err := query.ListIndex("pubkey_cycle", []byte(fmt.Sprintf("%s:%018d", res.Candidators[i].Pubkey, req.Cycle)), nil, 1, 0)
if err != nil {
logger.Error("queryVrf RP failed", "pubkey", res.Candidators[i].Pubkey, "cycle", req.Cycle)
tempCands = append(tempCands, res.Candidators[i])
continue
}
vrfRP := rows[0].Data.(*dty.DposVrfRP)
vrf := getVrfInfoFromVrfRP(vrfRP)
vrfs = append(vrfs, vrf)
}
if tempCands == nil || len(tempCands) == 0 {
return &dty.DposVrfReply{Vrf: getJsonVrfs(vrfs)}, nil
}
vrfMTable := dty.NewDposVrfMTable(kvdb)
query = vrfMTable.GetQuery(kvdb)
for i := 0; i < len(tempCands); i++ {
rows, err := query.ListIndex("pubkey_cycle", []byte(fmt.Sprintf("%s:%018d", tempCands[i].Pubkey, req.Cycle)), nil, 1, 0)
if err != nil {
logger.Error("queryVrf M failed", "pubkey", res.Candidators[i].Pubkey, "cycle", req.Cycle)
continue
}
vrfM := rows[0].Data.(*dty.DposVrfM)
vrf := getVrfInfoFromVrfM(vrfM)
vrfs = append(vrfs, vrf)
}
*/
return &dty.DposVrfReply{Vrf: getJsonVrfs(vrfs)}, nil
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment