Commit 56e73c7f authored by 张振华's avatar 张振华

update

parent 69366e2b
......@@ -105,6 +105,13 @@ blockInterval=3
continueBlockNum=12
isValidator=false
rpcAddr="http://localhost:9801"
#shuffleType为1表示使用固定出块顺序,为2表示使用vrf信息进行出块顺序洗牌
shuffleType=2
#是否更新topN,如果为true,根据下面几个配置项定期更新topN节点;如果为false,则一直使用初始配置的节点,不关注投票结果
whetherUpdateTopN=true
blockNumToUpdateDelegate=20000
registTopNHeightLimit=100
updateTopNHeightLimit=200
[store]
name="kvdb"
......
......@@ -119,6 +119,7 @@ type ConsensusState struct {
vrfInfoMap map[int64] *dty.VrfInfo
vrfInfosMap map[int64] []*dty.VrfInfo
cachedTopNCands []*dty.TopNCandidators
}
// NewConsensusState returns a new ConsensusState.
......@@ -284,6 +285,8 @@ func (cs *ConsensusState) handleMsg(mi MsgInfo) {
var err error
msg, peerID, peerIP := mi.Msg, string(mi.PeerID), mi.PeerIP
dposlog.Info("Recv consensus msg", "msg type", fmt.Sprintf("%T", msg), "peerid", peerID, "peerip", peerIP)
switch msg := msg.(type) {
case *dpostype.DPosVote:
cs.dposState.recvVote(cs, msg)
......@@ -602,6 +605,99 @@ func (cs *ConsensusState) Init() {
cs.InitCycleBoundaryInfo(task)
cs.InitCycleVrfInfo(task)
cs.InitCycleVrfInfos(task)
info := CalcTopNVersion(cs.client.GetCurrentHeight())
cs.InitTopNCandidators(info.Version)
}
// InitTopNCandidators method
func (cs *ConsensusState) InitTopNCandidators(version int64){
for version > 0 {
info, err := cs.client.QueryTopNCandidators(version)
if err == nil && info != nil && info.Status == dty.TopNCandidatorsVoteMajorOK{
cs.UpdateTopNCandidators(info)
return
}
version--
}
return
}
// UpdateTopNCandidators method
func (cs *ConsensusState) UpdateTopNCandidators(info *dty.TopNCandidators) {
if len(cs.cachedTopNCands) == 0 {
cs.cachedTopNCands = append(cs.cachedTopNCands, info)
return
}
if cs.cachedTopNCands[len(cs.cachedTopNCands) - 1].Version < info.Version {
cs.cachedTopNCands = append(cs.cachedTopNCands, info)
}
}
// GetTopNCandidatorsByVersion method
func (cs *ConsensusState) GetTopNCandidatorsByVersion(version int64)(info *dty.TopNCandidators) {
if len(cs.cachedTopNCands) == 0 || cs.cachedTopNCands[len(cs.cachedTopNCands) - 1].Version < version{
info, err := cs.client.QueryTopNCandidators(version)
if err == nil && info != nil {
if info.Status == dty.TopNCandidatorsVoteMajorOK {
cs.UpdateTopNCandidators(info)
}
return info
}
return nil
}
for i := len(cs.cachedTopNCands) - 1 ; i >= 0 ; i-- {
if cs.cachedTopNCands[i].Version == version {
return cs.cachedTopNCands[i]
} else if cs.cachedTopNCands[i].Version < version {
return nil
}
}
return nil
}
// GetLastestTopNCandidators method
func (cs *ConsensusState) GetLastestTopNCandidators()(info *dty.TopNCandidators) {
length := len(cs.cachedTopNCands)
if length > 0 {
return cs.cachedTopNCands[length -1]
}
return nil
}
// IsTopNRegisted method
func (cs *ConsensusState) IsTopNRegisted(info *dty.TopNCandidators) bool {
if nil == info {
return false
}
for i := 0; i < len(info.CandsVotes); i++ {
if bytes.Equal(info.CandsVotes[i].SignerPubkey, cs.privValidator.GetPubKey().Bytes()) {
return true
}
}
return false
}
// IsInTopN method
func (cs *ConsensusState) IsInTopN(info *dty.TopNCandidators) bool {
if nil == info || info.Status != dty.TopNCandidatorsVoteMajorOK || len(info.FinalCands) == 0 {
return false
}
for i := 0; i < len(info.FinalCands); i++ {
if bytes.Equal(info.FinalCands[i].Pubkey, cs.privValidator.GetPubKey().Bytes()) {
return true
}
}
return false
}
// InitCycleBoundaryInfo method
......@@ -748,7 +844,7 @@ func (cs *ConsensusState) SendCBTx(info *dty.DposCBInfo) bool {
return false
} else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign RecordCBTx.")
dposlog.Info("Sign RecordCBTx ok.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
......@@ -772,7 +868,7 @@ func (cs *ConsensusState) SendRegistVrfMTx(info *dty.DposVrfMRegist) bool {
return false
} else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign RegistVrfMTx.")
dposlog.Info("Sign RegistVrfMTx ok.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
......@@ -795,7 +891,7 @@ func (cs *ConsensusState) SendRegistVrfRPTx(info *dty.DposVrfRPRegist) bool {
return false
} else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign RegVrfRPTx.")
dposlog.Info("Sign RegVrfRPTx ok.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
......@@ -803,7 +899,7 @@ func (cs *ConsensusState) SendRegistVrfRPTx(info *dty.DposVrfRPRegist) bool {
dposlog.Error("Send RegVrfRPTx to mempool failed.", "err", err)
return false
} else {
dposlog.Error("Send RegVrfRPTx to mempool ok.", "err", err)
dposlog.Info("Send RegVrfRPTx to mempool ok.", "err", err)
}
}
......@@ -979,6 +1075,16 @@ func (cs *ConsensusState) GetVrfInfosByCircle(cycle int64) (infos []*dty.VrfInfo
// ShuffleValidators method
func (cs *ConsensusState) ShuffleValidators(cycle int64){
if shuffleType == dposShuffleTypeFixOrderByAddr {
dposlog.Info("ShuffleType FixOrderByAddr,so do nothing", "cycle", cycle)
cs.validatorMgr.VrfValidators = nil
cs.validatorMgr.NoVrfValidators = nil
cs.validatorMgr.ShuffleCycle = cycle
cs.validatorMgr.ShuffleType = ShuffleTypeNoVrf
return
}
if cycle == cs.validatorMgr.ShuffleCycle {
//如果已经洗过牌,则直接返回,不重复洗牌
dposlog.Info("Shuffle for this cycle is done already.", "cycle", cycle)
......@@ -987,16 +1093,11 @@ func (cs *ConsensusState) ShuffleValidators(cycle int64){
cbInfo := cs.GetCBInfoByCircle(cycle - 1)
if cbInfo == nil {
dposlog.Info("GetCBInfoByCircle for Shuffle failed, don't use vrf to shuffle.", "cycle", cycle)
cs.validatorMgr.VrfValidators = nil
cs.validatorMgr.NoVrfValidators = nil
cs.validatorMgr.ShuffleCycle = cycle
cs.validatorMgr.ShuffleType = ShuffleTypeNoVrf
return
dposlog.Info("GetCBInfoByCircle failed", "cycle", cycle)
} else {
cs.validatorMgr.LastCycleBoundaryInfo = cbInfo
dposlog.Info("GetCBInfoByCircle ok", "cycle", cycle, "stopHeight", cbInfo.StopHeight, "stopHash", cbInfo.StopHash)
}
cs.validatorMgr.LastCycleBoundaryInfo = cbInfo
dposlog.Info("GetCBInfoByCircle for Shuffle ok", "cycle", cycle, "stopHeight", cbInfo.StopHeight, "stopHash", cbInfo.StopHash)
infos := cs.GetVrfInfosByCircle(cycle - 1)
if infos == nil {
......@@ -1015,7 +1116,7 @@ func (cs *ConsensusState) ShuffleValidators(cycle int64){
for i := 0; i < len(infos); i++ {
if isValidVrfInfo(infos[i]) {
var vrfBytes []byte
vrfBytes = append(vrfBytes, []byte(cbInfo.StopHash)...)
//vrfBytes = append(vrfBytes, []byte(cbInfo.StopHash)...)
vrfBytes = append(vrfBytes, infos[i].R...)
item := &ttypes.Validator{
......@@ -1089,4 +1190,44 @@ func (cs *ConsensusState) VrfEvaluate(input []byte)(hash [32]byte, proof []byte)
// VrfEvaluate method
func (cs *ConsensusState) VrfProof(pubkey []byte, input []byte, hash [32]byte, proof []byte) bool{
return cs.privValidator.VrfProof(pubkey, input, hash, proof)
}
// SendCBTx method
func (cs *ConsensusState) SendTopNRegistTx(reg *dty.TopNCandidatorRegist) bool {
//info.Pubkey = strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes()))
obj := dty.CanonicalTopNCandidator(reg.Cand)
reg.Cand.Hash = obj.ID()
reg.Cand.SignerPubkey = cs.privValidator.GetPubKey().Bytes()
byteCB, err := json.Marshal(reg.Cand)
if err != nil {
dposlog.Error("marshal TopNCandidator failed", "err", err)
}
sig, err := cs.privValidator.SignMsg(byteCB)
if err != nil {
dposlog.Error("TopNCandidator failed.", "err", err)
return false
} else {
reg.Cand.Signature = sig.Bytes()
tx, err := cs.client.CreateTopNRegistTx(reg)
if err != nil {
dposlog.Error("CreateTopNRegistTx failed.", "err", err)
return false
} else {
cs.privValidator.SignTx(tx)
dposlog.Info("Sign TopNRegistTx ok.")
//将交易发往交易池中,方便后续重启或者新加入的超级节点查询
msg := cs.client.GetQueueClient().NewMessage("mempool", types.EventTx, tx)
err = cs.client.GetQueueClient().Send(msg, false)
if err != nil {
dposlog.Error("Send TopNRegistTx to mempool failed.", "err", err)
return false
} else {
dposlog.Info("Send TopNRegistTx to mempool ok.")
}
}
}
return true
}
\ No newline at end of file
......@@ -7,11 +7,11 @@ package dpos
import (
"bytes"
"encoding/hex"
"fmt"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/util"
"strings"
"time"
"fmt"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/log/log15"
......@@ -27,7 +27,8 @@ import (
)
const dposVersion = "0.1.0"
const dposShuffleTypeFixOrderByAddr = 1
const dposShuffleTypeOrderByVrfInfo = 2
var (
dposlog = log15.New("module", "dpos")
genesis string
......@@ -48,6 +49,11 @@ var (
zeroHash [32]byte
dposPort string = "36656"
rpcAddr string = "http://0.0.0.0:8801"
shuffleType int32 = dposShuffleTypeOrderByVrfInfo //shuffleType为1表示使用固定出块顺序,为2表示使用vrf信息进行出块顺序洗牌
whetherUpdateTopN = false //是否更新topN,如果为true,根据下面几个配置项定期更新topN节点;如果为false,则一直使用初始配置的节点,不关注投票结果
blockNumToUpdateDelegate int64 = 20000
registTopNHeightLimit int64 = 100
updateTopNHeightLimit int64 = 200
)
func init() {
......@@ -86,6 +92,11 @@ type subConfig struct {
IsValidator bool `json:"isValidator"`
Port string `json:"port"`
RpcAddr string `json:"rpcAddr"`
ShuffleType int32 `json:"shuffleType"`
WhetherUpdateTopN bool `json:"whetherUpdateTopN"`
BlockNumToUpdateDelegate int64 `json:"blockNumToUpdateDelegate"`
RegistTopNHeightLimit int64 `json:"registTopNHeightLimit"`
UpdateTopNHeightLimit int64 `json:"updateTopNHeightLimit"`
}
func (client *Client) applyConfig(sub []byte) {
......@@ -147,6 +158,26 @@ func (client *Client) applyConfig(sub []byte) {
if subcfg.RpcAddr != "" {
rpcAddr = subcfg.RpcAddr
}
if subcfg.ShuffleType > 0 {
shuffleType = subcfg.ShuffleType
}
if subcfg.WhetherUpdateTopN {
whetherUpdateTopN = subcfg.WhetherUpdateTopN
}
if subcfg.BlockNumToUpdateDelegate > 0 {
blockNumToUpdateDelegate = subcfg.BlockNumToUpdateDelegate
}
if subcfg.RegistTopNHeightLimit > 0 {
registTopNHeightLimit = subcfg.RegistTopNHeightLimit
}
if subcfg.UpdateTopNHeightLimit > 0 {
updateTopNHeightLimit = subcfg.UpdateTopNHeightLimit
}
}
// New ...
......@@ -282,28 +313,38 @@ OuterLoop:
}
if block != nil {
//time.Sleep(time.Second * 5)
cands, err := client.QueryCandidators()
if err != nil {
dposlog.Info("QueryCandidators failed", "err", err)
} else {
if len(cands) != int(dposDelegateNum) {
dposlog.Info("QueryCandidators success but no enough candidators", "dposDelegateNum", dposDelegateNum, "candidatorNum", len(cands))
//cands, err := client.QueryCandidators()
info := CalcTopNVersion(block.Height)
version := info.Version
var topN *dty.TopNCandidators
for version >= 0 {
topN, err = client.QueryTopNCandidators(version)
if err !=nil || topN == nil {
version --
} else {
validators := make([]*ttypes.Validator, dposDelegateNum)
nodes := make([]string, dposDelegateNum)
for i, val := range cands {
// Make validator
validators[i] = &ttypes.Validator{
Address: address.PubKeyToAddress(val.Pubkey).Hash160[:],
PubKey: val.Pubkey,
}
nodes[i] = val.Ip + ":" + dposPort
break
}
}
if topN == nil {
dposlog.Info("QueryTopNCandidators failed, no candidators")
} else if len(topN.FinalCands) != int(dposDelegateNum) {
dposlog.Info("QueryTopNCandidators success but no enough candidators", "dposDelegateNum", dposDelegateNum, "candidatorNum", len(topN.FinalCands))
} else {
validators := make([]*ttypes.Validator, dposDelegateNum)
nodes := make([]string, dposDelegateNum)
for i, val := range topN.FinalCands {
// Make validator
validators[i] = &ttypes.Validator{
Address: address.PubKeyToAddress(val.Pubkey).Hash160[:],
PubKey: val.Pubkey,
}
valMgr.Validators = ttypes.NewValidatorSet(validators)
dposlog.Info("QueryCandidators success and update validator set", "old validators", printValidators(valMgrTmp.Validators), "new validators", printValidators(valMgr.Validators))
dposlog.Info("QueryCandidators success and update validator node ips", "old validator ips", printNodeIPs(validatorNodes), "new validators ips", printNodeIPs(nodes))
validatorNodes = nodes
nodes[i] = val.Ip + ":" + dposPort
}
valMgr.Validators = ttypes.NewValidatorSet(validators)
dposlog.Info("QueryCandidators success and update validator set", "old validators", printValidators(valMgrTmp.Validators), "new validators", printValidators(valMgr.Validators))
dposlog.Info("QueryCandidators success and update validator node ips", "old validator ips", printNodeIPs(validatorNodes), "new validators ips", printNodeIPs(nodes))
validatorNodes = nodes
}
}
......@@ -514,52 +555,6 @@ func (client *Client)QueryCandidators()([]*dty.Candidator, error) {
return cands, nil
}
func (client *Client)MonitorCandidators() {
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <- ticker.C:
dposlog.Info("Monitor Candidators")
block, err := client.RequestLastBlock()
if err != nil {
panic(err)
}
if block != nil {
cands, err := client.QueryCandidators()
if err != nil {
dposlog.Info("Query Candidators failed", "err", err)
} else {
if len(cands) != int(dposDelegateNum) {
dposlog.Info("QueryCandidators success but no enough candidators", "dposDelegateNum", dposDelegateNum, "candidatorNum", len(cands))
} else {
validators := make([]*ttypes.Validator, dposDelegateNum)
for i, val := range cands {
// Make validator
validators[i] = &ttypes.Validator{
Address: address.PubKeyToAddress(val.Pubkey).Hash160[:],
PubKey: val.Pubkey,
}
}
validatorSet := ttypes.NewValidatorSet(validators)
dposlog.Info("QueryCandidators success and update validator set")
if !client.isValidatorSetSame(validatorSet, client.csState.validatorMgr.Validators){
dposlog.Info("ValidatorSet from contract is changed, so stop the node and restart the consensus.")
client.node.Stop()
time.Sleep(time.Second * 3)
go client.StartConsensus()
} else {
dposlog.Info("ValidatorSet from contract is the same,no change.")
}
}
}
}
}
}
}
func (client *Client)isValidatorSetSame(v1, v2 *ttypes.ValidatorSet) bool {
if v1 == nil || v2 == nil || len(v1.Validators) != len(v2.Validators){
return false
......@@ -697,3 +692,62 @@ func (client *Client)QueryVrfInfos(pubkeys [][]byte, cycle int64)([]*dty.VrfInfo
return infos, nil
}
func (client *Client)CreateTopNRegistTx(reg *dty.TopNCandidatorRegist)(tx*types.Transaction, err error) {
var action dty.DposVoteAction
action.Value = &dty.DposVoteAction_RegistTopN{
RegistTopN: reg,
}
action.Ty = dty.DPosVoteActionRegistTopNCandidator
tx, err = types.CreateFormatTx("dpos", types.Encode(&action))
if err != nil {
return nil, err
}
return tx, nil
}
// QueryCycleBoundaryInfo method
func (client *Client) QueryTopNCandidators(version int64)(*dty.TopNCandidators, error){
req := &dty.TopNCandidatorsQuery{Version: version}
param, err := proto.Marshal(req)
if err != nil {
dposlog.Error("Marshal TopNCandidatorsQuery failed", "version", version, "err", err)
return nil, err
}
msg := client.GetQueueClient().NewMessage("execs", types.EventBlockChainQuery,
&types.ChainExecutor{
Driver: dty.DPosX,
FuncName: dty.FuncNameQueryTopNByVersion,
StateHash: zeroHash[:],
Param:param,
})
err = client.GetQueueClient().Send(msg, true)
if err != nil {
dposlog.Error("send TopNCandidatorsQuery to dpos exec failed", "version", version, "err", err)
return nil, err
}
msg, err = client.GetQueueClient().Wait(msg)
if err != nil {
dposlog.Error("send TopNCandidatorsQuery wait failed", "version", version, "err", err)
return nil, err
}
res := msg.GetData().(types.Message).(*dty.TopNCandidatorsReply)
info := res.TopN
dposlog.Info("TopNCandidatorsQuery get reply", "version", info.Version, "status", info.Status, "final candidators", printCandidators(info.FinalCands))
return info, nil
}
func printCandidators(cands []*dty.Candidator) string {
result := "["
for i := 0; i < len(cands); i++ {
fmt.Sprintf("%spubkey:%s,ip:%s;", result, hex.EncodeToString(cands[i].Pubkey), cands[i].Ip)
}
result += "]"
return result
}
\ No newline at end of file
......@@ -12,7 +12,7 @@ import (
"math"
"strings"
"time"
"os"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
......@@ -53,6 +53,9 @@ var WaitNotifyStateObj = &WaitNofifyState{}
var LastCheckVrfMTime = int64(0)
var LastCheckVrfRPTime = int64(0)
var LastCheckRegTopNTime = int64(0)
var LastCheckUpdateTopNTime = int64(0)
// Task 为计算当前时间所属周期的数据结构
type Task struct {
NodeID int64
......@@ -65,6 +68,26 @@ type Task struct {
BlockStop int64
}
type TopNVersionInfo struct {
Version int64
HeightStart int64
HeightStop int64
HeightToStart int64
HeightRegLimit int64
HeightUpdateLimit int64
}
func CalcTopNVersion(height int64) (info TopNVersionInfo) {
info = TopNVersionInfo{}
info.Version = height / blockNumToUpdateDelegate
info.HeightToStart = height % blockNumToUpdateDelegate
info.HeightStart = info.Version * blockNumToUpdateDelegate
info.HeightStop = (info.Version + 1 ) * blockNumToUpdateDelegate - 1
info.HeightRegLimit = info.HeightStart + registTopNHeightLimit
info.HeightUpdateLimit = info.HeightStart + updateTopNHeightLimit
return info
}
// DecideTaskByTime 根据时间戳计算所属的周期,包括cycle周期,负责出块周期,当前出块周期
func DecideTaskByTime(now int64) (task Task) {
task.NodeID = now % dposCycle / dposPeriod
......@@ -138,6 +161,10 @@ func generateVote(cs *ConsensusState) *dpostype.Vote {
}
func checkVrf(cs *ConsensusState) {
if shuffleType != dposShuffleTypeOrderByVrfInfo {
return
}
now := time.Now().Unix()
task := DecideTaskByTime(now)
middleTime := task.CycleStart + (task.CycleStop - task.CycleStart) / 2
......@@ -191,6 +218,99 @@ func checkVrf(cs *ConsensusState) {
}
func checkTopNRegist(cs *ConsensusState) {
if whetherUpdateTopN == false {
return
}
now := time.Now().Unix()
if now - LastCheckRegTopNTime < dposBlockInterval * 3 {
//避免短时间频繁检查,5个区块以内不重复检查
return
}
height := cs.client.GetCurrentHeight()
info := CalcTopNVersion(height)
if height <= info.HeightRegLimit {
//在注册TOPN的区块区间内,则检查本节点是否注册成功,如果否则进行注册
topN := cs.GetTopNCandidatorsByVersion(info.Version)
if topN == nil || !cs.IsTopNRegisted(topN) {
cands, err := cs.client.QueryCandidators()
if err != nil || cands == nil {
dposlog.Error("QueryCandidators failed", "now", now, "height", height, "HeightRegLimit", info.HeightRegLimit, "pubkey", strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes())))
LastCheckRegTopNTime = now
return
}
topNCand := &dty.TopNCandidator {
Cands: cands,
Height: height,
SignerPubkey: cs.privValidator.GetPubKey().Bytes(),
}
obj := dty.CanonicalTopNCandidator(topNCand)
topNCand.Hash = obj.ID()
regist := &dty.TopNCandidatorRegist {
Cand: topNCand,
}
cs.SendTopNRegistTx(regist)
LastCheckRegTopNTime = now
} else {
dposlog.Info("TopN is already registed", "now", now, "height", height, "HeightRegLimit", info.HeightRegLimit, "pubkey", strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes())))
LastCheckRegTopNTime = now + (info.HeightStop - height) * dposBlockInterval
}
} else {
LastCheckRegTopNTime = now + (info.HeightStop - height) * dposBlockInterval
}
}
func checkTopNUpdate(cs *ConsensusState) {
if whetherUpdateTopN == false {
return
}
now := time.Now().Unix()
if now - LastCheckUpdateTopNTime < dposBlockInterval * 1 {
//避免短时间频繁检查,1个区块以内不重复检查
return
}
height := cs.client.GetCurrentHeight()
info := CalcTopNVersion(height)
if height >= info.HeightUpdateLimit {
topN := cs.GetLastestTopNCandidators()
if nil == topN {
dposlog.Error("No valid topN, do nothing", "now", now, "height", height, "HeightUpdateLimit", info.HeightUpdateLimit, "pubkey", strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes())))
LastCheckUpdateTopNTime = now + (info.HeightStop - height) * dposBlockInterval
return
}
for i := 0; i < len(topN.FinalCands); i++ {
if isPubkeyExist(topN.FinalCands[i].Pubkey, cs.validatorMgr.Validators.Validators) {
continue
} else {
dposlog.Error("TopN changed, so restart to use latest topN", "now", now, "height", height, "HeightUpdateLimit", info.HeightUpdateLimit, "pubkey", strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes())))
os.Exit(0)
}
}
dposlog.Info("TopN not changed,so do nothing", "now", now, "height", height, "HeightUpdateLimit", info.HeightUpdateLimit, "pubkey", strings.ToUpper(hex.EncodeToString(cs.privValidator.GetPubKey().Bytes())))
LastCheckUpdateTopNTime = now + (info.HeightStop - height) * dposBlockInterval
} else {
LastCheckUpdateTopNTime = now + (info.HeightUpdateLimit - height - 1) * dposBlockInterval
}
}
func isPubkeyExist(pubkey []byte, validators []*dpostype.Validator) bool {
for i := 0; i < len(validators); i++ {
if bytes.Equal(pubkey, validators[i].PubKey) {
return true
}
}
return false
}
func recvCBInfo(cs *ConsensusState, info *dpostype.DPosCBInfo) {
newInfo := &dty.DposCBInfo{
Cycle: info.Cycle,
......@@ -432,9 +552,11 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//当前时间超过了节点切换时间,需要进行重新投票
dposlog.Info("VotedState timeOut over periodStop.", "periodStop", cs.currentVote.PeriodStop, "cycleStop", cs.currentVote.CycleStop)
isCycleSwith := false
//如果到了cycle结尾,需要构造一个交易,把最终的CycleBoundary信息发布出去
if cs.currentVote.PeriodStop == cs.currentVote.CycleStop {
dposlog.Info("Create new tx for cycle change to record cycle boundary info.", "height", block.Height)
isCycleSwith = true
info := &dty.DposCBInfo{
Cycle: cs.currentVote.Cycle,
......@@ -492,13 +614,25 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
cs.SetNotify(notify2.DPosNotify)
cs.dposState.sendNotify(cs, notify.DPosNotify)
cs.ClearVotes()
//检查是否需要更新TopN,如果有更新,则更新TOPN节点后进入新的状态循环。
if isCycleSwith {
checkTopNUpdate(cs)
}
cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "VotedState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
return
}
//根据时间进行vrf相关处理,如果在(cyclestart,middle)之间,发布M,如果在(middle,cyclestop)之间,发布R、P
checkVrf(cs)
//检查是否应该注册topN,是否已经注册topN
checkTopNRegist(cs)
//当前时间未到节点切换时间,则继续进行出块判断
if block.BlockTime >= task.BlockStop {
//已出块,或者时间落后了。
......@@ -534,6 +668,10 @@ func (voted *VotedState) timeOut(cs *ConsensusState) {
//根据时间进行vrf相关处理,如果在(cyclestart,middle)之间,发布M,如果在(middle,cyclestop)之间,发布R、P
checkVrf(cs)
//检查是否应该注册topN,是否已经注册topN
checkTopNRegist(cs)
//非当前出块节点,如果到了切换出块节点的时间,则进行状态切换,进行投票
if now >= cs.currentVote.PeriodStop {
//当前时间超过了节点切换时间,需要进行重新投票
......@@ -622,6 +760,13 @@ type WaitNofifyState struct {
func (wait *WaitNofifyState) timeOut(cs *ConsensusState) {
//cs.clearVotes()
//检查是否需要更新TopN,如果有更新,则更新TOPN节点后进入新的状态循环。
now := time.Now().Unix()
if now >= cs.lastVote.PeriodStop && cs.lastVote.PeriodStop == cs.lastVote.CycleStop {
checkTopNUpdate(cs)
}
cs.SetState(InitStateObj)
dposlog.Info("Change state because of time.", "from", "WaitNofifyState", "to", "InitState")
cs.scheduleDPosTimeout(time.Duration(timeoutCheckConnections)*time.Millisecond, InitStateType)
......@@ -693,6 +838,12 @@ func (wait *WaitNofifyState) recvNotify(cs *ConsensusState, notify *dpostype.DPo
cs.SaveNotify()
cs.SetNotify(notify)
//检查是否需要更新TopN,如果有更新,则更新TOPN节点后进入新的状态循环。
now := time.Now().Unix()
if now >= cs.lastVote.PeriodStop && cs.lastVote.PeriodStop == cs.lastVote.CycleStop {
checkTopNUpdate(cs)
}
cs.SetState(InitStateObj)
dposlog.Info("Change state because recv notify.", "from", "WaitNofifyState", "to", "InitState")
cs.dposState.timeOut(cs)
......
......@@ -53,7 +53,9 @@ func DPosCmd() *cobra.Command {
DPosCreateCmd(),
DPosVrfVerifyCmd(),
DPosVrfEvaluateCmd(),
DPosCBRecordCmd(),
DPosCBQueryCmd(),
DPosTopNQueryCmd(),
)
return cmd
......@@ -797,9 +799,9 @@ func DPosCBRecordCmd() *cobra.Command {
func addCBRecordCmdFlags(cmd *cobra.Command) {
cmd.Flags().Int64P("cycle", "c", 0, "cycle")
cmd.MarkFlagRequired("cycle")
cmd.Flags().Int64P("height", "h", 0, "height")
cmd.Flags().Int64P("height", "m", 0, "height")
cmd.MarkFlagRequired("height")
cmd.Flags().StringP("hash", "m", "", "block hash")
cmd.Flags().StringP("hash", "s", "", "block hash")
cmd.MarkFlagRequired("hash")
cmd.Flags().StringP("privKey", "k", "", "private key")
cmd.MarkFlagRequired("privKey")
......@@ -867,7 +869,7 @@ func recordCB(cmd *cobra.Command, args []string) {
ctx.RunWithoutMarshal()
}
//DPosVrfQueryCmd 构造VRF相关信息查询的命令行
//DPosCBQueryCmd 查询Cycle Boundary info的命令
func DPosCBQueryCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "cbQuery",
......@@ -934,4 +936,39 @@ func cbQuery(cmd *cobra.Command, args []string) {
ctx := jsonrpc.NewRPCCtx(rpcLaddr, "Chain33.Query", params, &res)
ctx.Run()
}
}
//DPosVrfQueryCmd 构造VRF相关信息查询的命令行
func DPosTopNQueryCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "topNQuery",
Short: "query topN info",
Run: topNQuery,
}
addTopNQueryFlags(cmd)
return cmd
}
func addTopNQueryFlags(cmd *cobra.Command) {
cmd.Flags().Int64P("version", "v", 0, "version")
cmd.MarkFlagRequired("version")
}
func topNQuery(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
version, _ := cmd.Flags().GetInt64("version")
var params rpctypes.Query4Jrpc
params.Execer = dty.DPosX
req := &dty.TopNCandidatorsQuery{
Version: version,
}
params.FuncName = dty.FuncNameQueryTopNByVersion
params.Payload = types.MustPBToJSON(req)
var res dty.TopNCandidatorsReply
ctx := jsonrpc.NewRPCCtx(rpcLaddr, "Chain33.Query", params, &res)
ctx.Run()
}
\ No newline at end of file
......@@ -16,11 +16,14 @@ var logger = log.New("module", "execs.dposvote")
var driverName = dty.DPosX
var (
dposDelegateNum int64 = 3 //委托节点个数,从配置读取,以后可以根据投票结果来定
dposBlockInterval int64 = 3 //出块间隔,当前按3s
dposContinueBlockNum int64 = 6 //一个委托节点当选后,一次性持续出块数量
dposCycle = dposDelegateNum * dposBlockInterval * dposContinueBlockNum
dposPeriod = dposBlockInterval * dposContinueBlockNum
dposDelegateNum int64 = 3 //委托节点个数,从配置读取,以后可以根据投票结果来定
dposBlockInterval int64 = 3 //出块间隔,当前按3s
dposContinueBlockNum int64 = 6 //一个委托节点当选后,一次性持续出块数量
dposCycle = dposDelegateNum * dposBlockInterval * dposContinueBlockNum
dposPeriod = dposBlockInterval * dposContinueBlockNum
blockNumToUpdateDelegate int64 = 20000
registTopNHeightLimit int64 = 100
updateTopNHeightLimit int64 = 200
)
type CycleInfo struct {
......@@ -42,6 +45,10 @@ func calcCycleByTime(now int64) *CycleInfo {
}
}
func calcTopNVersion(height int64) (version, left int64) {
return height / blockNumToUpdateDelegate, height % blockNumToUpdateDelegate
}
func init() {
ety := types.LoadExecutorType(driverName)
ety.InitFuncList(types.ListMethod(&DPos{}))
......@@ -57,11 +64,14 @@ func Init(name string, sub []byte) {
drivers.Register(driverName, newDposVote, types.GetDappFork(driverName, "Enable"))
//读取一下配置项,用于和共识模块一致计算cycle
dposDelegateNum = types.Conf("config.consensus.sub.dpos").GInt("delegateNum")
dposBlockInterval = types.Conf("config.consensus.sub.dpos").GInt("blockInterval")
dposContinueBlockNum = types.Conf("config.consensus.sub.dpos").GInt("continueBlockNum")
dposCycle = dposDelegateNum * dposBlockInterval * dposContinueBlockNum
dposPeriod = dposBlockInterval * dposContinueBlockNum
dposDelegateNum = types.Conf("config.consensus.sub.dpos").GInt("delegateNum")
dposBlockInterval = types.Conf("config.consensus.sub.dpos").GInt("blockInterval")
dposContinueBlockNum = types.Conf("config.consensus.sub.dpos").GInt("continueBlockNum")
blockNumToUpdateDelegate = types.Conf("config.consensus.sub.dpos").GInt("blockNumToUpdateDelegate")
registTopNHeightLimit = types.Conf("config.consensus.sub.dpos").GInt("registTopNHeightLimit")
updateTopNHeightLimit = types.Conf("config.consensus.sub.dpos").GInt("updateTopNHeightLimit")
dposCycle = dposDelegateNum * dposBlockInterval * dposContinueBlockNum
dposPeriod = dposBlockInterval * dposContinueBlockNum
}
//DPos 执行器,用于Dpos候选节点注册、投票,VRF信息注册管理等功能
......
......@@ -79,6 +79,14 @@ func Key(id string) (key []byte) {
key = append(key, []byte(id)...)
return key
}
//Key State数据库中存储记录的Key值格式转换
func TopNKey(id string) (key []byte) {
key = append(key, []byte("mavl-"+dty.DPosX+"-"+"topn"+"-")...)
key = append(key, []byte(id)...)
return key
}
//queryVrfByTime 根据时间信息,查询TopN的受托节点的VRF信息
func queryVrfByTime(kvdb db.KVDB, req *dty.DposVrfQuery) (types.Message, error) {
if req.Ty != dty.QueryVrfByTime{
......@@ -130,7 +138,7 @@ func getVrfInfoFromVrfRP(vrfRP *dty.DposVrfRP) *dty.VrfInfo{
return vrf
}
func isRecordExist(vrfM *dty.DposVrfM, vrfs [] *dty.VrfInfo) bool {
func isVrfMRecordExist(vrfM *dty.DposVrfM, vrfs [] *dty.VrfInfo) bool {
if nil == vrfM || nil == vrfs || 0 == len(vrfs) {
return false
}
......@@ -271,7 +279,7 @@ func queryVrfByCycle(kvdb db.KVDB, req *dty.DposVrfQuery) (types.Message, error)
} else {
for i := 0; i < len(rows); i++ {
vrfM := rows[i].Data.(*dty.DposVrfM)
if !isRecordExist(vrfM, vrfs) {
if !isVrfMRecordExist(vrfM, vrfs) {
vrf := getVrfInfoFromVrfM(vrfM)
vrfs = append(vrfs, vrf)
}
......@@ -517,6 +525,34 @@ func (action *Action) newCandicatorInfo(regist *dty.DposCandidatorRegist) *dty.C
return candInfo
}
//readTopNCandicators 根据版本信息查询特定高度区间的TOPN候选节点信息
func (action *Action) readTopNCandicators(version int64) (*dty.TopNCandidators, error) {
strVersion := fmt.Sprintf("%018d", version)
data, err := action.db.Get(TopNKey(strVersion))
if err != nil {
logger.Error("readTopNCandicators have err:", "err", err.Error())
return nil, err
}
var cands dty.TopNCandidators
//decode
err = types.Decode(data, &cands)
if err != nil {
logger.Error("decode TopNCandidators have err:", err.Error())
return nil, err
}
return &cands, nil
}
func (action *Action) saveTopNCandicators(topCands *dty.TopNCandidators) (kvset []*types.KeyValue) {
value := types.Encode(topCands)
strVersion := fmt.Sprintf("%018d", topCands.Version)
err := action.db.Set(TopNKey(strVersion), value)
if err != nil {
logger.Error("saveCandicator have err:", err.Error())
}
kvset = append(kvset, &types.KeyValue{Key: TopNKey(strVersion), Value: value})
return kvset
}
//queryCBInfoByCycle 根据cycle查询stopHeight及stopHash等CBInfo信息,用于VRF计算
func queryCBInfoByCycle(kvdb db.KVDB, req *dty.DposCBQuery) (types.Message, error) {
......@@ -525,6 +561,7 @@ func queryCBInfoByCycle(kvdb db.KVDB, req *dty.DposCBQuery) (types.Message, erro
rows, err := query.ListIndex("cycle", []byte(fmt.Sprintf("%018d", req.Cycle)), nil, 1, 0)
if err != nil {
logger.Error("queryCBInfoByCycle have err", "cycle", req.Cycle, "err", err.Error())
return nil, err
}
......@@ -536,6 +573,8 @@ func queryCBInfoByCycle(kvdb db.KVDB, req *dty.DposCBQuery) (types.Message, erro
Pubkey: strings.ToUpper(hex.EncodeToString(cbInfo.Pubkey)),
Signature: hex.EncodeToString(cbInfo.StopHash),
}
logger.Info("queryCBInfoByCycle ok", "cycle", req.Cycle, "info", info.String())
return &dty.DposCBReply{CbInfo: info}, nil
}
......@@ -546,6 +585,7 @@ func queryCBInfoByHeight(kvdb db.KVDB, req *dty.DposCBQuery) (types.Message, err
rows, err := query.ListIndex("height", []byte(fmt.Sprintf("%018d", req.StopHeight)), nil, 1, 0)
if err != nil {
logger.Error("queryCBInfoByHeight have err", "height", req.StopHeight, "err", err.Error())
return nil, err
}
......@@ -557,6 +597,8 @@ func queryCBInfoByHeight(kvdb db.KVDB, req *dty.DposCBQuery) (types.Message, err
Pubkey: strings.ToUpper(hex.EncodeToString(cbInfo.Pubkey)),
Signature: hex.EncodeToString(cbInfo.StopHash),
}
logger.Info("queryCBInfoByHeight ok", "height", req.StopHeight, "info", info.String())
return &dty.DposCBReply{CbInfo: info}, nil
}
......@@ -567,10 +609,14 @@ func queryCBInfoByHash(kvdb db.KVDB, req *dty.DposCBQuery) (types.Message, error
hash, err := hex.DecodeString(req.StopHash)
if err != nil {
logger.Error("queryCBInfoByHash failed for decoding hash failed", "hash", req.StopHash, "err", err.Error())
return nil, err
}
rows, err := query.ListIndex("hash", hash, nil, 1, 0)
if err != nil {
logger.Error("queryCBInfoByHash have err", "hash", req.StopHash, "err", err.Error())
return nil, err
}
......@@ -582,9 +628,34 @@ func queryCBInfoByHash(kvdb db.KVDB, req *dty.DposCBQuery) (types.Message, error
Pubkey: strings.ToUpper(hex.EncodeToString(cbInfo.Pubkey)),
Signature: hex.EncodeToString(cbInfo.StopHash),
}
logger.Info("queryCBInfoByHash ok", "hash", req.StopHash, "info", info.String())
return &dty.DposCBReply{CbInfo: info}, nil
}
//queryTopNByVersion 根据version查询具体周期使用的TopN超级节点信息
func queryTopNByVersion(db dbm.KV, req *dty.TopNCandidatorsQuery) (types.Message, error) {
strVersion := fmt.Sprintf("%018d", req.Version)
data, err := db.Get(TopNKey(strVersion))
if err != nil || data == nil{
logger.Error("queryTopNByVersion have err:", "err", err.Error())
return nil, err
}
var cands dty.TopNCandidators
//decode
err = types.Decode(data, &cands)
if err != nil {
logger.Error("decode TopNCandidators have err:", err.Error())
return nil, err
}
reply := &dty.TopNCandidatorsReply{
TopN: &cands,
}
return reply, nil
}
//Regist 注册候选节点
func (action *Action) Regist(regist *dty.DposCandidatorRegist) (*types.Receipt, error) {
var logs []*types.ReceiptLog
......@@ -1142,3 +1213,108 @@ func (action *Action) RecordCB(cbInfo *dty.DposCBInfo) (*types.Receipt, error) {
return &types.Receipt{Ty: types.ExecOk, KV: kv, Logs: logs}, nil
}
//RegistTopN 注册TopN节点
func (action *Action) RegistTopN(regist *dty.TopNCandidatorRegist) (*types.Receipt, error) {
var logs []*types.ReceiptLog
var kv []*types.KeyValue
err := regist.Cand.Verify()
if err != nil {
logger.Error("RegistTopN failed for signature verify failed.", "addr", action.fromaddr, "execaddr", action.execaddr)
return nil, types.ErrInvalidParam
}
currentVersion, left := calcTopNVersion(action.mainHeight)
topNVersion, _ := calcTopNVersion(regist.Cand.Height)
if currentVersion != topNVersion {
logger.Error("RegistTopN failed for wrong version.", "addr", action.fromaddr, "execaddr", action.execaddr,
"regist height", regist.Cand.Height, "regist version", topNVersion, "current height", action.mainHeight, "current version", currentVersion)
return nil, types.ErrInvalidParam
}
if left >= registTopNHeightLimit {
logger.Error("RegistTopN failed for height limit.", "addr", action.fromaddr, "execaddr", action.execaddr,
"current height", action.mainHeight, "registTopNHeightLimit", registTopNHeightLimit, "height in new circle", left)
return nil, types.ErrInvalidParam
}
version := topNVersion - 1
for version >= 0 {
lastTopN, err := action.readTopNCandicators(version)
if err != nil {
logger.Error("read old TopN failed.", "addr", action.fromaddr, "execaddr", action.execaddr, "version", version)
if version == 0 {
//如果从没有注册过,认为是创世阶段,可信环境,只有可信的节点来注册,可以不做过多的判断。
break
} else {
version--
continue
}
}
if lastTopN.Status != dty.TopNCandidatorsVoteMajorOK {
logger.Error("Not legal topN exist.", "addr", action.fromaddr, "execaddr", action.execaddr, "version", version)
if version > 0 {
version--
continue
} else {
break
}
}
isLegalVoter := false
for i := 0; i < len(lastTopN.FinalCands); i++{
if bytes.Equal(regist.Cand.SignerPubkey, lastTopN.FinalCands[i].Pubkey) {
isLegalVoter = true
}
}
if !isLegalVoter {
logger.Error("RegistTopN failed for the voter is not legal topN.", "addr", action.fromaddr, "execaddr", action.execaddr, "voter pubkey", hex.EncodeToString(regist.Cand.SignerPubkey))
return nil, dty.ErrNotLegalTopN
}
break
}
topNCands, err := action.readTopNCandicators(topNVersion)
if topNCands == nil {
topNCands = &dty.TopNCandidators {
Version: topNVersion,
Status: dty.TopNCandidatorsVoteInit,
}
topNCands.CandsVotes = append(topNCands.CandsVotes, regist.Cand)
} else {
for i := 0; i < len(topNCands.CandsVotes); i++ {
if bytes.Equal(topNCands.CandsVotes[i].SignerPubkey, regist.Cand.SignerPubkey) {
logger.Error("RegistTopN failed for vote exist.", "addr", action.fromaddr, "execaddr", action.execaddr, "pubkey", hex.EncodeToString(regist.Cand.SignerPubkey))
return nil, types.ErrInvalidParam
}
}
topNCands.CandsVotes = append(topNCands.CandsVotes, regist.Cand)
}
topNCands.CheckVoteStauts(dposDelegateNum)
logger.Info("RegistTopN add one vote", "addr", action.fromaddr, "execaddr", action.execaddr, "version", topNVersion, "voter pubkey", hex.EncodeToString(regist.Cand.SignerPubkey))
log := &types.ReceiptLog{}
r := &dty.ReceiptTopN{}
log.Ty = dty.TyLogTopNCandidatorRegist
r.Index = action.getIndex()
r.Time = action.blocktime
r.Height = action.mainHeight
r.Version = topNVersion
r.Status = dty.TopNCandidatorStatusRegist
r.Pubkey = regist.Cand.SignerPubkey
r.TopN = regist.Cand
log.Log = types.Encode(r)
logs = append(logs, log)
kv = append(kv, action.saveTopNCandicators(topNCands)...)
return &types.Receipt{Ty: types.ExecOk, KV: kv, Logs: logs}, nil
}
\ No newline at end of file
......@@ -56,3 +56,9 @@ func (d *DPos) Exec_RecordCB(payload *dty.DposCBInfo, tx *types.Transaction, ind
action := NewAction(d, tx, index)
return action.RecordCB(payload)
}
//Exec_RegistTopN DPos执行器注册某一cycle中的TOPN信息
func (d *DPos) Exec_RegistTopN(payload *dty.TopNCandidatorRegist, tx *types.Transaction, index int) (*types.Receipt, error) {
action := NewAction(d, tx, index)
return action.RegistTopN(payload)
}
......@@ -212,6 +212,9 @@ func (d *DPos) execDelLocal(receipt *types.ReceiptData) (*types.LocalDBSet, erro
return nil, err
}
dbSet.KV = append(dbSet.KV, kv...)
case dty.TyLogTopNCandidatorRegist:
//do nothing now
}
}
......@@ -256,4 +259,9 @@ func (d *DPos) ExecDelLocal_VrfRPRegist(payload *dty.DposVrfRPRegist, tx *types.
//ExecDelLocal_RecordCB method
func (d *DPos) ExecDelLocal_RecordCB(payload *dty.DposCBInfo, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return d.execDelLocal(receiptData)
}
//ExecDelLocal_RegistTopN method
func (d *DPos) ExecDelLocal_RegistTopN(payload *dty.TopNCandidatorRegist, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return d.execDelLocal(receiptData)
}
\ No newline at end of file
......@@ -209,6 +209,8 @@ func (d *DPos) execLocal(receipt *types.ReceiptData) (*types.LocalDBSet, error)
return nil, err
}
dbSet.KV = append(dbSet.KV, kvs...)
} else if item.Ty == dty.TyLogTopNCandidatorRegist {
//do nothing
}
}
......@@ -253,4 +255,9 @@ func (d *DPos) ExecLocal_RegistVrfRP(payload *dty.DposVrfRPRegist, tx *types.Tra
//ExecLocal_RecordCB method
func (d *DPos) ExecLocal_RecordCB(payload *dty.DposCBInfo, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return d.execLocal(receiptData)
}
//ExecLocal_RegistTopN method
func (d *DPos) ExecLocal_RegistTopN(payload *dty.TopNCandidatorRegist, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return d.execLocal(receiptData)
}
\ No newline at end of file
......@@ -58,3 +58,8 @@ func (d *DPos) Query_QueryCBInfoByHeight(in *dty.DposCBQuery) (types.Message, er
func (d *DPos) Query_QueryCBInfoByHash(in *dty.DposCBQuery) (types.Message, error) {
return queryCBInfoByHash(d.GetLocalDB(), in)
}
//Query_QueryTopNByVersion method
func (d *DPos) Query_QueryTopNByVersion(in *dty.TopNCandidatorsQuery) (types.Message, error) {
return queryTopNByVersion(d.GetStateDB(), in)
}
\ No newline at end of file
......@@ -76,8 +76,11 @@ message DposVoteAction {
DposVrfRPRegist registVrfRP = 9;
DposVrfQuery vrfQuery = 10;
DposCBInfo recordCB = 11;
DposCBQuery cbQuery = 12;
TopNCandidatorRegist registTopN = 13;
TopNCandidatorsQuery topNQuery = 14;
}
int32 ty = 12;
int32 ty = 15;
}
message CandidatorQuery{
......@@ -261,4 +264,41 @@ message ReceiptCB {
int64 cycleMiddle = 8;
int64 cycleStop = 9;
DposCycleBoundaryInfo cbInfo = 10;
}
\ No newline at end of file
}
message TopNCandidator{
repeated Candidator cands = 1;
bytes hash = 2;
int64 height = 3;
bytes signerPubkey = 4;
bytes signature = 5;
}
message TopNCandidators{
repeated TopNCandidator candsVotes = 1;
int64 version = 2;
int64 status = 3;
repeated Candidator finalCands = 4;
}
message TopNCandidatorRegist{
TopNCandidator cand = 1;
}
message TopNCandidatorsQuery{
int64 version = 1;
}
message TopNCandidatorsReply{
TopNCandidators topN = 1;
}
message ReceiptTopN {
int64 Index = 1;
bytes pubkey = 2;
int64 status = 3;
int64 version = 4;
int64 height = 5;
int64 time = 6;
TopNCandidator topN = 10;
}
......@@ -14,6 +14,7 @@ const (
DposVoteActionRegistVrfM
DposVoteActionRegistVrfRP
DposVoteActionRecordCB
DPosVoteActionRegistTopNCandidator
CandidatorStatusRegist = iota + 1
CandidatorStatusVoted
......@@ -25,6 +26,8 @@ const (
VrfStatusRPRegist
CBStatusRecord = iota + 1
TopNCandidatorStatusRegist = iota + 1
)
//log ty
......@@ -37,6 +40,7 @@ const (
TyLogVrfMRegist = 1006
TyLogVrfRPRegist = 1007
TyLogCBInfoRecord = 1008
TyLogTopNCandidatorRegist = 1009
)
const (
......@@ -47,6 +51,10 @@ const (
VoteTypeVote int32 = 2
VoteTypeCancelVote int32 = 3
VoteTypeCancelAllVote int32 = 4
TopNCandidatorsVoteInit int64 = 0
TopNCandidatorsVoteMajorOK int64 = 1
TopNCandidatorsVoteMajorFail int64 = 2
)
//包的名字可以通过配置文件来配置
//建议用github的组织名称,或者用户名字开头, 再加上自己的插件的名字
......@@ -137,4 +145,7 @@ const (
//QueryCBInfoByHeight 根据stopHeight查询cycle boundary信息
QueryLatestCBInfoByHeight = 4
//FuncNameQueryTopNByVersion func name
FuncNameQueryTopNByVersion = "QueryTopNByVersion"
)
This diff is collapsed.
......@@ -24,5 +24,6 @@ var (
ErrSaveTable = errors.New("ErrSaveTable")
ErrCBRecordExist = errors.New("ErrCBRecordExist")
ErrCycleNotAllowed = errors.New("ErrCycleNotAllowed")
ErrVersionTopNNotExist = errors.New("ErrVersionTopNNotExist")
ErrNotLegalTopN = errors.New("ErrNotLegalTopN")
)
......@@ -7,6 +7,7 @@ import (
"errors"
"fmt"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
"github.com/33cn/chain33/common/crypto"
)
// CanonicalOnceCBInfo ...
......@@ -32,12 +33,7 @@ func CanonicalCBInfo(cb *DposCBInfo) CanonicalOnceCBInfo {
func (cb *DposCBInfo)Verify() error {
buf := new(bytes.Buffer)
canonical := CanonicalOnceCBInfo{
Cycle: cb.Cycle,
StopHeight: cb.StopHeight,
StopHash: cb.StopHash,
Pubkey: cb.Pubkey,
}
canonical := CanonicalCBInfo(cb)
byteCB, err := json.Marshal(&canonical)
if err != nil {
......@@ -73,4 +69,153 @@ func (cb *DposCBInfo)Verify() error {
}
return nil
}
type OnceCandidator struct {
Pubkey []byte `json:"pubkey,omitempty"`
Address string `json:"address,omitempty"`
Ip string `json:"ip,omitempty"`
}
// CanonicalOnceTopNCandidator ...
type CanonicalOnceTopNCandidator struct {
Cands []*OnceCandidator `json:"cands,omitempty"`
Hash []byte `json:"hash,omitempty"`
Height int64 `json:"height,omitempty"`
SignerPubkey []byte `json:"signerPubkey,omitempty"`
Signature []byte `json:"signature,omitempty"`
}
func (topN *CanonicalOnceTopNCandidator) onlyCopyCands() CanonicalOnceTopNCandidator{
obj := CanonicalOnceTopNCandidator{}
for i := 0; i < len(topN.Cands); i++ {
cand := &OnceCandidator{
Pubkey: topN.Cands[i].Pubkey,
Address: topN.Cands[i].Address,
Ip: topN.Cands[i].Ip,
}
obj.Cands = append(obj.Cands, cand)
}
return obj
}
func (topN *CanonicalOnceTopNCandidator) ID() []byte{
obj := topN.onlyCopyCands()
encode, err := json.Marshal(&obj)
if err != nil {
return nil
}
return crypto.Ripemd160(encode)
}
// CanonicalCBInfo ...
func CanonicalTopNCandidator(topN *TopNCandidator) CanonicalOnceTopNCandidator {
onceTopNCandidator := CanonicalOnceTopNCandidator{
Height: topN.Height,
SignerPubkey: topN.SignerPubkey,
}
for i := 0; i < len(topN.Cands); i++ {
cand := &OnceCandidator{
Pubkey: topN.Cands[i].Pubkey,
Address: topN.Cands[i].Address,
Ip: topN.Cands[i].Ip,
}
onceTopNCandidator.Cands = append(onceTopNCandidator.Cands, cand)
}
return onceTopNCandidator
}
func (topN *TopNCandidator)copyWithoutSig() *TopNCandidator {
cpy := &TopNCandidator{
Hash: topN.Hash,
Height: topN.Height,
SignerPubkey: topN.SignerPubkey,
}
cpy.Cands = make([]*Candidator, len(topN.Cands))
for i := 0; i < len(topN.Cands); i++ {
cpy.Cands[i] = topN.Cands[i]
}
return cpy
}
// Verify ...
func (topN *TopNCandidator)Verify() error {
buf := new(bytes.Buffer)
cpy := topN.copyWithoutSig()
byteCB, err := json.Marshal(cpy)
if err != nil {
return errors.New(fmt.Sprintf("Error marshal TopNCandidator: %v", err))
}
_, err = buf.Write(byteCB)
if err != nil {
return errors.New(fmt.Sprintf("Error write buffer: %v", err))
}
pubkey, err := ttypes.ConsensusCrypto.PubKeyFromBytes(topN.SignerPubkey)
if err != nil {
return errors.New(fmt.Sprintf("Error PubKeyFromBytes: %v", err))
}
sig, err := ttypes.ConsensusCrypto.SignatureFromBytes(topN.Signature)
if err != nil {
return errors.New(fmt.Sprintf("Error SignatureFromBytes: %v", err))
}
if !pubkey.VerifyBytes(buf.Bytes(), sig) {
return errors.New(fmt.Sprintf("Error VerifyBytes: %v", err))
}
return nil
}
func (cand *Candidator)Copy() *Candidator {
cpy := &Candidator{
Address: cand.Address,
Ip: cand.Ip,
Votes: cand.Votes,
Status: cand.Status,
}
cpy.Pubkey = make([]byte, len(cand.Pubkey))
copy(cpy.Pubkey, cand.Pubkey)
return cpy
}
func (topNs *TopNCandidators)CheckVoteStauts(delegateNum int64) {
if topNs.Status == TopNCandidatorsVoteMajorOK || topNs.Status == TopNCandidatorsVoteMajorFail {
return
}
voteMap := make(map[string] int64)
for i := 0; i < len(topNs.CandsVotes); i++ {
key := hex.EncodeToString(topNs.CandsVotes[i].Hash)
if _, ok := voteMap[key]; ok {
voteMap[key]++
if voteMap[key] >= (delegateNum * 2 / 3) {
topNs.Status = TopNCandidatorsVoteMajorOK
for j := 0; j < len(topNs.CandsVotes[i].Cands); j++ {
topNs.FinalCands = append(topNs.FinalCands, topNs.CandsVotes[i].Cands[j].Copy())
}
return
}
} else {
voteMap[key] = 1
}
}
var maxVotes int64 = 0
var sumVotes int64 = 0
for _, v := range voteMap {
if v > maxVotes {
maxVotes = v
}
sumVotes += v
}
if maxVotes + (delegateNum - sumVotes) < (delegateNum * 2 / 3) {
topNs.Status = TopNCandidatorsVoteMajorFail
}
}
\ No newline at end of file
......@@ -281,7 +281,7 @@ func (tx *DposCBRow) Get(key string) ([]byte, error) {
} else if key == "height" {
return []byte(fmt.Sprintf("%018d", tx.StopHeight)), nil
} else if key == "hash" {
return []byte(fmt.Sprintf("%X", tx.StopHash)), nil
return tx.StopHash, nil
}
return nil, types.ErrNotFound
......
......@@ -45,6 +45,7 @@ func (t *DPosType) GetTypeMap() map[string]int32 {
"RegistVrfM": DposVoteActionRegistVrfM,
"RegistVrfRP": DposVoteActionRegistVrfRP,
"RecordCB": DposVoteActionRecordCB,
"RegistTopN": DPosVoteActionRegistTopNCandidator,
}
}
......@@ -59,5 +60,6 @@ func (t *DPosType) GetLogMap() map[int64]*types.LogInfo {
TyLogVrfMRegist: {Ty: reflect.TypeOf(ReceiptVrf{}), Name: "TyLogVrfMRegist"},
TyLogVrfRPRegist: {Ty: reflect.TypeOf(ReceiptVrf{}), Name: "TyLogVrfRPRegist"},
TyLogCBInfoRecord: {Ty: reflect.TypeOf(ReceiptCB{}), Name: "TyLogCBInfoRecord"},
TyLogTopNCandidatorRegist: {Ty: reflect.TypeOf(ReceiptTopN{}), Name: "TyLogTopNCandidatorRegist"},
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment