Commit 60692fa9 authored by 张振华's avatar 张振华

update

parent d67f5b9f
......@@ -104,6 +104,7 @@ delegateNum=3
blockInterval=3
continueBlockNum=12
isValidator=false
rpcAddr="http://localhost:9801"
[store]
name="kvdb"
......
......@@ -245,9 +245,7 @@ func (cs *ConsensusState) receiveRoutine() {
// go to the next step
cs.handleTimeout(ti)
case <-cs.Quit:
// NOTE: the internalMsgQueue may have signed messages from our
// priv_val that haven't hit the WAL, but its ok because
// priv_val tracks LastSig
dposlog.Info("ConsensusState recv quit signal.")
return
}
}
......
......@@ -5,6 +5,10 @@
package dpos
import (
"bytes"
"fmt"
"github.com/33cn/chain33/common/address"
"os"
"time"
"github.com/33cn/chain33/common/crypto"
......@@ -16,6 +20,10 @@ import (
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
ttypes "github.com/33cn/plugin/plugin/consensus/dpos/types"
jsonrpc "github.com/33cn/chain33/rpc/jsonclient"
rpctypes "github.com/33cn/chain33/rpc/types"
dty "github.com/33cn/plugin/plugin/dapp/dposvote/types"
)
const dposVersion = "0.1.0"
......@@ -39,6 +47,7 @@ var (
dposPeriod = dposBlockInterval * dposContinueBlockNum
zeroHash [32]byte
dposPort string = "36656"
rpcAddr string = "http://0.0.0.0:8801"
)
func init() {
......@@ -76,6 +85,7 @@ type subConfig struct {
ContinueBlockNum int64 `json:"continueBlockNum"`
IsValidator bool `json:"isValidator"`
Port string `json:"port"`
RpcAddr string `json:"rpcAddr"`
}
func (client *Client) applyConfig(sub []byte) {
......@@ -133,6 +143,10 @@ func (client *Client) applyConfig(sub []byte) {
if subcfg.IsValidator {
isValidator = true
}
if subcfg.RpcAddr != "" {
rpcAddr = subcfg.RpcAddr
}
}
// New ...
......@@ -260,9 +274,33 @@ OuterLoop:
return
}
valMgr = valMgrTmp.Copy()
//todo 对于动态选举或者其他原因导致代理节点发生变化等情况,在后续增加处理 zzh
dposlog.Debug("Load Validator Manager finish", "state", valMgr)
block, err := client.RequestLastBlock()
if err != nil {
panic(err)
}
if block != nil {
time.Sleep(time.Second * 5)
cands, err := client.QueryCandidators()
if err != nil {
dposlog.Info("QueryCandidators failed", "err", err)
} else {
if len(cands) != int(dposDelegateNum) {
dposlog.Info("QueryCandidators success but no enough candidators", "dposDelegateNum", dposDelegateNum, "candidatorNum", len(cands))
} else {
validators := make([]*ttypes.Validator, dposDelegateNum)
for i, val := range cands {
// Make validator
validators[i] = &ttypes.Validator{
Address: address.PubKeyToAddress(val.Pubkey).Hash160[:],
PubKey: val.Pubkey,
}
}
valMgr.Validators = ttypes.NewValidatorSet(validators)
dposlog.Info("QueryCandidators success and update validator set", "old validators", valMgrTmp.Validators.String(), "new validators", valMgr.Validators.String())
}
}
}
dposlog.Info("StartConsensus", "validators", valMgr.Validators)
// Log whether this node is a delegator or an observer
......@@ -292,6 +330,7 @@ OuterLoop:
node.Start()
}
go client.MonitorCandidators()
//go client.CreateBlock()
}
......@@ -397,3 +436,84 @@ func (client *Client) ValidatorIndex() int {
return -1
}
func (client *Client)QueryCandidators()([]*dty.Candidator, error) {
var params rpctypes.Query4Jrpc
params.Execer = dty.DPosX
req := &dty.CandidatorQuery{
TopN: int32(dposDelegateNum),
}
params.FuncName = dty.FuncNameQueryCandidatorByTopN
params.Payload = types.MustPBToJSON(req)
var res dty.CandidatorReply
ctx := jsonrpc.NewRPCCtx(rpcAddr, "Chain33.Query", params, &res)
result, err := ctx.RunResult()
if err != nil {
fmt.Fprintln(os.Stderr, err)
return nil, err
}
res = *result.(*dty.CandidatorReply)
return res.GetCandidators(), nil
}
func (client *Client)MonitorCandidators() {
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <- ticker.C:
dposlog.Info("Monitor Candidators")
block, err := client.RequestLastBlock()
if err != nil {
panic(err)
}
if block != nil {
cands, err := client.QueryCandidators()
if err != nil {
dposlog.Info("Query Candidators failed", "err", err)
} else {
if len(cands) != int(dposDelegateNum) {
dposlog.Info("QueryCandidators success but no enough candidators", "dposDelegateNum", dposDelegateNum, "candidatorNum", len(cands))
} else {
validators := make([]*ttypes.Validator, dposDelegateNum)
for i, val := range cands {
// Make validator
validators[i] = &ttypes.Validator{
Address: address.PubKeyToAddress(val.Pubkey).Hash160[:],
PubKey: val.Pubkey,
}
}
validatorSet := ttypes.NewValidatorSet(validators)
dposlog.Info("QueryCandidators success and update validator set")
if !client.isValidatorSetSame(validatorSet, client.csState.validatorMgr.Validators){
dposlog.Info("ValidatorSet from contract is changed, so stop the node and restart the consensus.")
client.node.Stop()
time.Sleep(time.Second * 3)
go client.StartConsensus()
} else {
dposlog.Info("ValidatorSet from contract is the same,no change.")
}
}
}
}
}
}
}
func (client *Client)isValidatorSetSame(v1, v2 *ttypes.ValidatorSet) bool {
if v1 == nil || v2 == nil || len(v1.Validators) != len(v2.Validators){
return false
}
for i := 0; i < len(v1.Validators); i++ {
if !bytes.Equal(v1.Validators[i].PubKey, v2.Validators[i].PubKey){
return false
}
}
return true
}
\ No newline at end of file
......@@ -292,6 +292,10 @@ func (node *Node) listenRoutine() {
// StartConsensusRoutine if peers reached the threshold start consensus routine
func (node *Node) StartConsensusRoutine() {
for {
//zzh
if !node.IsRunning(){
break
}
//TODO:the peer count need be optimized
if node.peerSet.Size() >= 0 {
node.state.Start()
......@@ -310,6 +314,11 @@ func (node *Node) BroadcastRoutine() {
return
}
node.Broadcast(msg)
//zzh
if !node.IsRunning() {
break
}
}
}
......
......@@ -26,7 +26,12 @@ var (
WaitNotifyStateType = 4
// StateTypeMapping 为状态的整型值和字符串值的对应关系
StateTypeMapping = map[int]string{InitStateType: "InitState", VotingStateType: "VotingState", VotedStateType: "VotedState", WaitNotifyStateType: "WaitNotifyState"}
StateTypeMapping = map[int]string{
InitStateType: "InitState",
VotingStateType: "VotingState",
VotedStateType: "VotedState",
WaitNotifyStateType: "WaitNotifyState",
}
)
// InitStateObj is the InitState obj
......
......@@ -134,14 +134,17 @@ func addVoteFlags(cmd *cobra.Command) {
cmd.MarkFlagRequired("pubkey")
cmd.Flags().Int64P("votes", "v", 0, "votes")
cmd.MarkFlagRequired("votes")
cmd.Flags().StringP("addr", "a", "", "address of voter")
cmd.MarkFlagRequired("addr")
}
func vote(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
pubkey, _ := cmd.Flags().GetString("pubkey")
votes, _ := cmd.Flags().GetInt64("votes")
addr, _ := cmd.Flags().GetString("addr")
payload := fmt.Sprintf("{\"pubkey\":\"%s\", \"votes\":\"%d\"}", pubkey, votes)
payload := fmt.Sprintf("{\"pubkey\":\"%s\", \"votes\":\"%d\", \"fromAddr\":\"%s\"}", pubkey, votes, addr)
params := &rpctypes.CreateTxIn{
Execer: types.ExecName(dty.DPosX),
ActionName: dty.CreateVoteTx,
......
......@@ -172,7 +172,7 @@ func queryCands(kvdb db.KVDB, req *dty.CandidatorQuery) (types.Message, error) {
bPubkey, _ := hex.DecodeString(req.Pubkeys[i])
rows, err := query.ListIndex("pubkey", bPubkey, nil, 1, 0)
if err != nil {
return nil, err
continue
}
candInfo := rows[0].Data.(*dty.CandidatorInfo)
......@@ -194,35 +194,9 @@ func queryTopNCands(kvdb db.KVDB, req *dty.CandidatorQuery) (types.Message, erro
candTable := dty.NewDposCandidatorTable(kvdb)
query := candTable.GetQuery(kvdb)
rows, err := query.ListIndex("status", []byte(fmt.Sprintf("%2d", dty.CandidatorStatusVoted)), nil, 0, 0)
if err != nil {
return nil, err
}
number := int32(0)
for index := 0; index < len(rows); index++ {
candInfo := rows[index].Data.(*dty.CandidatorInfo)
cand := &dty.Candidator{
Pubkey: candInfo.Pubkey,
Address: candInfo.Address,
Ip: candInfo.Ip,
Votes: candInfo.Votes,
Status: candInfo.Status,
}
cands = append(cands, cand)
number ++
}
sort.Slice(cands, func(i, j int) bool {
return cands[i].Votes > cands[j].Votes
})
if number < req.TopN {
rows, err = query.ListIndex("status", []byte(fmt.Sprintf("%2d", dty.CandidatorStatusRegist)), nil, req.TopN - number, 0)
if err != nil {
return nil, err
}
rows, err := query.ListIndex("status", []byte(fmt.Sprintf("%2d", dty.CandidatorStatusVoted)), nil, 0, 0)
if err == nil {
for index := 0; index < len(rows); index++ {
candInfo := rows[index].Data.(*dty.CandidatorInfo)
cand := &dty.Candidator{
......@@ -234,10 +208,52 @@ func queryTopNCands(kvdb db.KVDB, req *dty.CandidatorQuery) (types.Message, erro
}
cands = append(cands, cand)
number ++
if number == req.TopN {
break
}
sort.Slice(cands, func(i, j int) bool {
return cands[i].Votes > cands[j].Votes
})
}
if number < req.TopN {
rows, err = query.ListIndex("status", []byte(fmt.Sprintf("%2d", dty.CandidatorStatusRegist)), nil, req.TopN - number, 0)
if err == nil {
for index := 0; index < len(rows); index++ {
candInfo := rows[index].Data.(*dty.CandidatorInfo)
cand := &dty.Candidator{
Pubkey: candInfo.Pubkey,
Address: candInfo.Address,
Ip: candInfo.Ip,
Votes: candInfo.Votes,
Status: candInfo.Status,
}
cands = append(cands, cand)
number ++
if number == req.TopN {
break
}
}
}
rows, err = query.ListIndex("status", []byte(fmt.Sprintf("%2d", dty.CandidatorStatusReRegist)), nil, req.TopN - number, 0)
if err == nil {
for index := 0; index < len(rows); index++ {
candInfo := rows[index].Data.(*dty.CandidatorInfo)
cand := &dty.Candidator{
Pubkey: candInfo.Pubkey,
Address: candInfo.Address,
Ip: candInfo.Ip,
Votes: candInfo.Votes,
Status: candInfo.Status,
}
cands = append(cands, cand)
number ++
if number == req.TopN {
break
}
}
}
} else {
cands = cands[0:req.TopN]
}
......@@ -319,6 +335,8 @@ func (action *Action) getReceiptLog(candInfo *dty.CandidatorInfo, statusChange b
log.Ty = dty.TyLogCandicatorVoted
} else if candInfo.Status == dty.CandidatorStatusCancelRegist {
log.Ty = dty.TyLogCandicatorCancelRegist
} else if candInfo.Status == dty.CandidatorStatusReRegist{
log.Ty = dty.TyLogCandicatorReRegist
}
r.Index = action.getIndex()
......@@ -333,6 +351,9 @@ func (action *Action) getReceiptLog(candInfo *dty.CandidatorInfo, statusChange b
if voted {
r.Votes = vote.Votes
r.FromAddr = vote.FromAddr
if r.Votes < 0 {
log.Ty = dty.TyLogCandicatorCancelVoted
}
}
r.CandInfo = candInfo
log.Log = types.Encode(r)
......@@ -387,18 +408,21 @@ func (action *Action) Regist(regist *dty.DposCandidatorRegist) (*types.Receipt,
return nil, dty.ErrCandidatorExist
}
if !action.CheckExecAccountBalance(action.fromaddr, dty.RegistFrozenCoins, 0) {
logger.Error("Regist failed", "addr", action.fromaddr, "execaddr", action.execaddr, "err", types.ErrNoBalance)
return nil, types.ErrNoBalance
}
acc := action.coinsAccount.LoadExecAccount(action.fromaddr, action.execaddr)
if acc.GetFrozen() < dty.RegistFrozenCoins {
if acc.GetBalance() + acc.GetFrozen() < dty.RegistFrozenCoins {
logger.Error("Regist failed", "addr", action.fromaddr, "execaddr", action.execaddr, "Balance", acc.GetBalance(), "Frozen", acc.GetFrozen(),"err", types.ErrNoBalance)
return nil, types.ErrNoBalance
}
receipt, err := action.coinsAccount.ExecFrozen(action.fromaddr, action.execaddr, dty.RegistFrozenCoins)
if err != nil {
logger.Error("ExecFrozen failed", "addr", action.fromaddr, "execaddr", action.execaddr, "amount", dty.RegistFrozenCoins, "err", err.Error())
return nil, err
receipt, err := action.coinsAccount.ExecFrozen(action.fromaddr, action.execaddr, dty.RegistFrozenCoins)
if err != nil {
logger.Error("ExecFrozen failed", "addr", action.fromaddr, "execaddr", action.execaddr, "amount", dty.RegistFrozenCoins, "err", err.Error())
return nil, err
}
logs = append(logs, receipt.Logs...)
kv = append(kv, receipt.KV...)
}
logs = append(logs, receipt.Logs...)
kv = append(kv, receipt.KV...)
logger.Info("Regist", "addr", action.fromaddr, "execaddr", action.execaddr, "new candicator", regist.String())
......@@ -434,7 +458,7 @@ func (action *Action) ReRegist(regist *dty.DposCandidatorRegist) (*types.Receipt
if err != nil || candInfo == nil {
logger.Info("ReRegist", "addr", action.fromaddr, "execaddr", action.execaddr, "candicator is not exist",
candInfo.String())
return nil, dty.ErrCandidatorExist
return nil, dty.ErrCandidatorNotExist
}
if candInfo.Status != dty.CandidatorStatusCancelRegist {
......@@ -443,24 +467,27 @@ func (action *Action) ReRegist(regist *dty.DposCandidatorRegist) (*types.Receipt
return nil, dty.ErrCandidatorInvalidStatus
}
if !action.CheckExecAccountBalance(action.fromaddr, dty.RegistFrozenCoins, 0) {
logger.Error("Regist failed", "addr", action.fromaddr, "execaddr", action.execaddr, "err", types.ErrNoBalance)
return nil, types.ErrNoBalance
}
acc := action.coinsAccount.LoadExecAccount(action.fromaddr, action.execaddr)
if acc.GetFrozen() < dty.RegistFrozenCoins {
if acc.GetBalance() + acc.GetFrozen() < dty.RegistFrozenCoins {
logger.Error("Regist failed", "addr", action.fromaddr, "execaddr", action.execaddr, "Balance", acc.GetBalance(), "Frozen", acc.GetFrozen(),"err", types.ErrNoBalance)
return nil, types.ErrNoBalance
}
receipt, err := action.coinsAccount.ExecFrozen(action.fromaddr, action.execaddr, dty.RegistFrozenCoins)
if err != nil {
logger.Error("ExecFrozen failed", "addr", action.fromaddr, "execaddr", action.execaddr, "amount", dty.RegistFrozenCoins, "err", err.Error())
return nil, err
receipt, err := action.coinsAccount.ExecFrozen(action.fromaddr, action.execaddr, dty.RegistFrozenCoins)
if err != nil {
logger.Error("ExecFrozen failed", "addr", action.fromaddr, "execaddr", action.execaddr, "amount", dty.RegistFrozenCoins, "err", err.Error())
return nil, err
}
logs = append(logs, receipt.Logs...)
kv = append(kv, receipt.KV...)
}
logs = append(logs, receipt.Logs...)
kv = append(kv, receipt.KV...)
logger.Info("Regist", "addr", action.fromaddr, "execaddr", action.execaddr, "new candicator",
regist.String())
logger.Info("Regist", "addr", action.fromaddr, "execaddr", action.execaddr, "new candicator", regist.String())
candInfo = action.newCandicatorInfo(regist)
candInfo.Status = dty.CandidatorStatusRegist
candInfo.Status = dty.CandidatorStatusReRegist
candInfo.PreStatus = dty.CandidatorStatusCancelRegist
candInfo.StartTime = action.blocktime
candInfo.StartHeight = action.mainHeight
candInfo.StartIndex = action.getIndex()
......@@ -523,6 +550,15 @@ func (action *Action) CancelRegist(req *dty.DposCandidatorCancelRegist) (*types.
kv = append(kv, receipt.KV...)
}
}
receipt, err := action.coinsAccount.ExecActive(action.fromaddr, action.execaddr, dty.RegistFrozenCoins)
if err != nil {
logger.Error("ExecActive failed", "addr", action.fromaddr, "execaddr", action.execaddr, "amount", dty.RegistFrozenCoins, "err", err.Error())
return nil, err
}
logs = append(logs, receipt.Logs...)
kv = append(kv, receipt.KV...)
candInfo.PreStatus = candInfo.Status
candInfo.Status = dty.CandidatorStatusCancelRegist
candInfo.PreIndex = candInfo.Index
......@@ -564,7 +600,7 @@ func (action *Action) Vote(vote *dty.DposVote) (*types.Receipt, error) {
candInfo.String())
statusChange := false
if candInfo.Status == dty.CandidatorStatusRegist {
if candInfo.Status == dty.CandidatorStatusRegist || candInfo.Status == dty.CandidatorStatusReRegist{
candInfo.PreStatus = candInfo.Status
candInfo.Status = dty.CandidatorStatusVoted
statusChange = true
......@@ -636,13 +672,13 @@ func (action *Action) CancelVote(vote *dty.DposCancelVote) (*types.Receipt, erro
enoughVotes := false
for _, voter := range candInfo.Voters {
if voter.FromAddr == action.fromaddr && bytes.Equal(voter.Pubkey, bPubkey){
if action.blocktime - voter.Time >= dty.VoteFrozenTime {
//if action.blocktime - voter.Time >= dty.VoteFrozenTime {
availVotes += voter.Votes
if availVotes >= votes {
enoughVotes = true
break
}
}
//}
}
}
if !enoughVotes {
......@@ -653,7 +689,7 @@ func (action *Action) CancelVote(vote *dty.DposCancelVote) (*types.Receipt, erro
for index, voter := range candInfo.Voters {
if voter.FromAddr == action.fromaddr && bytes.Equal(voter.Pubkey, bPubkey){
if action.blocktime - voter.Time >= 3 * 24 * 3600 {
//if action.blocktime - voter.Time >= dty.VoteFrozenTime {
if voter.Votes > votes {
voter.Votes -= votes
break
......@@ -664,7 +700,7 @@ func (action *Action) CancelVote(vote *dty.DposCancelVote) (*types.Receipt, erro
candInfo.Voters = append(candInfo.Voters[:index], candInfo.Voters[index+1:]...)
votes = votes - voter.Votes
}
}
//}
}
}
......
......@@ -84,6 +84,19 @@ func (d *DPos) updateCandVote(log *dty.ReceiptCandicator) (kvs []*types.KeyValue
}
kvs = append(kvs1, kvs2...)
} else if log.Status == dty.CandidatorStatusReRegist || log.Status == dty.CandidatorStatusCancelRegist{
candInfo := log.CandInfo
log.CandInfo = nil
err = canTable.Replace(candInfo)
if err != nil {
return nil, err
}
kvs, err = canTable.Save()
if err != nil {
return nil, err
}
}
return kvs, nil
......@@ -144,7 +157,7 @@ func (d *DPos) execLocal(receipt *types.ReceiptData) (*types.LocalDBSet, error)
}
for _, item := range receipt.Logs {
if item.Ty >= dty.CandidatorStatusRegist && item.Ty <= dty.CandidatorStatusCancelRegist {
if item.Ty >= dty.TyLogCandicatorRegist && item.Ty <= dty.TyLogCandicatorReRegist {
var candLog dty.ReceiptCandicator
err := types.Decode(item.Log, &candLog)
if err != nil {
......@@ -155,7 +168,7 @@ func (d *DPos) execLocal(receipt *types.ReceiptData) (*types.LocalDBSet, error)
return nil, err
}
dbSet.KV = append(dbSet.KV, kvs...)
} else if item.Ty >= dty.VrfStatusMRegist && item.Ty <= dty.VrfStatusRPRegist {
} else if item.Ty >= dty.TyLogVrfMRegist && item.Ty <= dty.TyLogVrfRPRegist {
var vrfLog dty.ReceiptVrf
err := types.Decode(item.Log, &vrfLog)
if err != nil {
......
......@@ -18,6 +18,7 @@ const (
CandidatorStatusVoted
CandidatorStatusCancelVoted
CandidatorStatusCancelRegist
CandidatorStatusReRegist
VrfStatusMRegist = iota + 1
VrfStatusRPRegist
......@@ -27,10 +28,11 @@ const (
const (
TyLogCandicatorRegist = 1001
TyLogCandicatorVoted = 1002
TyLogCandicatorCancelRegist = 1003
TyLogCandicatorReRegist = 1004
TyLogVrfMRegist = 1005
TyLogVrfRPRegist = 1006
TyLogCandicatorCancelVoted = 1003
TyLogCandicatorCancelRegist = 1004
TyLogCandicatorReRegist = 1005
TyLogVrfMRegist = 1006
TyLogVrfRPRegist = 1007
)
const (
......
......@@ -52,7 +52,9 @@ func (t *DPosType) GetLogMap() map[int64]*types.LogInfo {
return map[int64]*types.LogInfo{
TyLogCandicatorRegist: {Ty: reflect.TypeOf(ReceiptCandicator{}), Name: "TyLogCandicatorRegist"},
TyLogCandicatorVoted: {Ty: reflect.TypeOf(ReceiptCandicator{}), Name: "TyLogCandicatorVoted"},
TyLogCandicatorCancelVoted: {Ty: reflect.TypeOf(ReceiptCandicator{}), Name: "TyLogCandicatorCancelVoted"},
TyLogCandicatorCancelRegist: {Ty: reflect.TypeOf(ReceiptCandicator{}), Name: "TyLogCandicatorCancelRegist"},
TyLogCandicatorReRegist: {Ty: reflect.TypeOf(ReceiptCandicator{}), Name: "TyLogCandicatorReRegist"},
TyLogVrfMRegist: {Ty: reflect.TypeOf(ReceiptVrf{}), Name: "TyLogVrfMRegist"},
TyLogVrfRPRegist: {Ty: reflect.TypeOf(ReceiptVrf{}), Name: "TyLogVrfRPRegist"},
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment