Commit 2cb03695 authored by 张振华's avatar 张振华

update

parent 0ef31ffb
This diff is collapsed.
......@@ -325,7 +325,7 @@ OuterLoop:
// 对于受托节点,才需要初始化区块,启动共识相关程序等,后续支持投票要做成动态切换的。
if client.isDelegator {
client.InitBlock()
client.csState.InitCycleBoundaryInfo()
client.csState.Init()
node.Start()
}
......
......@@ -3,6 +3,16 @@ syntax = "proto3";
package types;
message CycleBoundaryInfo{
int64 cycle = 1;
int64 stopHeight = 2;
string stopHash = 3;
}
message SuperNode{
bytes address = 1;
bytes pubKey = 2;
}
message VoteItem {
int32 votedNodeIndex = 1; //被投票的节点索引
bytes votedNodeAddress = 2; //被投票的节点地址
......@@ -13,6 +23,11 @@ message VoteItem {
int64 periodStop = 7; //新节点负责出块的终止时间
int64 height = 8; //新节点负责出块的起始高度
bytes voteID = 9; //选票ID
CycleBoundaryInfo lastCBInfo = 10;
int64 shuffleType = 11;
repeated SuperNode validators = 12;
repeated SuperNode vrfValidators = 13;
repeated SuperNode noVrfValidators = 14;
}
//DPosVote Dpos共识的节点投票,为达成共识用。
......
......@@ -76,6 +76,97 @@ func DecideTaskByTime(now int64) (task Task) {
return task
}
func generateVote(cs *ConsensusState) *dpostype.Vote {
//获得当前高度
height := cs.client.GetCurrentHeight()
now := time.Now().Unix()
if cs.lastMyVote != nil && math.Abs(float64(now - cs.lastMyVote.VoteItem.PeriodStop)) <= 1 {
now += 2
}
//计算当前时间,属于哪一个周期,应该哪一个节点出块,应该出块的高度
task := DecideTaskByTime(now)
cs.ShuffleValidators(task.cycle)
addr, validator := cs.validatorMgr.GetValidatorByIndex(int(task.nodeID))
if addr == nil && validator == nil {
dposlog.Error("Address and Validator is nil", "node index", task.nodeID, "now", now, "cycle", dposCycle, "period", dposPeriod)
//cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return nil
}
//生成vote, 对于vote进行签名
voteItem := &dpostype.VoteItem{
VotedNodeAddress: addr,
VotedNodeIndex: int32(task.nodeID),
Cycle: task.cycle,
CycleStart: task.cycleStart,
CycleStop: task.cycleStop,
PeriodStart: task.periodStart,
PeriodStop: task.periodStop,
Height: height + 1,
}
cs.validatorMgr.FillVoteItem(voteItem)
encode, err := json.Marshal(voteItem)
if err != nil {
panic("Marshal vote failed.")
}
voteItem.VoteID = crypto.Ripemd160(encode)
index := cs.validatorMgr.GetIndexByPubKey(cs.privValidator.GetPubKey().Bytes())
if index == -1 {
panic("This node's address is not exist in Validators.")
}
vote := &dpostype.Vote{
DPosVote: &dpostype.DPosVote{
VoteItem: voteItem,
VoteTimestamp: now,
VoterNodeAddress: cs.privValidator.GetAddress(),
VoterNodeIndex: int32(index),
},
}
return vote
}
func checkVrf(cs *ConsensusState) {
now := time.Now().Unix()
task := DecideTaskByTime(now)
middleTime := task.cycleStart + (task.cycleStop - task.cycleStart) / 2
if now < middleTime {
info := cs.GetVrfInfoByCircle(task.cycle, VrfQueryTypeM)
if info == nil {
vrfM := &dty.DposVrfMRegist{
Pubkey: hex.EncodeToString(cs.privValidator.GetPubKey().Bytes()),
Cycle: task.cycle,
M: cs.currentVote.LastCBInfo.StopHash,
}
cs.SendRegistVrfMTx(vrfM)
}
} else {
info := cs.GetVrfInfoByCircle(task.cycle, VrfQueryTypeRP)
if info != nil && len(info.M) > 0 && (len(info.R) == 0 || len(info.P) == 0){
hash, proof := cs.VrfEvaluate(info.M)
vrfRP := &dty.DposVrfRPRegist{
Pubkey: hex.EncodeToString(cs.privValidator.GetPubKey().Bytes()),
Cycle: task.cycle,
R: hex.EncodeToString(hash[:]),
P: hex.EncodeToString(proof),
}
cs.SendRegistVrfRPTx(vrfRP)
}
}
}
// State is the base class of dpos state machine, it defines some interfaces.
type State interface {
timeOut(cs *ConsensusState)
......@@ -104,67 +195,14 @@ func (init *InitState) timeOut(cs *ConsensusState) {
//设定超时时间,超时后再检查链接数量
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
} else {
//获得当前高度
height := cs.client.GetCurrentHeight()
now := time.Now().Unix()
if cs.lastMyVote != nil && math.Abs(float64(now-cs.lastMyVote.VoteItem.PeriodStop)) <= 1 {
now += 2
}
//计算当前时间,属于哪一个周期,应该哪一个节点出块,应该出块的高度
task := DecideTaskByTime(now)
addr, validator := cs.validatorMgr.Validators.GetByIndex(int(task.nodeID))
if addr == nil && validator == nil {
dposlog.Error("Address and Validator is nil", "node index", task.nodeID, "now", now, "cycle", dposCycle, "period", dposPeriod)
//cs.SetState(InitStateObj)
vote := generateVote(cs)
if nil == vote {
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return
}
//生成vote, 对于vote进行签名
voteItem := &dpostype.VoteItem{
VotedNodeAddress: addr,
VotedNodeIndex: int32(task.nodeID),
Cycle: task.cycle,
CycleStart: task.cycleStart,
CycleStop: task.cycleStop,
PeriodStart: task.periodStart,
PeriodStop: task.periodStop,
Height: height + 1,
}
encode, err := json.Marshal(voteItem)
if err != nil {
panic("Marshal vote failed.")
//cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
//return
}
voteItem.VoteID = crypto.Ripemd160(encode)
index := -1
for i := 0; i < cs.validatorMgr.Validators.Size(); i++ {
if bytes.Equal(cs.validatorMgr.Validators.Validators[i].Address, cs.privValidator.GetAddress()) {
index = i
break
}
}
if index == -1 {
panic("This node's address is not exist in Validators.")
}
vote := &dpostype.Vote{DPosVote: &dpostype.DPosVote{
VoteItem: voteItem,
VoteTimestamp: now,
VoterNodeAddress: cs.privValidator.GetAddress(),
VoterNodeIndex: int32(index),
},
}
if err := cs.privValidator.SignVote(cs.validatorMgr.ChainID, vote); err != nil {
dposlog.Error("SignVote failed", "vote", vote.String())
//cs.SetState(InitStateObj)
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return
}
......@@ -279,6 +317,14 @@ func (voting *VotingState) recvVote(cs *ConsensusState, vote *dpostype.DPosVote)
cs.SetCurrentVote(voteItem)
//检查最终投票是否与自己的投票一致,如果不一致,需要更新本地的信息,保证各节点共识结果执行一致。
if !bytes.Equal(cs.myVote.VoteItem.VoteID, voteItem.VoteID) {
if !cs.validatorMgr.UpdateFromVoteItem(voteItem) {
panic("This node's validators are not the same with final vote, please check")
}
}
//进行VRF相关处理
checkVrf(cs)
//1s后检查是否出块,是否需要重新投票
cs.scheduleDPosTimeout(time.Millisecond*500, VotedStateType)
} else if result == continueToVote {
......@@ -330,6 +376,15 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
if bytes.Equal(cs.privValidator.GetAddress(), cs.currentVote.VotedNodeAddress) {
//当前节点为出块节点
//如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。
if block.Height + 1 < cs.currentVote.Height {
dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType)
return
}
//时间到了节点切换时刻
if now >= cs.currentVote.PeriodStop {
//当前时间超过了节点切换时间,需要进行重新投票
dposlog.Info("VotedState timeOut over periodStop.", "periodStop", cs.currentVote.PeriodStop)
......@@ -344,28 +399,7 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
StopHash: hex.EncodeToString(block.Hash()),
Pubkey: hex.EncodeToString(cs.privValidator.GetPubKey().Bytes()),
}
/*
err := cs.privValidator.SignCBInfo(info)
if err != nil {
dposlog.Error("SignCBInfo failed.", "err", err)
} else {
tx, err := cs.client.CreateRecordCBTx(info)
if err != nil {
dposlog.Error("CreateRecordCBTx failed.", "err", err)
}else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign RecordCBTx.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
if err != nil {
dposlog.Error("Send RecordCBTx to mempool failed.", "err", err)
} else {
dposlog.Error("Send RecordCBTx to mempool ok.", "err", err)
}
}
}
*/
cs.SendCBTx(info)
info2 := &dty.DposCBInfo{
Cycle: info.Cycle,
......@@ -417,12 +451,6 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return
}
//如果区块未同步,则等待;如果区块已同步,则进行后续正常出块的判断和处理。
if block.Height + 1 < cs.currentVote.Height {
dposlog.Info("VotedState timeOut but block is not sync,wait...", "localHeight", block.Height, "vote height", cs.currentVote.Height)
cs.scheduleDPosTimeout(time.Second*1, VotedStateType)
return
}
//当前时间未到节点切换时间,则继续进行出块判断
if block.BlockTime >= task.blockStop {
......@@ -457,6 +485,8 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
} else {
dposlog.Info("This node is not current owner.", "current owner index", cs.currentVote.VotedNodeIndex, "this node index", cs.client.ValidatorIndex())
//根据时间进行vrf相关处理,如果在(cyclestart,middle)之间,发布M,如果在(middle,cyclestop)之间,发布R、P
checkVrf(cs)
//非当前出块节点,如果到了切换出块节点的时间,则进行状态切换,进行投票
if now >= cs.currentVote.PeriodStop {
//当前时间超过了节点切换时间,需要进行重新投票
......
This diff is collapsed.
......@@ -6,6 +6,7 @@ package types
import (
"bytes"
"crypto/ecdsa"
"encoding/hex"
"encoding/json"
"errors"
......@@ -14,8 +15,9 @@ import (
"sync"
"github.com/33cn/chain33/common/crypto"
dty "github.com/33cn/plugin/plugin/dapp/dposvote/types"
vrf "github.com/33cn/chain33/common/vrf/secp256k1"
"github.com/33cn/chain33/types"
secp256k1 "github.com/btcsuite/btcd/btcec"
)
// KeyText ...
......@@ -32,8 +34,10 @@ type PrivValidator interface {
SignVote(chainID string, vote *Vote) error
SignNotify(chainID string, notify *Notify) error
SignCBInfo(info *dty.DposCBInfo) error
SignMsg(msg []byte) (sig string, err error)
SignTx(tx *types.Transaction)
VrfEvaluate(input []byte) (hash [32]byte, proof []byte)
VrfProof(pubkey []byte, input []byte, hash [32]byte, proof []byte) bool
}
// PrivValidatorFS implements PrivValidator using data persisted to disk
......@@ -319,40 +323,58 @@ func (pv *PrivValidatorImp) SignNotify(chainID string, notify *Notify) error {
}
// SignCBInfo signs a canonical representation of the DposCBInfo, Implements PrivValidator.
func (pv *PrivValidatorImp) SignCBInfo(info *dty.DposCBInfo) error {
func (pv *PrivValidatorImp) SignMsg(msg []byte) (sig string, err error) {
pv.mtx.Lock()
defer pv.mtx.Unlock()
buf := new(bytes.Buffer)
info.Pubkey = hex.EncodeToString(pv.PubKey.Bytes())
canonical := dty.CanonicalOnceCBInfo{
Cycle: info.Cycle,
StopHeight: info.StopHeight,
StopHash: info.StopHash,
Pubkey: info.Pubkey,
}
byteCB, err := json.Marshal(&canonical)
if err != nil {
return errors.New(Fmt("Error marshal CanonicalOnceCBInfo: %v", err))
}
_, err = buf.Write(byteCB)
_, err = buf.Write(msg)
if err != nil {
return errors.New(Fmt("Error write buffer: %v", err))
return "", errors.New(Fmt("Error write buffer: %v", err))
}
signature := pv.PrivKey.Sign(buf.Bytes())
info.Signature = hex.EncodeToString(signature.Bytes())
return nil
sig = hex.EncodeToString(signature.Bytes())
return sig, nil
}
// SignTx signs a tx, Implements PrivValidator.
func (pv *PrivValidatorImp)SignTx(tx *types.Transaction){
tx.Sign(types.SECP256K1, pv.PrivKey)
}
// VrfEvaluate use input to generate hash & proof.
func (pv *PrivValidatorImp) VrfEvaluate(input []byte) (hash [32]byte, proof []byte) {
pv.mtx.Lock()
defer pv.mtx.Unlock()
privKey, _ := secp256k1.PrivKeyFromBytes(secp256k1.S256(), pv.PrivKey.Bytes())
vrfPriv := &vrf.PrivateKey{PrivateKey: (*ecdsa.PrivateKey)(privKey)}
hash, proof = vrfPriv.Evaluate(input)
return hash, proof
}
func (pv *PrivValidatorImp) VrfProof(pubkey []byte, input []byte, hash [32]byte, proof []byte) bool {
pv.mtx.Lock()
defer pv.mtx.Unlock()
pubKey, err := secp256k1.ParsePubKey(pubkey, secp256k1.S256())
if err != nil {
return false
}
vrfPub := &vrf.PublicKey{PublicKey: (*ecdsa.PublicKey)(pubKey)}
vrfHash, err := vrfPub.ProofToHash(input, proof)
if err != nil {
return false
}
if bytes.Equal(hash[:], vrfHash[:]){
return true
}
return false
}
// Persist height/round/step and signature
func (pv *PrivValidatorImp) saveSigned(signBytes []byte, sig crypto.Signature) {
......
......@@ -12,12 +12,19 @@ import (
"math/rand"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
dty "github.com/33cn/plugin/plugin/dapp/dposvote/types"
)
var (
r *rand.Rand
)
const (
ShuffleTypeNoVrf = iota
ShuffleTypeVrf
ShuffleTypePartVrf
)
// ValidatorMgr ...
type ValidatorMgr struct {
// Immutable
......@@ -28,7 +35,11 @@ type ValidatorMgr struct {
// Note that if s.LastBlockHeight causes a valset change,
// we set s.LastHeightValidatorsChanged = s.LastBlockHeight + 1
Validators *ttypes.ValidatorSet
VrfValidators *ttypes.ValidatorSet
NoVrfValidators *ttypes.ValidatorSet
LastCycleBoundaryInfo *dty.DposCBInfo
ShuffleCycle int64
ShuffleType int64 //0-no vrf 1-vrf 2-part vrf
// The latest AppHash we've received from calling abci.Commit()
AppHash []byte
}
......@@ -37,10 +48,19 @@ type ValidatorMgr struct {
func (s ValidatorMgr) Copy() ValidatorMgr {
return ValidatorMgr{
ChainID: s.ChainID,
Validators: s.Validators.Copy(),
AppHash: s.AppHash,
ShuffleCycle: s.ShuffleCycle,
ShuffleType: s.ShuffleType,
VrfValidators: s.VrfValidators.Copy(),
NoVrfValidators: s.NoVrfValidators.Copy(),
LastCycleBoundaryInfo: &dty.DposCBInfo{
Cycle: s.LastCycleBoundaryInfo.Cycle,
StopHeight: s.LastCycleBoundaryInfo.StopHeight,
StopHash: s.LastCycleBoundaryInfo.StopHash,
Pubkey: s.LastCycleBoundaryInfo.Pubkey,
Signature: s.LastCycleBoundaryInfo.Signature,
},
}
}
......@@ -97,3 +117,146 @@ func MakeGenesisValidatorMgr(genDoc *ttypes.GenesisDoc) (ValidatorMgr, error) {
AppHash: genDoc.AppHash,
}, nil
}
func (s *ValidatorMgr) GetValidatorByIndex(index int) (addres []byte, val *ttypes.Validator) {
if index < 0 || index >= len(s.Validators.Validators) {
return nil, nil
}
if s.ShuffleType == ShuffleTypeNoVrf {
val = s.Validators.Validators[index]
return val.Address, val.Copy()
} else if s.ShuffleType == ShuffleTypeVrf {
val = s.VrfValidators.Validators[index]
return address.PubKeyToAddress(val.PubKey).Hash160[:], val.Copy()
} else if s.ShuffleType == ShuffleTypePartVrf {
if index < len(s.VrfValidators.Validators) {
val = s.VrfValidators.Validators[index]
return address.PubKeyToAddress(val.PubKey).Hash160[:], val.Copy()
} else {
val = s.NoVrfValidators.Validators[index - len(s.VrfValidators.Validators)]
return address.PubKeyToAddress(val.PubKey).Hash160[:], val.Copy()
}
}
return nil, nil
}
func (s *ValidatorMgr) GetIndexByPubKey(pubkey []byte) (index int) {
if nil == pubkey {
return -1
}
index = -1
if s.ShuffleType == ShuffleTypeNoVrf {
for i := 0; i < s.Validators.Size(); i++ {
if bytes.Equal(s.Validators.Validators[i].PubKey, pubkey) {
index = i
return index
}
}
} else if s.ShuffleType == ShuffleTypeVrf {
for i := 0; i < s.VrfValidators.Size(); i++ {
if bytes.Equal(s.VrfValidators.Validators[i].PubKey, pubkey) {
index = i
return index
}
}
} else if s.ShuffleType == ShuffleTypePartVrf {
for i := 0; i < s.VrfValidators.Size(); i++ {
if bytes.Equal(s.VrfValidators.Validators[i].PubKey, pubkey) {
index = i
return index
}
}
for j := 0; j < s.NoVrfValidators.Size(); j++ {
if bytes.Equal(s.NoVrfValidators.Validators[j].PubKey, pubkey) {
index = j + s.VrfValidators.Size()
return index
}
}
}
return index
}
func (s *ValidatorMgr) FillVoteItem(voteItem *ttypes.VoteItem) {
voteItem.LastCBInfo = &ttypes.CycleBoundaryInfo{
Cycle: s.LastCycleBoundaryInfo.Cycle,
StopHeight: s.LastCycleBoundaryInfo.StopHeight,
StopHash: s.LastCycleBoundaryInfo.StopHash,
}
voteItem.ShuffleType = s.ShuffleType
for i := 0; i < s.Validators.Size(); i++ {
node := &ttypes.SuperNode{
PubKey: s.Validators.Validators[i].PubKey,
Address: s.Validators.Validators[i].Address,
}
voteItem.Validators = append(voteItem.Validators, node)
}
for i := 0; i < s.VrfValidators.Size(); i++ {
node := &ttypes.SuperNode{
PubKey: s.VrfValidators.Validators[i].PubKey,
Address: s.VrfValidators.Validators[i].Address,
}
voteItem.VrfValidators = append(voteItem.VrfValidators, node)
}
for i := 0; i < s.NoVrfValidators.Size(); i++ {
node := &ttypes.SuperNode{
PubKey: s.NoVrfValidators.Validators[i].PubKey,
Address: s.NoVrfValidators.Validators[i].Address,
}
voteItem.NoVrfValidators = append(voteItem.NoVrfValidators, node)
}
}
func (s *ValidatorMgr) UpdateFromVoteItem(voteItem *ttypes.VoteItem) bool {
validators := voteItem.Validators
for i := 0; i < s.Validators.Size(); i++ {
if !bytes.Equal(validators[i].PubKey, s.Validators.Validators[i].PubKey) {
return false
}
}
if s.LastCycleBoundaryInfo == nil ||
voteItem.LastCBInfo.Cycle != s.LastCycleBoundaryInfo.Cycle ||
voteItem.LastCBInfo.StopHeight != s.LastCycleBoundaryInfo.StopHeight ||
voteItem.LastCBInfo.StopHash != s.LastCycleBoundaryInfo.StopHash {
s.LastCycleBoundaryInfo = &dty.DposCBInfo{
Cycle: voteItem.LastCBInfo.Cycle,
StopHeight: voteItem.LastCBInfo.StopHeight,
StopHash: voteItem.LastCBInfo.StopHash,
}
}
var vrfVals []*ttypes.Validator
for i := 0; i < len(voteItem.VrfValidators); i++ {
val := &ttypes.Validator{
Address: voteItem.VrfValidators[i].Address,
PubKey: voteItem.VrfValidators[i].PubKey,
}
vrfVals = append(vrfVals, val)
}
s.VrfValidators = ttypes.NewValidatorSet(vrfVals)
var noVrfVals []*ttypes.Validator
for i := 0; i < len(voteItem.NoVrfValidators); i++ {
val := &ttypes.Validator{
Address: voteItem.NoVrfValidators[i].Address,
PubKey: voteItem.NoVrfValidators[i].PubKey,
}
noVrfVals = append(noVrfVals, val)
}
s.NoVrfValidators = ttypes.NewValidatorSet(noVrfVals)
return true
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment