Commit 0ef31ffb authored by 张振华's avatar 张振华

update

parent c0a88d2b
......@@ -6,6 +6,8 @@ package dpos
import (
"bytes"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/33cn/chain33/types"
......@@ -53,6 +55,11 @@ func (ti *timeoutInfo) String() string {
return fmt.Sprintf("%v", ti.Duration)
}
type vrfStatusInfo struct {
Cycle int64
}
// ConsensusState handles execution of the consensus algorithm.
// It processes votes and proposals, and upon reaching agreement,
// commits blocks to the chain and executes them against the application.
......@@ -271,6 +278,8 @@ func (cs *ConsensusState) handleMsg(mi MsgInfo) {
cs.dposState.recvNotify(cs, msg)
case *dpostype.DPosVoteReply:
cs.dposState.recvVoteReply(cs, msg)
case *dty.DposCBInfo:
cs.dposState.recvCBInfo(cs, msg)
default:
dposlog.Error("Unknown msg type", "msg", msg.String(), "peerid", peerID, "peerip", peerIP)
}
......@@ -613,10 +622,9 @@ func (cs *ConsensusState) UpdateCBInfo(info *dty.DposCBInfo) {
if valueNumber >= 5 {
delete(cs.cycleBoundaryMap, oldestCycle)
cs.cycleBoundaryMap[info.Cycle] = info
} else {
cs.cycleBoundaryMap[info.Cycle] = info
}
cs.cycleBoundaryMap[info.Cycle] = info
}
func (cs *ConsensusState) GetCBInfoByCircle(cycle int64) (info *dty.DposCBInfo) {
......@@ -626,4 +634,132 @@ func (cs *ConsensusState) GetCBInfoByCircle(cycle int64) (info *dty.DposCBInfo)
}
return nil
}
\ No newline at end of file
}
// VerifyNotify method
func (cs *ConsensusState) VerifyCBInfo(info *dty.DposCBInfo) bool {
// Verify signature
bPubkey, err := hex.DecodeString(info.Pubkey)
if err != nil {
return false
}
pubkey, err := dpostype.ConsensusCrypto.PubKeyFromBytes(bPubkey)
if err != nil {
dposlog.Error("Error pubkey from bytes", "err", err)
return false
}
bSig, err := hex.DecodeString(info.Signature)
if err != nil {
dposlog.Error("Error signature from bytes", "err", err)
return false
}
sig, err := ttypes.ConsensusCrypto.SignatureFromBytes(bSig)
if err != nil {
dposlog.Error("CBInfo Verify failed", "err", err)
return false
}
buf := new(bytes.Buffer)
canonical := dty.CanonicalOnceCBInfo{
Cycle: info.Cycle,
StopHeight: info.StopHeight,
StopHash: info.StopHash,
Pubkey: info.Pubkey,
}
byteCB, err := json.Marshal(&canonical)
if err != nil {
dposlog.Error("Error Marshal failed: ", "err", err)
return false
}
_, err = buf.Write(byteCB)
if err != nil {
dposlog.Error("Error buf.Write failed: ", "err", err)
return false
}
if !pubkey.VerifyBytes(buf.Bytes(), sig) {
dposlog.Error("Error Verify Bytes failed: ", "err", err)
return false
}
return true
}
func (cs *ConsensusState) SendCBTx(info *dty.DposCBInfo) bool {
err := cs.privValidator.SignCBInfo(info)
if err != nil {
dposlog.Error("SignCBInfo failed.", "err", err)
return false
} else {
tx, err := cs.client.CreateRecordCBTx(info)
if err != nil {
dposlog.Error("CreateRecordCBTx failed.", "err", err)
return false
} else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign RecordCBTx.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
if err != nil {
dposlog.Error("Send RecordCBTx to mempool failed.", "err", err)
return false
} else {
dposlog.Error("Send RecordCBTx to mempool ok.", "err", err)
}
}
}
return true
}
func (cs *ConsensusState) SendRegistVrfMTx(info *dty.DposVrfMRegist) bool {
tx, err := cs.client.CreateRegVrfMTx(info)
if err != nil {
dposlog.Error("CreateRegVrfMTx failed.", "err", err)
return false
} else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign RegistVrfMTx.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
if err != nil {
dposlog.Error("Send RegistVrfMTx to mempool failed.", "err", err)
return false
} else {
dposlog.Error("Send RegistVrfMTx to mempool ok.", "err", err)
}
}
return true
}
func (cs *ConsensusState) SendRegistVrfRPTx(info *dty.DposVrfRPRegist) bool {
tx, err := cs.client.CreateRegVrfRPTx(info)
if err != nil {
dposlog.Error("CreateRegVrfRPTx failed.", "err", err)
return false
} else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign RegVrfRPTx.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
if err != nil {
dposlog.Error("Send RegVrfRPTx to mempool failed.", "err", err)
return false
} else {
dposlog.Error("Send RegVrfRPTx to mempool ok.", "err", err)
}
}
return true
}
func (cs *ConsensusState) QueryVrf(info *dty.DposCBInfo) bool {
......@@ -7,9 +7,7 @@ package dpos
import (
"bytes"
"encoding/hex"
"fmt"
"github.com/33cn/chain33/common/address"
"os"
"time"
"github.com/33cn/chain33/common/crypto"
......@@ -22,9 +20,8 @@ import (
"github.com/33cn/chain33/util"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
jsonrpc "github.com/33cn/chain33/rpc/jsonclient"
rpctypes "github.com/33cn/chain33/rpc/types"
dty "github.com/33cn/plugin/plugin/dapp/dposvote/types"
"github.com/golang/protobuf/proto"
)
const dposVersion = "0.1.0"
......@@ -486,6 +483,58 @@ func (client *Client) ValidatorIndex() int {
}
func (client *Client)QueryCandidators()([]*dty.Candidator, error) {
req := &dty.CandidatorQuery{
TopN: int32(dposDelegateNum),
}
param, err := proto.Marshal(req)
if err != nil {
dposlog.Error("Marshal CandidatorQuery failed", "err", err)
return nil, err
}
msg := client.GetQueueClient().NewMessage("execs", types.EventBlockChainQuery,
&types.ChainExecutor{
Driver: dty.DPosX,
FuncName: dty.FuncNameQueryCandidatorByTopN,
StateHash: zeroHash[:],
Param:param,
})
err = client.GetQueueClient().Send(msg, true)
if err != nil {
dposlog.Error("send CandidatorQuery to dpos exec failed", "err", err)
return nil, err
}
msg, err = client.GetQueueClient().Wait(msg)
if err != nil {
dposlog.Error("send CandidatorQuery wait failed", "err", err)
return nil, err
}
res := msg.GetData().(types.Message).(*dty.CandidatorReply)
var cands []*dty.Candidator
for _, val := range res.GetCandidators() {
bPubkey, err := hex.DecodeString(val.Pubkey)
if err != nil {
return nil, err
}
cand := &dty.Candidator{
Pubkey: bPubkey,
Address: val.Address,
Ip: val.Ip,
Votes: val.Votes,
Status: val.Status,
}
cands = append(cands, cand)
}
return cands, nil
}
/*
func (client *Client)QueryCandidators()([]*dty.Candidator, error) {
var params rpctypes.Query4Jrpc
params.Execer = dty.DPosX
......@@ -523,7 +572,7 @@ func (client *Client)QueryCandidators()([]*dty.Candidator, error) {
}
return cands, nil
}
*/
func (client *Client)MonitorCandidators() {
ticker := time.NewTicker(30 * time.Second)
for {
......@@ -584,6 +633,7 @@ func (client *Client)isValidatorSetSame(v1, v2 *ttypes.ValidatorSet) bool {
return true
}
/*
func (client *Client)QueryCycleBoundaryInfo(cycle int64)(*dty.DposCBInfo, error) {
var params rpctypes.Query4Jrpc
params.Execer = dty.DPosX
......@@ -607,7 +657,7 @@ func (client *Client)QueryCycleBoundaryInfo(cycle int64)(*dty.DposCBInfo, error)
return res.CbInfo, nil
}
*/
func (client *Client)CreateRecordCBTx(info *dty.DposCBInfo)(tx*types.Transaction, err error) {
var action dty.DposVoteAction
action.Value = &dty.DposVoteAction_RecordCB{
......@@ -620,4 +670,175 @@ func (client *Client)CreateRecordCBTx(info *dty.DposCBInfo)(tx*types.Transaction
}
return tx, nil
}
\ No newline at end of file
}
func (client *Client)CreateRegVrfMTx(info *dty.DposVrfMRegist)(tx*types.Transaction, err error) {
var action dty.DposVoteAction
action.Value = &dty.DposVoteAction_RegistVrfM{
RegistVrfM: info,
}
action.Ty = dty.DposVoteActionRegistVrfM
tx, err = types.CreateFormatTx("dpos", types.Encode(&action))
if err != nil {
return nil, err
}
return tx, nil
}
func (client *Client)CreateRegVrfRPTx(info *dty.DposVrfRPRegist)(tx*types.Transaction, err error) {
var action dty.DposVoteAction
action.Value = &dty.DposVoteAction_RegistVrfRP{
RegistVrfRP: info,
}
action.Ty = dty.DposVoteActionRegistVrfRP
tx, err = types.CreateFormatTx("dpos", types.Encode(&action))
if err != nil {
return nil, err
}
return tx, nil
}
func (client *Client)QueryVrfInfos(pubkeys [][]byte, cycle int64)([]*dty.VrfInfo, error) {
req := &dty.DposVrfQuery{
Cycle: cycle,
Ty: dty.QueryVrfByCycleForPubkeys,
}
for i := 0; i < len(pubkeys); i++ {
req.Pubkeys = append(req.Pubkeys, hex.EncodeToString(pubkeys[i]))
}
param, err := proto.Marshal(req)
if err != nil {
dposlog.Error("Marshal DposVrfQuery failed", "err", err)
return nil, err
}
msg := client.GetQueueClient().NewMessage("execs", types.EventBlockChainQuery,
&types.ChainExecutor{
Driver: dty.DPosX,
FuncName: dty.FuncNameQueryVrfByCycleForPubkeys,
StateHash: zeroHash[:],
Param:param,
})
err = client.GetQueueClient().Send(msg, true)
if err != nil {
dposlog.Error("send DposVrfQuery to dpos exec failed", "err", err)
return nil, err
}
msg, err = client.GetQueueClient().Wait(msg)
if err != nil {
dposlog.Error("send DposVrfQuery wait failed", "err", err)
return nil, err
}
res := msg.GetData().(types.Message).(*dty.DposVrfReply)
var infos []*dty.VrfInfo
for _, val := range res.Vrf {
bPubkey, err := hex.DecodeString(val.Pubkey)
if err != nil {
bPubkey = nil
}
bM, err := hex.DecodeString(val.M)
if err != nil {
bM = nil
}
bR, err := hex.DecodeString(val.R)
if err != nil {
bR = nil
}
bP, err := hex.DecodeString(val.P)
if err != nil {
bP = nil
}
info := &dty.VrfInfo{
Index: val.Index,
Pubkey: bPubkey,
Cycle: val.Cycle,
Height: val.Height,
Time: val.Time,
M: bM,
R: bR,
P: bP,
}
infos = append(infos, info)
}
return infos, nil
}
func (client *Client)QueryVrfInfo(pubkeys []byte, cycle int64)(*dty.VrfInfo, error) {
req := &dty.DposVrfQuery{
Cycle: cycle,
Ty: dty.QueryVrfByCycleForPubkeys,
}
req.Pubkeys = append(req.Pubkeys, hex.EncodeToString(pubkeys))
param, err := proto.Marshal(req)
if err != nil {
dposlog.Error("Marshal DposVrfQuery failed", "err", err)
return nil, err
}
msg := client.GetQueueClient().NewMessage("execs", types.EventBlockChainQuery,
&types.ChainExecutor{
Driver: dty.DPosX,
FuncName: dty.FuncNameQueryVrfByCycleForPubkeys,
StateHash: zeroHash[:],
Param:param,
})
err = client.GetQueueClient().Send(msg, true)
if err != nil {
dposlog.Error("send DposVrfQuery to dpos exec failed", "err", err)
return nil, err
}
msg, err = client.GetQueueClient().Wait(msg)
if err != nil {
dposlog.Error("send DposVrfQuery wait failed", "err", err)
return nil, err
}
res := msg.GetData().(types.Message).(*dty.DposVrfReply)
vrf := res.Vrf[0]
bPubkey, err := hex.DecodeString(vrf.Pubkey)
if err != nil {
bPubkey = nil
}
bM, err := hex.DecodeString(vrf.M)
if err != nil {
bM = nil
}
bR, err := hex.DecodeString(vrf.R)
if err != nil {
bR = nil
}
bP, err := hex.DecodeString(vrf.P)
if err != nil {
bP = nil
}
info := &dty.VrfInfo{
Index: vrf.Index,
Pubkey: bPubkey,
Cycle: vrf.Cycle,
Height: vrf.Height,
Time: vrf.Time,
M: bM,
R: bR,
P: bP,
}
return info, nil
}
......@@ -15,8 +15,6 @@ import (
"github.com/33cn/chain33/common/crypto"
dpostype "github.com/33cn/plugin/plugin/consensus/dpos/types"
dty "github.com/33cn/plugin/plugin/dapp/dposvote/types"
"github.com/33cn/chain33/types"
)
var (
......@@ -87,6 +85,8 @@ type State interface {
recvVoteReply(cs *ConsensusState, reply *dpostype.DPosVoteReply)
sendNotify(cs *ConsensusState, notify *dpostype.DPosNotify)
recvNotify(cs *ConsensusState, notify *dpostype.DPosNotify)
//sendCBInfo(cs *ConsensusState, info *dty.DposCBInfo)
recvCBInfo(cs *ConsensusState, info *dty.DposCBInfo)
}
// InitState is the initial state of dpos state machine
......@@ -217,6 +217,14 @@ func (init *InitState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNotif
cs.SetNotify(notify)
}
//func (init *InitState) sendCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
// dposlog.Info("InitState does not support sendCBInfo,so do nothing")
//}
func (init *InitState) recvCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
dposlog.Info("InitState recvCBInfo")
cs.UpdateCBInfo(info)
}
// VotingState is the voting state of dpos state machine until timeout or get an agreement by votes.
type VotingState struct {
}
......@@ -303,6 +311,14 @@ func (voting *VotingState) recvNotify(cs *ConsensusState, notify *dpostype.DPosN
dposlog.Info("VotingState does not support recvNotify,so do nothing")
}
//func (voting *VotingState) sendCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
// dposlog.Info("VotingState does not support sendCBInfo,so do nothing")
//}
func (voting *VotingState) recvCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
dposlog.Info("VotingState recvCBInfo")
cs.UpdateCBInfo(info)
}
// VotedState is the voted state of dpos state machine after getting an agreement for a period
type VotedState struct {
}
......@@ -318,7 +334,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//当前时间超过了节点切换时间,需要进行重新投票
dposlog.Info("VotedState timeOut over periodStop.", "periodStop", cs.currentVote.PeriodStop)
//如果到了cycle结尾,需要再出一个块,把最终的CycleBoundary信息发布出去
//如果到了cycle结尾,需要构造一个交易,把最终的CycleBoundary信息发布出去
if now >= cs.currentVote.CycleStop {
dposlog.Info("Create new tx for cycle change to record cycle boundary info.", "height", block.Height)
......@@ -328,7 +344,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
StopHash: hex.EncodeToString(block.Hash()),
Pubkey: hex.EncodeToString(cs.privValidator.GetPubKey().Bytes()),
}
/*
err := cs.privValidator.SignCBInfo(info)
if err != nil {
dposlog.Error("SignCBInfo failed.", "err", err)
......@@ -349,8 +365,17 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
}
}
}
*/
cs.SendCBTx(info)
info2 := &dty.DposCBInfo{
Cycle: info.Cycle,
StopHeight: info.StopHeight,
StopHash: info.StopHash,
Pubkey: info.Pubkey,
Signature: info.Signature,
}
cs.UpdateCBInfo(info)
voted.sendCBInfo(cs, info2)
}
//当前时间超过了节点切换时间,需要进行重新投票
......@@ -393,7 +418,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
return
}
//如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。
if block.Height+1 < cs.currentVote.Height {
if block.Height + 1 < cs.currentVote.Height {
dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType)
return
......@@ -408,7 +433,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
return
} else if block.BlockTime < task.blockStart {
//本出块周期尚未出块,则进行出块
if task.blockStop-now <= 1 {
if task.blockStop - now <= 1 {
dposlog.Info("Create new block.", "height", block.Height+1)
cs.client.SetBlockTime(task.blockStop)
......@@ -506,6 +531,15 @@ func (voted *VotedState) recvNotify(cs *ConsensusState, notify *dpostype.DPosNot
}
}
func (voted *VotedState) sendCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
cs.broadcastChannel <- MsgInfo{TypeID: dpostype.CBInfoID, Msg: info, PeerID: cs.ourID, PeerIP: ""}
}
func (voted *VotedState) recvCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
dposlog.Info("VotedState recvCBInfo")
cs.UpdateCBInfo(info)
}
// WaitNofifyState is the state of dpos state machine to wait notify.
type WaitNofifyState struct {
}
......@@ -585,6 +619,7 @@ func (wait *WaitNofifyState) recvNotify(cs *ConsensusState, notify *dpostype.DPo
hint.Stop()
}
/*
info := &dty.DposCBInfo{
Cycle: notify.Vote.Cycle,
StopHeight: notify.HeightStop,
......@@ -592,6 +627,7 @@ func (wait *WaitNofifyState) recvNotify(cs *ConsensusState, notify *dpostype.DPo
}
cs.UpdateCBInfo(info)
*/
cs.ClearCachedNotify()
cs.SaveNotify()
......@@ -603,3 +639,12 @@ func (wait *WaitNofifyState) recvNotify(cs *ConsensusState, notify *dpostype.DPo
cs.dposState.timeOut(cs)
//cs.scheduleDPosTimeout(time.Second * 1, InitStateType)
}
//func (wait *WaitNofifyState) sendCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
// dposlog.Info("WaitNofifyState does not support sendCBInfo,so do nothing")
//}
func (wait *WaitNofifyState) recvCBInfo(cs *ConsensusState, info *dty.DposCBInfo) {
dposlog.Info("WaitNofifyState recvCBInfo")
cs.UpdateCBInfo(info)
}
\ No newline at end of file
......@@ -19,6 +19,8 @@ const (
VoteID = byte(0x06)
VoteReplyID = byte(0x07)
NotifyID = byte(0x08)
CBInfoID = byte(0x09)
PacketTypePing = byte(0xff)
PacketTypePong = byte(0xfe)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment