Commit db26652c authored by caopingcp's avatar caopingcp Committed by vipwzw

add commands in valnode

parent def4afbb
...@@ -729,7 +729,8 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock) ...@@ -729,7 +729,8 @@ func (cs *ConsensusState) createProposalBlock() (block *ttypes.TendermintBlock)
// Mempool validated transactions // Mempool validated transactions
beg := time.Now() beg := time.Now()
pblock := cs.client.BuildBlock() pblock := cs.client.BuildBlock()
tendermintlog.Info(fmt.Sprintf("createProposalBlock BuildBlock. Current: %v/%v/%v", cs.Height, cs.Round, cs.Step), "txs-len", len(pblock.Txs), "cost", types.Since(beg)) tendermintlog.Info(fmt.Sprintf("createProposalBlock BuildBlock. Current: %v/%v/%v", cs.Height, cs.Round, cs.Step),
"txs-len", len(pblock.Txs), "cost", types.Since(beg))
if pblock.Height != cs.Height { if pblock.Height != cs.Height {
tendermintlog.Error("pblock.Height is not equal to cs.Height") tendermintlog.Error("pblock.Height is not equal to cs.Height")
...@@ -1097,6 +1098,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) { ...@@ -1097,6 +1098,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
valNodes, err := cs.client.QueryValidatorsByHeight(block.Header.Height) valNodes, err := cs.client.QueryValidatorsByHeight(block.Header.Height)
if err == nil && valNodes != nil { if err == nil && valNodes != nil {
if len(valNodes.Nodes) > 0 { if len(valNodes.Nodes) > 0 {
tendermintlog.Info("finalizeCommit validators of statecopy update", "update-valnodes", valNodes)
prevValSet := stateCopy.LastValidators.Copy() prevValSet := stateCopy.LastValidators.Copy()
nextValSet := prevValSet.Copy() nextValSet := prevValSet.Copy()
err := updateValidators(nextValSet, valNodes.Nodes) err := updateValidators(nextValSet, valNodes.Nodes)
...@@ -1107,10 +1109,9 @@ func (cs *ConsensusState) finalizeCommit(height int64) { ...@@ -1107,10 +1109,9 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
stateCopy.LastHeightValidatorsChanged = block.Header.Height + 1 stateCopy.LastHeightValidatorsChanged = block.Header.Height + 1
nextValSet.IncrementAccum(1) nextValSet.IncrementAccum(1)
stateCopy.Validators = nextValSet stateCopy.Validators = nextValSet
tendermintlog.Info("finalizeCommit validators of statecopy updated", "update-valnodes", valNodes)
} }
} }
tendermintlog.Debug("finalizeCommit real validators of statecopy", "validators", stateCopy.Validators) tendermintlog.Debug("finalizeCommit validators of statecopy", "validators", stateCopy.Validators)
// NewHeightStep! // NewHeightStep!
cs.updateToState(stateCopy) cs.updateToState(stateCopy)
...@@ -1383,7 +1384,7 @@ func (cs *ConsensusState) signVote(voteType byte, hash []byte) (*ttypes.Vote, er ...@@ -1383,7 +1384,7 @@ func (cs *ConsensusState) signVote(voteType byte, hash []byte) (*ttypes.Vote, er
} }
beg := time.Now() beg := time.Now()
err := cs.privValidator.SignVote(cs.state.ChainID, vote) err := cs.privValidator.SignVote(cs.state.ChainID, vote)
tendermintlog.Info("signVote", "height", cs.Height, "cost", types.Since(beg)) tendermintlog.Debug("signVote", "height", cs.Height, "cost", types.Since(beg))
return vote, err return vote, err
} }
......
...@@ -266,7 +266,7 @@ OuterLoop: ...@@ -266,7 +266,7 @@ OuterLoop:
} }
state = statetmp.Copy() state = statetmp.Copy()
} else { } else {
tendermintlog.Info("StartConsensus", "blockinfo", blockInfo) tendermintlog.Debug("StartConsensus", "blockinfo", blockInfo)
csState := blockInfo.GetState() csState := blockInfo.GetState()
if csState == nil { if csState == nil {
tendermintlog.Error("StartConsensus", "msg", "blockInfo.GetState is nil") tendermintlog.Error("StartConsensus", "msg", "blockInfo.GetState is nil")
...@@ -282,25 +282,24 @@ OuterLoop: ...@@ -282,25 +282,24 @@ OuterLoop:
} }
} }
tendermintlog.Info("load state finish", "state", state, "validators", state.Validators) tendermintlog.Debug("Load state finish", "state", state)
valNodes, err := client.QueryValidatorsByHeight(curHeight) valNodes, err := client.QueryValidatorsByHeight(curHeight)
if err == nil && valNodes != nil { if err == nil && valNodes != nil {
if len(valNodes.Nodes) > 0 { if len(valNodes.Nodes) > 0 {
tendermintlog.Info("StartConsensus validators update", "update-valnodes", valNodes)
prevValSet := state.LastValidators.Copy() prevValSet := state.LastValidators.Copy()
nextValSet := prevValSet.Copy() nextValSet := prevValSet.Copy()
err := updateValidators(nextValSet, valNodes.Nodes) err := updateValidators(nextValSet, valNodes.Nodes)
if err != nil { if err != nil {
tendermintlog.Error("Error changing validator set", "error", err) tendermintlog.Error("Error changing validator set", "error", err)
//return s, fmt.Errorf("Error changing validator set: %v", err)
} }
// change results from this height but only applies to the next height // change results from this height but only applies to the next height
state.LastHeightValidatorsChanged = curHeight + 1 state.LastHeightValidatorsChanged = curHeight + 1
nextValSet.IncrementAccum(1) nextValSet.IncrementAccum(1)
state.Validators = nextValSet state.Validators = nextValSet
tendermintlog.Info("StartConsensus validators updated", "update-valnodes", valNodes)
} }
} }
tendermintlog.Info("StartConsensus", "real validators", state.Validators) tendermintlog.Info("StartConsensus", "validators", state.Validators)
// Log whether this node is a validator or an observer // Log whether this node is a validator or an observer
if state.Validators.HasAddress(client.privValidator.GetAddress()) { if state.Validators.HasAddress(client.privValidator.GetAddress()) {
tendermintlog.Info("This node is a validator") tendermintlog.Info("This node is a validator")
...@@ -574,3 +573,38 @@ func (client *Client) LoadProposalBlock(height int64) *tmtypes.TendermintBlock { ...@@ -574,3 +573,38 @@ func (client *Client) LoadProposalBlock(height int64) *tmtypes.TendermintBlock {
} }
return proposalBlock return proposalBlock
} }
// Query_IsHealthy query whether consensus is sync
func (client *Client) Query_IsHealthy(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
isHealthy := false
if client.IsCaughtUp() && client.GetCurrentHeight() <= client.csState.GetRoundState().Height+1 {
isHealthy = true
}
return &tmtypes.IsHealthy{IsHealthy: isHealthy}, nil
}
// Query_NodeInfo query validator node info
func (client *Client) Query_NodeInfo(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
nodes := client.csState.GetRoundState().Validators.Validators
validators := make([]*tmtypes.Validator, 0)
for _, node := range nodes {
if node == nil {
validators = append(validators, &tmtypes.Validator{})
} else {
item := &tmtypes.Validator{
Address: node.Address,
PubKey: node.PubKey,
VotingPower: node.VotingPower,
Accum: node.Accum,
}
validators = append(validators, item)
}
}
return &tmtypes.ValidatorSet{Validators: validators, Proposer: &tmtypes.Validator{}}, nil
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package commands
import (
"encoding/hex"
"fmt"
"math/rand"
"os"
"time"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/address"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/rpc/jsonclient"
rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/types"
vt "github.com/33cn/plugin/plugin/dapp/valnode/types"
"github.com/spf13/cobra"
)
// ValCmd valnode cmd register
func ValCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "valnode",
Short: "Construct valnode transactions",
Args: cobra.MinimumNArgs(1),
}
cmd.AddCommand(
IsSyncCmd(),
GetBlockInfoCmd(),
GetNodeInfoCmd(),
AddNodeCmd(),
)
return cmd
}
// IsSyncCmd query tendermint is sync
func IsSyncCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "is_sync",
Short: "Query tendermint consensus is sync",
Run: isSync,
}
return cmd
}
func isSync(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res bool
ctx := jsonclient.NewRPCCtx(rpcLaddr, "valnode.IsSync", nil, &res)
ctx.Run()
}
// GetNodeInfoCmd get validator nodes
func GetNodeInfoCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "nodes",
Short: "Get tendermint validator nodes",
Run: getNodeInfo,
}
return cmd
}
func getNodeInfo(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res string
ctx := jsonclient.NewRPCCtx(rpcLaddr, "valnode.GetNodeInfo", nil, &res)
ctx.SetResultCb(parseNodeInfo)
ctx.Run()
}
func parseNodeInfo(arg interface{}) (interface{}, error) {
var result vt.ValidatorSet
res := arg.(*string)
data, err := hex.DecodeString(*res)
if err != nil {
return nil, err
}
err = types.Decode(data, &result)
if err != nil {
return nil, err
}
return result.Validators, nil
}
// GetBlockInfoCmd get block info
func GetBlockInfoCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "info",
Short: "Get tendermint consensus info",
Run: getBlockInfo,
}
addGetBlockInfoFlags(cmd)
return cmd
}
func addGetBlockInfoFlags(cmd *cobra.Command) {
cmd.Flags().Int64P("height", "t", 0, "block height (larger than 0)")
cmd.MarkFlagRequired("height")
}
func getBlockInfo(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
height, _ := cmd.Flags().GetInt64("height")
req := &vt.ReqBlockInfo{
Height: height,
}
params := rpctypes.Query4Jrpc{
Execer: vt.ValNodeX,
FuncName: "GetBlockInfoByHeight",
Payload: types.MustPBToJSON(req),
}
var res vt.TendermintBlockInfo
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.Query", params, &res)
ctx.Run()
}
// AddNodeCmd add validator node
func AddNodeCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "add",
Short: "Add tendermint validator node",
Run: addNode,
}
addNodeFlags(cmd)
return cmd
}
func addNodeFlags(cmd *cobra.Command) {
cmd.Flags().StringP("pubkey", "p", "", "public key")
cmd.MarkFlagRequired("pubkey")
cmd.Flags().Int64P("power", "w", 0, "voting power")
cmd.MarkFlagRequired("power")
}
func addNode(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
pubkey, _ := cmd.Flags().GetString("pubkey")
power, _ := cmd.Flags().GetInt64("power")
pubkeybyte, err := hex.DecodeString(pubkey)
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
privkey, err := getprivkey()
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
value := &vt.ValNodeAction_Node{Node: &vt.ValNode{PubKey: pubkeybyte, Power: power}}
action := &vt.ValNodeAction{Value: value, Ty: vt.ValNodeActionUpdate}
tx := &types.Transaction{Execer: []byte(vt.ValNodeX), Payload: types.Encode(action), Fee: 0}
err = tx.SetRealFee(types.GInt("MinFee"))
if err != nil {
fmt.Fprintln(os.Stderr, err)
return
}
random := rand.New(rand.NewSource(time.Now().UnixNano()))
tx.Nonce = random.Int63()
tx.To = address.ExecAddress(vt.ValNodeX)
tx.Sign(types.SECP256K1, privkey)
txHex := types.Encode(tx)
data := hex.EncodeToString(txHex)
params := rpctypes.RawParm{
Data: data,
}
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.SendTransaction", params, nil)
ctx.RunWithoutMarshal()
}
func getprivkey() (crypto.PrivKey, error) {
key := "CC38546E9E659D15E6B4893F0AB32A06D103931A8230B0BDE71459D2B27D6944"
cr, err := crypto.New(types.GetSignName("", types.SECP256K1))
if err != nil {
return nil, err
}
bkey, err := common.FromHex(key)
if err != nil {
return nil, err
}
priv, err := cr.PrivKeyFromBytes(bkey)
if err != nil {
return nil, err
}
return priv, nil
}
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package executor package executor
import ( import (
"encoding/hex"
"errors" "errors"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
...@@ -20,7 +21,7 @@ func (val *ValNode) ExecLocal_Node(node *pty.ValNode, tx *types.Transaction, rec ...@@ -20,7 +21,7 @@ func (val *ValNode) ExecLocal_Node(node *pty.ValNode, tx *types.Transaction, rec
if node.GetPower() < 0 { if node.GetPower() < 0 {
return nil, errors.New("validator power must not be negative") return nil, errors.New("validator power must not be negative")
} }
clog.Info("update validator", "pubkey", node.GetPubKey(), "power", node.GetPower()) clog.Info("update validator", "pubkey", hex.EncodeToString(node.GetPubKey()), "power", node.GetPower())
key := CalcValNodeUpdateHeightIndexKey(val.GetHeight(), index) key := CalcValNodeUpdateHeightIndexKey(val.GetHeight(), index)
set.KV = append(set.KV, &types.KeyValue{Key: key, Value: types.Encode(node)}) set.KV = append(set.KV, &types.KeyValue{Key: key, Value: types.Encode(node)})
return set, nil return set, nil
......
...@@ -6,6 +6,8 @@ package valnode ...@@ -6,6 +6,8 @@ package valnode
import ( import (
"github.com/33cn/chain33/pluginmgr" "github.com/33cn/chain33/pluginmgr"
"github.com/33cn/plugin/plugin/dapp/valnode/rpc"
"github.com/33cn/plugin/plugin/dapp/valnode/commands"
"github.com/33cn/plugin/plugin/dapp/valnode/executor" "github.com/33cn/plugin/plugin/dapp/valnode/executor"
"github.com/33cn/plugin/plugin/dapp/valnode/types" "github.com/33cn/plugin/plugin/dapp/valnode/types"
) )
...@@ -15,7 +17,7 @@ func init() { ...@@ -15,7 +17,7 @@ func init() {
Name: types.ValNodeX, Name: types.ValNodeX,
ExecName: executor.GetName(), ExecName: executor.GetName(),
Exec: executor.Init, Exec: executor.Init,
Cmd: nil, Cmd: commands.ValCmd,
RPC: nil, RPC: rpc.Init,
}) })
} }
...@@ -186,3 +186,7 @@ message Heartbeat { ...@@ -186,3 +186,7 @@ message Heartbeat {
int32 sequence = 5; int32 sequence = 5;
bytes signature = 6; bytes signature = 6;
} }
message IsHealthy {
bool isHealthy = 1;
}
\ No newline at end of file
syntax = "proto3"; syntax = "proto3";
package types; package types;
import "common.proto";
import "tendermint.proto"; import "tendermint.proto";
message ValNode { message ValNode {
...@@ -27,3 +28,8 @@ message ReqNodeInfo { ...@@ -27,3 +28,8 @@ message ReqNodeInfo {
message ReqBlockInfo { message ReqBlockInfo {
int64 height = 1; int64 height = 1;
} }
service valnode {
rpc IsSync(ReqNil) returns (IsHealthy) {}
rpc GetNodeInfo(ReqNil) returns (ValidatorSet) {}
}
\ No newline at end of file
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpc
import (
"context"
"encoding/hex"
"github.com/33cn/chain33/types"
vt "github.com/33cn/plugin/plugin/dapp/valnode/types"
)
// IsSync query is sync
func (c *channelClient) IsSync(ctx context.Context, req *types.ReqNil) (*vt.IsHealthy, error) {
data, err := c.QueryConsensusFunc("tendermint", "IsHealthy", &types.ReqNil{})
if err != nil {
return nil, err
}
if resp, ok := data.(*vt.IsHealthy); ok {
return resp, nil
}
return nil, types.ErrDecode
}
// IsSync query is sync
func (c *Jrpc) IsSync(req *types.ReqNil, result *interface{}) error {
data, err := c.cli.IsSync(context.Background(), req)
if err != nil {
return err
}
*result = data.IsHealthy
return nil
}
// GetNodeInfo query block info
func (c *channelClient) GetNodeInfo(ctx context.Context, req *types.ReqNil) (*vt.ValidatorSet, error) {
data, err := c.QueryConsensusFunc("tendermint", "NodeInfo", &types.ReqNil{})
if err != nil {
return nil, err
}
if resp, ok := data.(*vt.ValidatorSet); ok {
return resp, nil
}
return nil, types.ErrDecode
}
// GetNodeInfo query block info
func (c *Jrpc) GetNodeInfo(req *types.ReqNil, result *interface{}) error {
data, err := c.cli.GetNodeInfo(context.Background(), req)
if err != nil {
return err
}
*result = hex.EncodeToString(types.Encode(data))
return nil
}
/*
* Copyright Fuzamei Corp. 2018 All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package rpc
//only load all plugin and system
import (
"encoding/hex"
"testing"
"github.com/33cn/chain33/client/mocks"
rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/types"
vt "github.com/33cn/plugin/plugin/dapp/valnode/types"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
)
func newGrpc(api *mocks.QueueProtocolAPI) *channelClient {
return &channelClient{
ChannelClient: rpctypes.ChannelClient{QueueProtocolAPI: api},
}
}
func newJrpc(api *mocks.QueueProtocolAPI) *Jrpc {
return &Jrpc{cli: newGrpc(api)}
}
func TestChannelClient_IsSync(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("valnode", nil, nil, nil)
req := &types.ReqNil{}
api.On("QueryConsensusFunc", "tendermint", "IsHealthy", req).Return(&vt.IsHealthy{IsHealthy: true}, nil)
result, err := client.IsSync(context.Background(), req)
assert.Nil(t, err)
assert.Equal(t, true, result.IsHealthy)
}
func TestJrpc_IsSync(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
J := newJrpc(api)
req := &types.ReqNil{}
var result interface{}
api.On("QueryConsensusFunc", "tendermint", "IsHealthy", req).Return(&vt.IsHealthy{IsHealthy: true}, nil)
err := J.IsSync(req, &result)
assert.Nil(t, err)
assert.Equal(t, true, result)
}
func TestChannelClient_GetNodeInfo(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("valnode", nil, nil, nil)
req := &types.ReqNil{}
node := &vt.Validator{
Address: []byte("aaa"),
PubKey: []byte("bbb"),
VotingPower: 10,
Accum: -1,
}
set := &vt.ValidatorSet{
Validators: []*vt.Validator{node},
Proposer: node,
}
api.On("QueryConsensusFunc", "tendermint", "NodeInfo", req).Return(set, nil)
result, err := client.GetNodeInfo(context.Background(), req)
assert.Nil(t, err)
assert.EqualValues(t, set, result)
}
func TestJrpc_GetNodeInfo(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
J := newJrpc(api)
req := &types.ReqNil{}
var result interface{}
node := &vt.Validator{
Address: []byte("aaa"),
PubKey: []byte("bbb"),
VotingPower: 10,
Accum: -1,
}
set := &vt.ValidatorSet{
Validators: []*vt.Validator{node},
Proposer: node,
}
api.On("QueryConsensusFunc", "tendermint", "NodeInfo", req).Return(set, nil)
err := J.GetNodeInfo(req, &result)
assert.Nil(t, err)
assert.EqualValues(t, hex.EncodeToString(types.Encode(set)), result)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpc
import (
"github.com/33cn/chain33/rpc/types"
vt "github.com/33cn/plugin/plugin/dapp/valnode/types"
)
// Jrpc valnode jrpc interface
type Jrpc struct {
cli *channelClient
}
// Grpc valnode Grpc interface
type Grpc struct {
*channelClient
}
type channelClient struct {
types.ChannelClient
}
// Init valnode rpc register
func Init(name string, s types.RPCServer) {
cli := &channelClient{}
grpc := &Grpc{channelClient: cli}
cli.Init(name, s, &Jrpc{cli: cli}, grpc)
vt.RegisterValnodeServer(s.GRPC(), grpc)
}
This diff is collapsed.
...@@ -17,6 +17,11 @@ func init() { ...@@ -17,6 +17,11 @@ func init() {
types.RegisterDappFork(ValNodeX, "Enable", 0) types.RegisterDappFork(ValNodeX, "Enable", 0)
} }
// GetExecName get exec name
func GetExecName() string {
return types.ExecName(ValNodeX)
}
// ValNodeType stuct // ValNodeType stuct
type ValNodeType struct { type ValNodeType struct {
types.ExecTypeBase types.ExecTypeBase
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment