Commit 2fef144d authored by mdj33's avatar mdj33 Committed by vipwzw

self consens stage by contract

parent 5dc2a303
......@@ -110,14 +110,13 @@ mainForkParacrossCommitTx=2270000
mainLoopCheckCommitTxDoneForkHeight=4320000
#精简commitMsg,删除多余的hash参数,缺省mainLoopCheckCommitTxDoneForkHeight一致,一些特殊的链如game需定制
rmCommitMsgHashParamMainHeight=4320000
#paraSelfConsInitDisable=false
#主链每隔几个没有相关平行链交易的区块,平行链上打包空区块,缺省从平行链blockHeight=0开始,依次增长,空块间隔不能为0
[[consensus.sub.para.emptyBlockInterval]]
blockHeight=0
interval=50
#平行链自共识开启对应的主链高度,必须大于等于MainForkParacrossCommitTx=2270000, -1 不开启,可以多个高度开启和关闭
[[consensus.sub.para.selfConsensusEnable]]
blockHeight=0
enable=false
#selfConsensEnablePreContract=["0-9848"]
[store]
......@@ -258,7 +257,7 @@ Enable=0
ForkParacrossWithdrawFromParachain=0
ForkParacrossCommitTx=0
ForkLoopCheckCommitTxDone=0
ForkConsensSupportJump=0
ForkParaSelfConsStages=0
[fork.sub.evm]
Enable=0
......
......@@ -71,11 +71,6 @@ type emptyBlockInterval struct {
Interval int64 `json:"interval,omitempty"`
}
type paraSelfConsEnable struct {
BlockHeight int64 `json:"blockHeight,omitempty"`
Enable bool `json:"enable,omitempty"`
}
type subConfig struct {
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
......@@ -86,7 +81,6 @@ type subConfig struct {
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
SelfConsensusEnable []*paraSelfConsEnable `json:"selfConsensusEnable,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
......@@ -123,20 +117,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
panic("para EmptyBlockInterval config not correct")
}
if len(subcfg.SelfConsensusEnable) == 0 {
selfEnable := &paraSelfConsEnable{Enable: false}
subcfg.SelfConsensusEnable = append(subcfg.SelfConsensusEnable, selfEnable)
}
if subcfg.SelfConsensusEnable[0].BlockHeight != 0 {
selfEnable := &paraSelfConsEnable{Enable: false}
subcfg.SelfConsensusEnable = append([]*paraSelfConsEnable{selfEnable}, subcfg.SelfConsensusEnable...)
}
err = checkSelfConsensEnable(subcfg.SelfConsensusEnable)
if err != nil {
panic("para selfConsensusEnable config not correct")
}
if subcfg.BatchFetchBlockCount <= 0 {
subcfg.BatchFetchBlockCount = types.MaxBlockCountPerTime
}
......@@ -172,7 +152,6 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
consensHeight: -2,
sendingHeight: -1,
consensDoneHeight: -1,
selfConsensMap: make(map[int64]bool),
resetCh: make(chan interface{}, 1),
quit: make(chan struct{}),
}
......@@ -184,13 +163,15 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
para.commitMsgClient.waitConsensStopTimes = subcfg.WaitConsensStopTimes
}
para.commitMsgClient.setSelfConsensMap(subcfg.SelfConsensusEnable)
// 设置平行链共识起始高度,在共识高度为-1也就是从未共识过的环境中允许从设置的非0起始高度开始共识
//note:只有在主链LoopCheckCommitTxDoneForkHeight之后才支持设置ParaConsensStartHeight
if subcfg.ParaConsensStartHeight > 0 {
para.commitMsgClient.consensDoneHeight = subcfg.ParaConsensStartHeight - 1
}
err = para.commitMsgClient.setSelfConsEnable()
if err != nil {
panic(err)
}
para.blockSyncClient = &blockSyncClient{
paraClient: para,
......@@ -244,17 +225,6 @@ func checkEmptyBlockInterval(in []*emptyBlockInterval) error {
return nil
}
func checkSelfConsensEnable(in []*paraSelfConsEnable) error {
for i := 0; i < len(in); i++ {
if i > 0 && in[i].BlockHeight <= in[i-1].BlockHeight {
plog.Error("selfConsensEnable,blockHeight should be sequence", "preHeight", in[i-1].BlockHeight, "laterHeight", in[i].BlockHeight)
return types.ErrInvalidParam
}
}
return nil
}
//para 不检查任何的交易
func (client *client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
err := checkMinerTx(current)
......@@ -395,15 +365,6 @@ func (client *client) CreateGenesisTx() (ret []*types.Transaction) {
return
}
func (client *client) getSelfConsEnableStatus(height int64) *paraSelfConsEnable {
for i := len(client.subCfg.SelfConsensusEnable) - 1; i >= 0; i-- {
if height >= client.subCfg.SelfConsensusEnable[i].BlockHeight {
return client.subCfg.SelfConsensusEnable[i]
}
}
panic(fmt.Sprintf("para selfConsensusEnable not set for height=%d", height))
}
func (client *client) ProcEvent(msg *queue.Message) bool {
return false
}
......
......@@ -175,7 +175,6 @@ func TestAddMinerTx(t *testing.T) {
para := &client{BaseClient: &drivers.BaseClient{}}
para.SetAPI(api)
para.subCfg = new(subConfig)
para.subCfg.SelfConsensusEnable = append(para.subCfg.SelfConsensusEnable, &paraSelfConsEnable{Enable: false})
para.privateKey = priKey
para.commitMsgClient = new(commitMsgClient)
para.commitMsgClient.paraClient = para
......@@ -286,37 +285,37 @@ func TestCheckEmptyInterval(t *testing.T) {
}
func TestCheckSelfConsensEnable(t *testing.T) {
int1 := &paraSelfConsEnable{BlockHeight: 0, Enable: false}
int2 := &paraSelfConsEnable{BlockHeight: 10, Enable: true}
int3 := &paraSelfConsEnable{BlockHeight: 5, Enable: false}
ints := []*paraSelfConsEnable{int1, int2, int3}
err := checkSelfConsensEnable(ints)
assert.Equal(t, types.ErrInvalidParam, err)
int3.BlockHeight = 15
err = checkSelfConsensEnable(ints)
assert.Nil(t, err)
}
func TestGetSelfConsEnableStatus(t *testing.T) {
int1 := &paraSelfConsEnable{BlockHeight: 0, Enable: false}
int2 := &paraSelfConsEnable{BlockHeight: 10, Enable: true}
int3 := &paraSelfConsEnable{BlockHeight: 15, Enable: false}
para := new(client)
para.subCfg = new(subConfig)
para.subCfg.SelfConsensusEnable = []*paraSelfConsEnable{int1, int2, int3}
selfConf := para.getSelfConsEnableStatus(5)
assert.Equal(t, int64(0), selfConf.BlockHeight)
assert.Equal(t, false, selfConf.Enable)
selfConf = para.getSelfConsEnableStatus(10)
assert.Equal(t, int64(10), selfConf.BlockHeight)
assert.Equal(t, true, selfConf.Enable)
selfConf = para.getSelfConsEnableStatus(16)
assert.Equal(t, int64(15), selfConf.BlockHeight)
assert.Equal(t, false, selfConf.Enable)
}
//func TestCheckSelfConsensEnable(t *testing.T) {
// int1 := &paraSelfConsEnable{BlockHeight: 0, Enable: false}
// int2 := &paraSelfConsEnable{BlockHeight: 10, Enable: true}
// int3 := &paraSelfConsEnable{BlockHeight: 5, Enable: false}
//
// ints := []*paraSelfConsEnable{int1, int2, int3}
// err := checkSelfConsensEnable(ints)
// assert.Equal(t, types.ErrInvalidParam, err)
//
// int3.BlockHeight = 15
// err = checkSelfConsensEnable(ints)
// assert.Nil(t, err)
//}
//func TestGetSelfConsEnableStatus(t *testing.T) {
// int1 := &paraSelfConsEnable{BlockHeight: 0, Enable: false}
// int2 := &paraSelfConsEnable{BlockHeight: 10, Enable: true}
// int3 := &paraSelfConsEnable{BlockHeight: 15, Enable: false}
//
// para := new(client)
// para.subCfg = new(subConfig)
// para.subCfg.SelfConsensusEnable = []*paraSelfConsEnable{int1, int2, int3}
// selfConf := para.getSelfConsEnableStatus(5)
// assert.Equal(t, int64(0), selfConf.BlockHeight)
// assert.Equal(t, false, selfConf.Enable)
//
// selfConf = para.getSelfConsEnableStatus(10)
// assert.Equal(t, int64(10), selfConf.BlockHeight)
// assert.Equal(t, true, selfConf.Enable)
//
// selfConf = para.getSelfConsEnableStatus(16)
// assert.Equal(t, int64(15), selfConf.BlockHeight)
// assert.Equal(t, false, selfConf.Enable)
//}
......@@ -17,6 +17,8 @@ import (
"sync"
"strconv"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types"
......@@ -33,6 +35,11 @@ const (
waitConsensStopTimes uint32 = 30 //30*10s = 5min
)
type paraSelfConsEnable struct {
startHeight int64
endHeight int64
}
type commitMsgClient struct {
paraClient *client
waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
......@@ -45,12 +52,12 @@ type commitMsgClient struct {
sendingHeight int64
consensHeight int64
consensDoneHeight int64
selfConsensForked int32 //自共识比主链共识更高的异常场景,需要等待自共识<=主链共识再发送
selfConsensError int32 //自共识比主链共识更高的异常场景,需要等待自共识<=主链共识再发送
authAccountIn bool
isRollBack int32
checkTxCommitTimes int32
txFeeRate int64
selfConsensMap map[int64]bool //selfConsensEnable 分段 map
selfConsEnableList []*paraSelfConsEnable //适配在自共识合约配置前有自共识的平行链项目,fork之后,采用合约配置
privateKey crypto.PrivKey
quit chan struct{}
mutex sync.Mutex
......@@ -296,8 +303,8 @@ func (client *commitMsgClient) isSync() bool {
return false
}
if atomic.LoadInt32(&client.selfConsensForked) != 0 {
plog.Info("para is not Sync", "selfConsensForked", atomic.LoadInt32(&client.selfConsensForked))
if atomic.LoadInt32(&client.selfConsensError) != 0 {
plog.Info("para is not Sync", "selfConsensError", atomic.LoadInt32(&client.selfConsensError))
return false
}
......@@ -399,15 +406,25 @@ func (client *commitMsgClient) getTxsGroup(txsArr *types.Transactions) (*types.T
return newtx, nil
}
func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, int, error) {
var rawTxs types.Transactions
func (client *commitMsgClient) getExecName(commitHeight int64) string {
cfg := client.paraClient.GetAPI().GetConfig()
for _, status := range notifications {
if cfg.IsDappFork(commitHeight, pt.ParaX, pt.ForkParaSelfConsStages) {
return paracross.GetExecName()
}
execName := pt.ParaX
stat := client.paraClient.getSelfConsEnableStatus(status.Height)
if stat.Enable {
if client.isSelfConsEnable(commitHeight) {
execName = paracross.GetExecName(cfg)
}
return execName
}
func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, int, error) {
var rawTxs types.Transactions
cfg := client.paraClient.GetAPI().GetConfig()
for _, status := range notifications {
execName := client.getExecName(status.Height)
tx, err := paracross.CreateRawCommitTx4MainChain(cfg, status, execName, feeRate)
if err != nil {
plog.Error("para get commit tx", "block height", status.Height)
......@@ -426,10 +443,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, error) {
execName := pt.ParaX
cfg := client.paraClient.GetAPI().GetConfig()
stat := client.paraClient.getSelfConsEnableStatus(status.Height)
if stat.Enable {
execName = paracross.GetExecName(cfg)
}
execName := client.getExecName(status.Height)
tx, err := paracross.CreateRawCommitTx4MainChain(cfg, status, execName, feeRate)
if err != nil {
plog.Error("para get commit tx", "block height", status.Height)
......@@ -600,8 +614,6 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
return nil, nil
}
client.setConsensStart(ret)
//clear flag
for _, v := range ret {
v.NonCommitTxCounts = 0
......@@ -611,28 +623,6 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
}
// 主链共识可能从-1跃迁或者自共识可能分阶段开启场景,设置起始高度标志,主链只需要根据是否起始高度来允许跃迁
// 1. 主链ParaConsensStartHeight 在selfCons设置的Disable范围没问题,只有主链做共识,平行链收不到共识交易
// 2. 主链ParaConsensStartHeight 超过selfCons StartHeight也没问题, 自共识也从ParaConsensStartHeight 开始做自共识
func (client *commitMsgClient) setConsensStart(invs []*pt.ParacrossNodeStatus) {
for _, v := range invs {
//平行链自共识
v.IsStartHeight = client.selfConsensMap[v.Height]
//主链共识起始高度,只允许从-1跃迁,否则可能存在未共识的asset withdraw交易
if v.Height == client.paraClient.subCfg.ParaConsensStartHeight {
v.IsStartHeight = true
}
}
}
func (client *commitMsgClient) setSelfConsensMap(invs []*paraSelfConsEnable) {
for _, v := range invs {
client.selfConsensMap[v.BlockHeight] = v.Enable
}
}
func (client *commitMsgClient) getGenesisNodeStatus() (*pt.ParacrossNodeStatus, error) {
var status pt.ParacrossNodeStatus
req := &types.ReqBlocks{Start: 0, End: 0}
......@@ -706,9 +696,9 @@ out:
//如果主链的共识高度小于自共识高度,需要等待自共识回滚
if mainStatus.Height < selfHeight {
atomic.StoreInt32(&client.selfConsensForked, 1)
atomic.StoreInt32(&client.selfConsensError, 1)
} else {
atomic.StoreInt32(&client.selfConsensForked, 0)
atomic.StoreInt32(&client.selfConsensError, 0)
}
preHeight := atomic.LoadInt64(&client.consensHeight)
......@@ -746,8 +736,24 @@ func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, er
return nil, err
}
cfg := client.paraClient.GetAPI().GetConfig()
selfCons := client.paraClient.getSelfConsEnableStatus(block.Height)
if selfCons.Enable {
//从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
FuncName: "GetSelfConsOneStage",
Param: types.Encode(&types.Int64{Data: block.Height}),
})
if err != nil {
plog.Error("getSelfConsensusStatus.GetSelfConsOneStage ", "err", err.Error())
return nil, err
}
stage, ok := ret.(*pt.SelfConsensStage)
if !ok {
plog.Error("getSelfConsensusStatus nok")
return nil, types.ErrInvalidParam
}
plog.Info("getSelfConsensusStatus ", "height", block.Height, "stageHeight", stage.BlockHeight, "enable", stage.Enable)
if stage.Enable == pt.ParaConfigYes {
//从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
......@@ -758,15 +764,14 @@ func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, er
plog.Error("getSelfConsensusStatus ", "err", err.Error())
return nil, err
}
status, ok := ret.(*pt.ParacrossStatus)
resp, ok := ret.(*pt.ParacrossStatus)
if !ok {
plog.Error("getSelfConsensusStatus ParacrossStatus nok")
return nil, err
}
//开启自共识后也要等到自共识真正切换之后再使用,如果本地区块已经过了自共识高度,但自共识的高度还没达成,就会导致共识机制出错
//本地共识高度一定要高于阶段共识高度的起始高度,为了适配平行链共识高度不连续场景,一定要设置新自共识起始高度高于当前主链共识高度,否则不生效
if status.Height >= selfCons.BlockHeight {
return status, nil
if resp.Height > stage.BlockHeight {
return resp, nil
}
}
return nil, types.ErrNotFound
......@@ -895,3 +900,34 @@ func (client *commitMsgClient) fetchPriKey() error {
plog.Info("para commit fetchPriKey success")
return nil
}
func (client *commitMsgClient) setSelfConsEnable() error {
var err error
selfEnables := types.Conf("config.consensus.sub.para").GStrList("selfConsensEnablePreContract")
for _, v := range selfEnables {
hs := strings.Split(v, "-")
enable := &paraSelfConsEnable{}
enable.startHeight, err = strconv.ParseInt(hs[0], 0, 64)
if err != nil {
plog.Error("para setSelfConsEnable", "v0", hs[0])
return err
}
enable.endHeight, err = strconv.ParseInt(hs[1], 0, 64)
if err != nil {
plog.Error("para setSelfConsEnable", "v1", hs[1])
return err
}
client.selfConsEnableList = append(client.selfConsEnableList, enable)
}
return nil
}
//适配在自共识合约配置前有自共识的平行链项目,fork之后,采用合约配置
func (client *commitMsgClient) isSelfConsEnable(height int64) bool {
for _, v := range client.selfConsEnableList {
if height >= v.startHeight && height <= v.endHeight {
return true
}
}
return false
}
......@@ -370,10 +370,10 @@ func (client *blockSyncClient) addMinerTx(preStateHash []byte, block *types.Bloc
status.PreStateHash = preStateHash
}
selfCons := client.paraClient.getSelfConsEnableStatus(status.Height)
tx, err := pt.CreateRawMinerTx(cfg, &pt.ParacrossMinerAction{
Status: status,
IsSelfConsensus: selfCons.Enable,
IsSelfConsensus: client.paraClient.commitMsgClient.isSelfConsEnable(status.Height),
})
if err != nil {
return err
......
......@@ -49,7 +49,6 @@ func initTestSyncBlock() {
func createParaTestInstance(t *testing.T, q queue.Queue) *client {
para := new(client)
para.subCfg = new(subConfig)
para.subCfg.SelfConsensusEnable = append(para.subCfg.SelfConsensusEnable, &paraSelfConsEnable{Enable: false})
baseCli := drivers.NewBaseClient(&types.Consensus{Name: "name"})
para.BaseClient = baseCli
......
......@@ -49,7 +49,6 @@ func TestCalcCommitMsgTxs(t *testing.T) {
para.SetAPI(api)
para.subCfg = new(subConfig)
para.subCfg.SelfConsensusEnable = append(para.subCfg.SelfConsensusEnable, &paraSelfConsEnable{Enable: true})
priKey := getPrivKey(t)
client := &commitMsgClient{
......@@ -91,7 +90,6 @@ func TestGetConsensusStatus(t *testing.T) {
para := &client{BaseClient: &drivers.BaseClient{}}
para.subCfg = new(subConfig)
para.subCfg.SelfConsensusEnable = append(para.subCfg.SelfConsensusEnable, &paraSelfConsEnable{Enable: true})
grpcClient := &typesmocks.Chain33Client{}
//grpcClient.On("GetFork", mock.Anything, &types.ReqKey{Key: []byte("ForkBlockHash")}).Return(&types.Int64{Data: 1}, errors.New("err")).Once()
para.grpcClient = grpcClient
......
......@@ -34,6 +34,7 @@ func ParcCmd() *cobra.Command {
CreateRawTransferToExecCmd(),
superNodeCmd(),
nodeGroupCmd(),
paraConfigCmd(),
GetParaInfoCmd(),
GetParaListCmd(),
IsSyncCmd(),
......@@ -485,6 +486,137 @@ func nodeList(cmd *cobra.Command, args []string) {
ctx.Run()
}
func addSelfConsStageCmdFlags(cmd *cobra.Command) {
cmd.Flags().Int64P("height", "g", 0, "height apply for self consensus enable or not ")
cmd.MarkFlagRequired("height")
cmd.Flags().Uint32P("enable", "e", 0, "if self consensus enable at height,1:enable,2:disable")
cmd.MarkFlagRequired("enable")
}
func selfConsStage(cmd *cobra.Command, args []string) {
height, _ := cmd.Flags().GetInt64("height")
enable, _ := cmd.Flags().GetUint32("enable")
var config pt.ParaStageConfig
config.OpTy = pt.ParaOpNewApply
config.Op = &pt.ParaStageConfig_Stage{Stage: &pt.SelfConsensStage{BlockHeight: height, Enable: enable}}
params := &rpctypes.CreateTxIn{
Execer: types.ExecName(pt.ParaX),
ActionName: "selfConsStageConfig",
Payload: types.MustPBToJSON(&config),
}
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.CreateTransaction", params, nil)
ctx.RunWithoutMarshal()
}
func selfConsStageCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "new",
Short: "apply for para chain's self consensus stages cmd",
Run: selfConsStage,
}
addSelfConsStageCmdFlags(cmd)
return cmd
}
func addVoteFlags(cmd *cobra.Command) {
cmd.Flags().StringP("id", "i", "", "operating target apply id")
cmd.MarkFlagRequired("id")
cmd.Flags().Uint32P("value", "v", 1, "vote value: 1:yes,2:no")
cmd.MarkFlagRequired("value")
}
func createVoteTx(cmd *cobra.Command, args []string) {
id, _ := cmd.Flags().GetString("id")
val, _ := cmd.Flags().GetUint32("value")
var config pt.ParaStageConfig
config.OpTy = pt.ParaOpVote
config.Op = &pt.ParaStageConfig_Vote{Vote: &pt.ConfigVoteInfo{Id: id, Value: val}}
params := &rpctypes.CreateTxIn{
Execer: types.ExecName(pt.ParaX),
ActionName: "selfConsStageConfig",
Payload: types.MustPBToJSON(&config),
}
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.CreateTransaction", params, nil)
ctx.RunWithoutMarshal()
}
func configVoteCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "vote",
Short: "vote for config cmd",
Run: createVoteTx,
}
addVoteFlags(cmd)
return cmd
}
func stageCancelTx(cmd *cobra.Command, args []string) {
id, _ := cmd.Flags().GetString("id")
var config pt.ParaStageConfig
config.OpTy = pt.ParaOpCancel
config.Op = &pt.ParaStageConfig_Cancel{Cancel: &pt.ConfigCancelInfo{Id: id}}
params := &rpctypes.CreateTxIn{
Execer: types.ExecName(pt.ParaX),
ActionName: "selfConsStageConfig",
Payload: types.MustPBToJSON(&config),
}
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
ctx := jsonclient.NewRPCCtx(rpcLaddr, "Chain33.CreateTransaction", params, nil)
ctx.RunWithoutMarshal()
}
func configCancelCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "cancel",
Short: "cancel for config cmd",
Run: stageCancelTx,
}
cmd.Flags().StringP("id", "i", "", "operating target apply id")
cmd.MarkFlagRequired("id")
return cmd
}
func paraStageConfigCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "stages",
Short: "self consensus stages config cmd",
}
cmd.AddCommand(selfConsStageCmd())
cmd.AddCommand(configVoteCmd())
cmd.AddCommand(configCancelCmd())
cmd.AddCommand(QuerySelfStagesCmd())
cmd.AddCommand(GetSelfConsStagesCmd())
cmd.AddCommand(GetSelfConsOneStageCmd())
return cmd
}
func paraConfigCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "config",
Short: "parachain config cmd",
}
cmd.AddCommand(paraStageConfigCmd())
return cmd
}
func nodeGroupCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "nodegroup",
......@@ -858,3 +990,85 @@ func nodeGroupList(cmd *cobra.Command, args []string) {
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.ListNodeGroupStatus", params, &res)
ctx.Run()
}
func stagesInfo(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res pt.SelfConsensStages
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.GetSelfConsStages", nil, &res)
ctx.Run()
}
// GetParaInfoCmd get para chain status by height
func GetSelfConsStagesCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "all",
Short: "Get para chain self consensus stages",
Run: stagesInfo,
}
return cmd
}
func stageOneInfo(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res pt.SelfConsensStage
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.GetSelfConsOneStage", nil, &res)
ctx.Run()
}
// GetParaInfoCmd get para chain status by height
func GetSelfConsOneStageCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "one",
Short: "query para chain one self consensus stage",
Run: stageOneInfo,
}
cmd.Flags().Int64P("height", "g", 0, "height to para chain")
cmd.MarkFlagRequired("height")
return cmd
}
// QuerySelfStagesCmd 显示提案查询信息
func QuerySelfStagesCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "query",
Short: "show self consensus stage apply info",
Run: showSelfStages,
}
addShowSelfStagesflags(cmd)
return cmd
}
func addShowSelfStagesflags(cmd *cobra.Command) {
cmd.Flags().StringP("id", "q", "", "stage apply ID")
cmd.Flags().Uint32P("status", "s", 0, "status")
cmd.Flags().Int32P("count", "c", 1, "count, default is 1")
cmd.Flags().Int32P("direction", "d", 0, "direction, default is reserve")
cmd.Flags().Int64P("height", "t", -1, "height, default is -1")
cmd.Flags().Int32P("index", "i", -1, "index, default is -1")
}
func showSelfStages(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
id, _ := cmd.Flags().GetString("id")
status, _ := cmd.Flags().GetUint32("status")
count, _ := cmd.Flags().GetInt32("count")
direction, _ := cmd.Flags().GetInt32("direction")
height, _ := cmd.Flags().GetInt64("height")
index, _ := cmd.Flags().GetInt32("index")
params := &pt.ReqQuerySelfStages{
Status: status,
Id: id,
Count: count,
Direction: direction,
Height: height,
Index: index,
}
var res pt.ReplyQuerySelfStages
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.ListSelfStages", params, &res)
ctx.Run()
}
......@@ -318,6 +318,20 @@ func updateCommitAddrs(stat *pt.ParacrossHeightStatus, nodes map[string]struct{}
func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error) {
cfg := a.api.GetConfig()
var stage *pt.SelfConsensStage
if types.IsPara() && types.IsDappFork(a.height, pt.ParaX, pt.ForkParaSelfConsStages) {
_, stages, err := getSelfConsensStages(a.db)
if err != nil {
return &types.Receipt{}, nil
}
stage = getSelfConsOneStage(a.height, stages)
clog.Info("paracross.commit", "height", a.height, "stageHeight", stage.BlockHeight, "enable", stage.Enable)
if stage.Enable == pt.ParaConfigNo {
return &types.Receipt{}, nil
}
}
err := checkCommitInfo(cfg, commit)
if err != nil {
return nil, err
......@@ -456,11 +470,7 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
if commit.Status.Height > titleStatus.Height+1 {
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
//平行链由主链共识无缝切换,即接收第一个收到的高度,可以不从0开始
allowJump, err := a.isAllowConsensJump(commit, titleStatus)
if err != nil {
return nil, err
}
if !allowJump {
if !a.isAllowConsensJump(stat, titleStatus, stage) {
return receipt, nil
}
}
......@@ -696,32 +706,27 @@ func (a *action) commitTxDoneByStat(stat *pt.ParacrossHeightStatus, titleStatus
}
//主链共识跳跃条件: 仅支持主链共识初始高度为-1,也就是没有共识过,共识过不允许再跳跃
func (a *action) isAllowMainConsensJump(commit *pt.ParacrossHeightStatus, titleStatus *pt.ParacrossStatus) (bool, error) {
func (a *action) isAllowMainConsensJump(commit *pt.ParacrossHeightStatus, titleStatus *pt.ParacrossStatus) bool {
cfg := a.api.GetConfig()
if cfg.IsDappFork(a.exec.GetMainHeight(), pt.ParaX, pt.ForkLoopCheckCommitTxDone) {
if titleStatus.Height == -1 {
return true, nil
return true
}
}
return false, nil
return false
}
//平行链自共识无缝切换条件:commit height为自共识分段起始高度
//分叉高度按平行链本身高度判断,而不是按主链高度,可以尽早支持isStartHeight标志,没共识过的平行链采用分段结构,可以从0开始,分叉高度也为0,
//共识过的平行链,由于之前按主链高度判断,创世0区块固定只在主链共识,平行链需要自动从1高度或更高高度过渡过来
func (a *action) isAllowParaConsensJump(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
if commit.Status.IsStartHeight || types.IsDappFork(a.height, pt.ParaX, pt.ForkConsensSupportJump) {
return commit.Status.IsStartHeight, nil
//平行链自共识无缝切换条件:1,平行链没有共识过,2:commit高度是大于自共识分叉高度且上一次共识的主链高度小于自共识分叉高度,保证只运行一次,
// 1. 分叉之前,开启过共识的平行链需要从1跳跃,没开启过的将使用新版本,从0开始发送,不用考虑从1跳跃的问题
// 2. 分叉之后,只有stage.blockHeight== commit.height,也就是stage起始高度时候允许跳跃
func (a *action) isAllowParaConsensJump(commit *pt.ParacrossHeightStatus, titleStatus *pt.ParacrossStatus, stage *pt.SelfConsensStage) bool {
if types.IsDappFork(a.height, pt.ParaX, pt.ForkParaSelfConsStages) {
return stage.BlockHeight == commit.Height
}
return a.isAllowParaConsensJumpOld(commit, titleStatus)
}
//平行链自共识无缝切换条件:1,平行链没有共识过,2:commit高度是大于自共识分叉高度且上一次共识的主链高度小于自共识分叉高度,保证只运行一次,
// 这样在主链没有共识空洞前提下,平行链允许有条件的共识跳跃
func (a *action) isAllowParaConsensJumpOld(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
return titleStatus.Height == -1, nil
//兼容分叉之前从1跳跃场景
return titleStatus.Height == -1
}
func (a *action) isAllowConsensJump(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
......@@ -943,9 +948,23 @@ func (a *action) Miner(miner *pt.ParacrossMinerAction) (*types.Receipt, error) {
logs = append(logs, log)
minerReceipt := &types.Receipt{Ty: types.ExecOk, KV: nil, Logs: logs}
isSelfConsensOn := miner.IsSelfConsensus
if types.IsDappFork(a.height, pt.ParaX, pt.ForkParaSelfConsStages) {
_, stages, err := getSelfConsensStages(a.db)
if err != nil && errors.Cause(err) != pt.ErrKeyNotExist {
return nil, err
}
if stages != nil {
stage := getSelfConsOneStage(miner.Status.Height, stages)
isSelfConsensOn = stage.Enable == pt.ParaConfigYes
} else {
isSelfConsensOn = false
}
}
//自共识后才挖矿
if miner.IsSelfConsensus {
if isSelfConsensOn {
//增发coins到paracross合约中,只处理发放,不做分配
totalReward := int64(0)
coinReward := cfg.MGInt("mver.consensus.paracross.coinReward", a.height)
......
......@@ -99,3 +99,9 @@ func (e *Paracross) Exec_NodeGroupConfig(payload *pt.ParaNodeGroupConfig, tx *ty
a := newAction(e, tx)
return a.NodeGroupConfig(payload)
}
//Exec_NodeGroupConfig node group config process
func (e *Paracross) Exec_SelfStageConfig(payload *pt.ParaStageConfig, tx *types.Transaction, index int) (*types.Receipt, error) {
a := newAction(e, tx)
return a.SelfStageConfig(payload)
}
......@@ -185,3 +185,8 @@ func (e *Paracross) ExecDelLocal_Withdraw(payload *types.AssetsWithdraw, tx *typ
func (e *Paracross) ExecDelLocal_TransferToExec(payload *types.AssetsTransferToExec, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return nil, nil
}
//ExecLocal_SelfConsensStageConfig transfer asset to exec local db process
func (e *Paracross) ExecDelLocal_SelfStageConfig(payload *pt.ParaStageConfig, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return e.execAutoDelLocal(tx, receiptData)
}
......@@ -285,3 +285,8 @@ func (e *Paracross) ExecLocal_Withdraw(payload *types.AssetsWithdraw, tx *types.
func (e *Paracross) ExecLocal_TransferToExec(payload *types.AssetsTransferToExec, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return nil, nil
}
//ExecLocal_SelfConsensStageConfig transfer asset to exec local db process
func (e *Paracross) ExecLocal_SelfStageConfig(payload *pt.ParaStageConfig, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
return e.execAutoLocalStage(tx, receiptData, index)
}
......@@ -32,6 +32,9 @@ var (
localNodeTitleStatus string
localNodeTitleDone string
localNodeGroupStatusTitle string
paraSelfConsensStages string
paraSelfConsensStageIDPrefix string
)
func setPrefix() {
......@@ -43,6 +46,10 @@ func setPrefix() {
paraNodeGroupStatusAddrs = "mavl-paracross-nodegroup-apply-title-"
paraNodeIDPrefix = "mavl-paracross-title-nodeid-"
paraNodeGroupIDPrefix = "mavl-paracross-title-nodegroupid-"
paraSelfConsensStages = "mavl-paracross-selfconsens-stages-"
paraSelfConsensStageIDPrefix = "mavl-paracross-selfconsens-id-"
localTx = "LODB-paracross-titleHeightAddr-"
localTitle = "LODB-paracross-title-"
localTitleHeight = "LODB-paracross-titleHeight-"
......@@ -92,16 +99,24 @@ func calcParaNodeGroupIDKey(title, hash string) string {
return fmt.Sprintf(paraNodeGroupIDPrefix+"%s-%s", title, hash)
}
func calcParaSelfConsStagesKey() []byte {
return []byte(fmt.Sprintf(paraSelfConsensStages))
}
func calcParaSelfConsensStageIDKey(hash string) string {
return fmt.Sprintf(paraSelfConsensStageIDPrefix+"%s", hash)
}
func getParaNodeIDSuffix(id string) string {
if !strings.HasPrefix(id, paraNodeIDUnifyPrefix) {
return id
}
ids := strings.Split(id, "-")
txID := ids[len(ids)-1]
if strings.HasPrefix(txID, "0x") {
ok, txID, ids := getRealTxHashID(id)
if ok {
return txID
}
//对于nodegroup 创建的"mavl-paracross-title-nodegroupid-user.p.para.-0xb6cd0274587...a61e444e9f848a4c02d7b-1"特殊场景
if len(ids) > 1 {
txID = ids[len(ids)-2] + "-" + txID
......@@ -112,6 +127,15 @@ func getParaNodeIDSuffix(id string) string {
return id
}
func getRealTxHashID(id string) (bool, string, []string) {
ids := strings.Split(id, "-")
txID := ids[len(ids)-1]
if strings.HasPrefix(txID, "0x") {
return true, txID, ids
}
return false, txID, ids
}
func calcLocalTxKey(title string, height int64, addr string) []byte {
return []byte(fmt.Sprintf(localTx+"%s-%012-%s", title, height, addr))
}
......
......@@ -408,3 +408,29 @@ func (p *Paracross) paracrossGetAssetTxResult(hash []byte) (types.Message, error
return &result, nil
}
//Query_GetSelfConsStages get self consensus stages configed
func (p *Paracross) Query_GetSelfConsStages(in *types.ReqNil) (types.Message, error) {
_, stages, err := getSelfConsensStages(p.GetStateDB())
if err != nil {
return nil, errors.Cause(err)
}
return stages, nil
}
//Query_GetSelfConsOneStage get self consensus one stage
func (p *Paracross) Query_GetSelfConsOneStage(in *types.Int64) (types.Message, error) {
_, stages, err := getSelfConsensStages(p.GetStateDB())
if err != nil {
return nil, errors.Cause(err)
}
stage := getSelfConsOneStage(in.Data, stages)
return stage, nil
}
// Query_ListSelfStages 批量查询
func (p *Paracross) Query_ListSelfStages(in *pt.ReqQuerySelfStages) (types.Message, error) {
return p.listSelfStages(in)
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package executor
import (
"github.com/33cn/chain33/common"
dbm "github.com/33cn/chain33/common/db"
"sort"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/pkg/errors"
)
func getStageID(db dbm.KV, id string) (*pt.SelfConsensStageInfo, error) {
realID := calcParaSelfConsensStageIDKey(id)
val, err := getDb(db, []byte(realID))
if err != nil {
return nil, err
}
var status pt.SelfConsensStageInfo
err = types.Decode(val, &status)
return &status, err
}
func makeStageConfigReceipt(prev, current *pt.SelfConsensStageInfo) *types.Receipt {
log := &pt.ReceiptSelfConsStageConfig{
Prev: prev,
Current: current,
}
return &types.Receipt{
Ty: types.ExecOk,
KV: []*types.KeyValue{
{Key: []byte(current.Id), Value: types.Encode(current)},
},
Logs: []*types.ReceiptLog{
{
Ty: pt.TyLogParaSelfConsStageConfig,
Log: types.Encode(log),
},
},
}
}
func makeStageVoteDoneReceipt(config *pt.SelfConsensStage, totalCount, commitCount, most int, pass string) *types.Receipt {
log := &pt.ReceiptSelfConsStageVoteDone{
Stage: config,
TotalNodes: int32(totalCount),
TotalVote: int32(commitCount),
MostVote: int32(most),
VoteRst: pass,
}
return &types.Receipt{
Ty: types.ExecOk,
KV: nil,
Logs: []*types.ReceiptLog{
{
Ty: pt.TyLogParaNodeVoteDone,
Log: types.Encode(log),
},
},
}
}
func makeStageGroupReceipt(prev, current *pt.SelfConsensStages) *types.Receipt {
key := calcParaSelfConsStagesKey()
log := &pt.ReceiptSelfConsStagesUpdate{Prev: prev, Current: current}
return &types.Receipt{
Ty: types.ExecOk,
KV: []*types.KeyValue{
{Key: key, Value: types.Encode(current)},
},
Logs: []*types.ReceiptLog{
{
Ty: pt.TyLogParaStageGroupUpdate,
Log: types.Encode(log),
},
},
}
}
func getSelfConsensStages(db dbm.KV) (map[int64]uint32, *pt.SelfConsensStages, error) {
key := calcParaSelfConsStagesKey()
item, err := db.Get(key)
if err != nil {
if isNotFound(err) {
err = pt.ErrKeyNotExist
}
return nil, nil, errors.Wrapf(err, "getSelfConsensStages key:%s", string(key))
}
var config pt.SelfConsensStages
err = types.Decode(item, &config)
if err != nil {
return nil, nil, errors.Wrap(err, "getSelfConsensStages decode config")
}
stagesMap := make(map[int64]uint32)
for _, v := range config.Items {
stagesMap[v.BlockHeight] = v.Enable
}
return stagesMap, &config, nil
}
func updateStages(db dbm.KV, stage *pt.SelfConsensStage) (*types.Receipt, error) {
_, stages, err := getSelfConsensStages(db)
if err != nil {
return nil, err
}
oldStages := *stages
stages.Items = append(stages.Items, stage)
sort.Slice(stages.Items, func(i, j int) bool { return stages.Items[i].BlockHeight < stages.Items[j].BlockHeight })
return makeStageGroupReceipt(&oldStages, stages), nil
}
func selfConsensInitStage(db dbm.KV) *types.Receipt {
close := types.IsEnable("consensus.sub.para.paraSelfConsInitDisable")
stage := &pt.SelfConsensStage{BlockHeight: 0, Enable: pt.ParaConfigYes}
if close {
stage.Enable = pt.ParaConfigNo
}
stages := &pt.SelfConsensStages{Items: []*pt.SelfConsensStage{stage}}
return makeStageGroupReceipt(nil, stages)
}
func getSelfConsOneStage(height int64, stages *pt.SelfConsensStages) *pt.SelfConsensStage {
for i := len(stages.Items) - 1; i >= 0; i-- {
if height >= stages.Items[i].BlockHeight {
return stages.Items[i]
}
}
clog.Error("getSelfConsOneStage height not hit", "height", height)
return &pt.SelfConsensStage{Enable: pt.ParaConfigNo}
}
func (a *action) checkValidStage(config *pt.SelfConsensStage) error {
//1. 设置高度必须大于当前区块高度
if config.BlockHeight <= a.height {
return errors.Wrapf(pt.ErrHeightHasPast, "checkValidStage config height:%d less than block height:%d", config.BlockHeight, a.height)
}
//2. 如果已经设置到stages中,简单起见,就不能更改了,应该也不会有很大影响
stages, _, err := getSelfConsensStages(a.db)
if err != nil {
return errors.Wrapf(err, "checkValidStage get stages")
}
if _, exist := stages[config.BlockHeight]; exist {
return errors.Wrapf(err, "checkValidStage config height:%d existed", config.BlockHeight)
}
//3. enable check
if config.Enable != pt.ParaConfigYes && config.Enable != pt.ParaConfigNo {
return errors.Wrapf(err, "checkValidStage config enable:%d not support", config.Enable)
}
return nil
}
func (a *action) stageApply(stage *pt.SelfConsensStage) (*types.Receipt, error) {
err := a.checkValidStage(stage)
if err != nil {
return nil, err
}
stat := &pt.SelfConsensStageInfo{
Id: calcParaSelfConsensStageIDKey(common.ToHex(a.txhash)),
Status: pt.ParaApplyJoining,
Stage: stage,
FromAddr: a.fromaddr,
ExecHeight: a.height}
return makeStageConfigReceipt(nil, stat), nil
}
func (a *action) stageCancel(config *pt.ConfigCancelInfo) (*types.Receipt, error) {
stat, err := getStageID(a.db, config.Id)
if err != nil {
return nil, err
}
//只能提案发起人撤销
if a.fromaddr != stat.FromAddr {
return nil, errors.Wrapf(types.ErrNotAllow, "stage id create by:%s,not by:%s", stat.FromAddr, a.fromaddr)
}
if stat.Status != pt.ParaApplyJoining {
return nil, errors.Wrapf(pt.ErrParaNodeOpStatusWrong, "stage config id:%s,status:%d", config.Id, stat.Status)
}
var copyStat pt.SelfConsensStageInfo
err = deepCopy(&copyStat, stat)
if err != nil {
clog.Error("selfConsensQuit deep copy fail", "copy", copyStat, "stat", stat)
return nil, err
}
stat.Status = pt.ParaApplyCanceled
stat.ExecHeight = a.height
return makeStageConfigReceipt(&copyStat, stat), nil
}
func (a *action) stageVote(config *pt.ConfigVoteInfo) (*types.Receipt, error) {
nodes, _, err := getParacrossNodes(a.db, types.GetTitle())
if err != nil {
return nil, errors.Wrapf(err, "getNodes for title:%s", types.GetTitle())
}
if !validNode(a.fromaddr, nodes) {
return nil, errors.Wrapf(pt.ErrNodeNotForTheTitle, "not validNode:%s", a.fromaddr)
}
stat, err := getStageID(a.db, config.Id)
if err != nil {
return nil, err
}
if stat.Status != pt.ParaApplyJoining {
return nil, errors.Wrapf(pt.ErrParaNodeOpStatusWrong, "config id:%s,status:%d", config.Id, stat.Status)
}
//stage blockHeight 也不能小于当前vote tx height,不然没有意义
err = a.checkValidStage(stat.Stage)
if err != nil {
return nil, err
}
var copyStat pt.SelfConsensStageInfo
err = deepCopy(&copyStat, stat)
if err != nil {
clog.Error("selfConsensVote deep copy fail", "copy", copyStat, "stat", stat)
return nil, err
}
if stat.Votes == nil {
stat.Votes = &pt.ParaNodeVoteDetail{}
}
found, index := hasVoted(stat.Votes.Addrs, a.fromaddr)
if found {
stat.Votes.Votes[index] = pt.ParaNodeVoteStr[config.Value]
} else {
stat.Votes.Addrs = append(stat.Votes.Addrs, a.fromaddr)
stat.Votes.Votes = append(stat.Votes.Votes, pt.ParaNodeVoteStr[config.Value])
}
//剔除已退出nodegroup的addr的投票
updateVotes(stat.Votes, nodes)
most, vote := getMostVote(stat.Votes)
if !isCommitDone(nodes, most) {
return makeStageConfigReceipt(&copyStat, stat), nil
}
clog.Info("paracross.nodeVote ----pass", "most", most, "pass", vote)
receipt := &types.Receipt{Ty: types.ExecOk}
if vote == pt.ParaVoteYes {
r, err := updateStages(a.db, stat.Stage)
if err != nil {
return nil, err
}
receipt = mergeReceipt(receipt, r)
}
stat.Status = pt.ParaApplyClosed
stat.ExecHeight = a.height
r := makeStageConfigReceipt(&copyStat, stat)
receipt = mergeReceipt(receipt, r)
r = makeStageVoteDoneReceipt(stat.Stage, len(nodes), len(stat.Votes.Addrs), most, pt.ParaNodeVoteStr[vote])
receipt = mergeReceipt(receipt, r)
return receipt, nil
}
//SelfConsensStageConfig support self consens stage config
func (a *action) SelfStageConfig(config *pt.ParaStageConfig) (*types.Receipt, error) {
if config.OpTy == pt.ParaOpNewApply {
return a.stageApply(config.GetStage())
} else if config.OpTy == pt.ParaOpQuit {
return a.stageCancel(config.GetCancel())
} else if config.OpTy == pt.ParaOpVote {
return a.stageVote(config.GetVote())
}
return nil, pt.ErrParaUnSupportNodeOper
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package executor
import (
"github.com/33cn/chain33/system/dapp"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
func (e *Paracross) execAutoLocalStage(tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
set, err := e.execLocalStage(receiptData, index)
if err != nil {
return set, err
}
dbSet := &types.LocalDBSet{}
dbSet.KV = e.AddRollbackKV(tx, tx.Execer, set.KV)
return dbSet, nil
}
func (e *Paracross) execLocalStage(receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
table := NewStageTable(e.GetLocalDB())
txIndex := dapp.HeightIndexStr(e.GetHeight(), int64(index))
for _, log := range receiptData.Logs {
switch log.Ty {
case pt.TyLogParaSelfConsStageConfig:
{
var receipt pt.ReceiptSelfConsStageConfig
err := types.Decode(log.Log, &receipt)
if err != nil {
return nil, err
}
r := &pt.LocalSelfConsStageInfo{
Stage: receipt.Current,
TxIndex: txIndex,
}
err = table.Replace(r)
if err != nil {
return nil, err
}
}
default:
break
}
}
kvs, err := table.Save()
if err != nil {
return nil, err
}
dbSet := &types.LocalDBSet{}
dbSet.KV = append(dbSet.KV, kvs...)
return dbSet, nil
}
func (e *Paracross) execAutoDelLocal(tx *types.Transaction, receiptData *types.ReceiptData) (*types.LocalDBSet, error) {
kvs, err := e.DelRollbackKV(tx, tx.Execer)
if err != nil {
return nil, err
}
dbSet := &types.LocalDBSet{}
dbSet.KV = append(dbSet.KV, kvs...)
return dbSet, nil
}
func (e *Paracross) listSelfStages(req *pt.ReqQuerySelfStages) (types.Message, error) {
if req == nil {
return nil, types.ErrInvalidParam
}
localDb := e.GetLocalDB()
query := NewStageTable(localDb).GetQuery(localDb)
var primary []byte
if req.Height > 0 {
primary = []byte(dapp.HeightIndexStr(req.Height, int64(req.Index)))
}
indexName := ""
if req.Status > 0 {
indexName = "status"
} else if req.Id != "" {
indexName = "id"
}
cur := &StageRow{
LocalSelfConsStageInfo: &pt.LocalSelfConsStageInfo{},
}
cur.Stage.Status = req.Status
cur.Stage.Id = req.Id
prefix, err := cur.Get(indexName)
if err != nil {
clog.Error("Get", "indexName", indexName, "err", err)
return nil, err
}
rows, err := query.ListIndex(indexName, prefix, primary, req.Count, req.Direction)
if err != nil {
clog.Error("query List failed", "indexName", indexName, "prefix", "prefix", "key", string(primary), "err", err)
return nil, err
}
if len(rows) == 0 {
return nil, types.ErrNotFound
}
var rep pt.ReplyQuerySelfStages
for _, row := range rows {
r, ok := row.Data.(*pt.LocalSelfConsStageInfo)
if !ok {
clog.Error("listProposalProject", "err", "bad row type")
return nil, types.ErrDecode
}
ok, txID, _ := getRealTxHashID(r.Stage.Id)
if ok {
r.Stage.Id = txID
}
rep.StageInfo = append(rep.StageInfo, r.Stage)
}
return &rep, nil
}
package executor
import (
"fmt"
"github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/common/db/table"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
/*
table struct
data: self consens stage
index: status
*/
var boardOpt = &table.Option{
Prefix: "LODB-paracross",
Name: "stage",
Primary: "heightindex",
Index: []string{"id", "status"},
}
//NewBoardTable 新建表
func NewStageTable(kvdb db.KV) *table.Table {
rowmeta := NewStageRow()
table, err := table.NewTable(rowmeta, kvdb, boardOpt)
if err != nil {
panic(err)
}
return table
}
//StageRow table meta 结构
type StageRow struct {
*pt.LocalSelfConsStageInfo
}
//NewStageRow 新建一个meta 结构
func NewStageRow() *StageRow {
return &StageRow{LocalSelfConsStageInfo: nil}
}
//CreateRow 新建数据行(注意index 数据一定也要保存到数据中,不能就保存heightindex)
func (r *StageRow) CreateRow() *table.Row {
return &table.Row{Data: &pt.LocalSelfConsStageInfo{}}
}
//SetPayload 设置数据
func (r *StageRow) SetPayload(data types.Message) error {
if d, ok := data.(*pt.LocalSelfConsStageInfo); ok {
r.LocalSelfConsStageInfo = d
return nil
}
return types.ErrTypeAsset
}
//Get 按照indexName 查询 indexValue
func (r *StageRow) Get(key string) ([]byte, error) {
if key == "heightindex" {
return []byte(r.TxIndex), nil
} else if key == "id" {
return []byte(r.Stage.Id), nil
} else if key == "status" {
return []byte(fmt.Sprintf("%2d", r.Stage.Status)), nil
}
return nil, types.ErrNotFound
}
......@@ -264,12 +264,12 @@ func (a *action) nodeJoin(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
if err != nil && !isNotFound(err) {
return nil, errors.Wrapf(err, "nodeJoin get title=%s,nodeAddr=%s", config.Title, config.Addr)
}
if addrStat != nil && addrStat.Status != pt.ParacrossNodeQuited {
if addrStat != nil && addrStat.Status != pt.ParaApplyQuited {
return nil, errors.Wrapf(pt.ErrParaNodeAddrExisted, "nodeJoin nodeAddr existed:%s,status:%d", config.Addr, addrStat.Status)
}
stat := &pt.ParaNodeIdStatus{
Id: calcParaNodeIDKey(config.Title, common.ToHex(a.txhash)),
Status: pt.ParacrossNodeJoining,
Status: pt.ParaApplyJoining,
Title: config.Title,
TargetAddr: config.Addr,
FromAddr: a.fromaddr,
......@@ -296,13 +296,13 @@ func (a *action) nodeQuit(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
if err != nil {
return nil, errors.Wrapf(err, "nodeAddr:%s get error", config.Addr)
}
if addrStat.Status != pt.ParacrossNodeJoined {
if addrStat.Status != pt.ParaApplyJoined {
return nil, errors.Wrapf(pt.ErrParaNodeAddrNotExisted, "nodeAddr:%s status:%d", config.Addr, addrStat.Status)
}
stat := &pt.ParaNodeIdStatus{
Id: calcParaNodeIDKey(config.Title, common.ToHex(a.txhash)),
Status: pt.ParacrossNodeQuiting,
Status: pt.ParaApplyQuiting,
Title: config.Title,
TargetAddr: config.Addr,
FromAddr: a.fromaddr,
......@@ -328,7 +328,7 @@ func (a *action) nodeCancel(config *pt.ParaNodeAddrConfig) (*types.Receipt, erro
return nil, errors.Wrapf(pt.ErrNodeNotForTheTitle, "config title:%s,id title:%s", config.Title, stat.Title)
}
if stat.Status != pt.ParacrossNodeJoining && stat.Status != pt.ParacrossNodeQuiting {
if stat.Status != pt.ParaApplyJoining && stat.Status != pt.ParaApplyQuiting {
return nil, errors.Wrapf(pt.ErrParaNodeOpStatusWrong, "config id:%s,status:%d", config.Id, stat.Status)
}
......@@ -338,7 +338,7 @@ func (a *action) nodeCancel(config *pt.ParaNodeAddrConfig) (*types.Receipt, erro
clog.Error("nodeaccount.nodeQuit deep copy fail", "copy", copyStat, "stat", stat)
return nil, err
}
if stat.Status == pt.ParacrossNodeJoining {
if stat.Status == pt.ParaApplyJoining {
receipt := &types.Receipt{Ty: types.ExecOk}
cfg := a.api.GetConfig()
if !cfg.IsPara() {
......@@ -348,15 +348,15 @@ func (a *action) nodeCancel(config *pt.ParaNodeAddrConfig) (*types.Receipt, erro
}
receipt = mergeReceipt(receipt, r)
}
stat.Status = pt.ParacrossNodeCanceled
stat.Status = pt.ParaApplyCanceled
stat.Height = a.height
r := makeNodeConfigReceipt(a.fromaddr, config, &copyStat, stat)
receipt = mergeReceipt(receipt, r)
return receipt, nil
}
if stat.Status == pt.ParacrossNodeQuiting {
stat.Status = pt.ParacrossNodeCanceled
if stat.Status == pt.ParaApplyQuiting {
stat.Status = pt.ParaApplyCanceled
stat.Height = a.height
return makeNodeConfigReceipt(a.fromaddr, config, &copyStat, stat), nil
}
......@@ -376,19 +376,19 @@ func isSuperManager(cfg *types.Chain33Config, addr string) bool {
return false
}
func getMostVote(stat *pt.ParaNodeIdStatus) (int, int) {
func getMostVote(stat *pt.ParaNodeVoteDetail) (int, int) {
var ok, nok int
for _, v := range stat.GetVotes().Votes {
if v == pt.ParaNodeVoteStr[pt.ParaNodeVoteYes] {
for _, v := range stat.Votes {
if v == pt.ParaNodeVoteStr[pt.ParaVoteYes] {
ok++
} else {
nok++
}
}
if ok > nok {
return ok, pt.ParaNodeVoteYes
return ok, pt.ParaVoteYes
}
return nok, pt.ParaNodeVoteNo
return nok, pt.ParaVoteNo
}
......@@ -432,17 +432,18 @@ func (a *action) superManagerVoteProc(title string) error {
return nil
}
func updateVotes(stat *pt.ParaNodeIdStatus, nodes map[string]struct{}) {
func updateVotes(stat *pt.ParaNodeVoteDetail, nodes map[string]struct{}) {
votes := &pt.ParaNodeVoteDetail{}
for i, addr := range stat.Votes.Addrs {
for i, addr := range stat.Addrs {
if _, ok := nodes[addr]; ok {
votes.Addrs = append(votes.Addrs, addr)
votes.Votes = append(votes.Votes, stat.Votes.Votes[i])
votes.Votes = append(votes.Votes, stat.Votes[i])
}
}
stat.Votes = votes
stat = votes
}
//由于propasal id 和quit id分开,quit id不知道对应addr proposal id的coinfrozen信息,需要维护一个围绕addr的数据库结构信息
func (a *action) updateNodeAddrStatus(stat *pt.ParaNodeIdStatus) (*types.Receipt, error) {
cfg := a.api.GetConfig()
addrStat, err := getNodeAddr(a.db, stat.Title, stat.TargetAddr)
......@@ -453,27 +454,27 @@ func (a *action) updateNodeAddrStatus(stat *pt.ParaNodeIdStatus) (*types.Receipt
addrStat = &pt.ParaNodeAddrIdStatus{}
addrStat.Title = stat.Title
addrStat.Addr = stat.TargetAddr
addrStat.Status = pt.ParacrossNodeJoined
addrStat.Status = pt.ParaApplyJoined
addrStat.ProposalId = stat.Id
addrStat.QuitId = ""
return makeParaNodeStatusReceipt(a.fromaddr, nil, addrStat), nil
}
preStat := *addrStat
if stat.Status == pt.ParacrossNodeJoining {
addrStat.Status = pt.ParacrossNodeJoined
if stat.Status == pt.ParaApplyJoining {
addrStat.Status = pt.ParaApplyJoined
addrStat.ProposalId = stat.Id
addrStat.QuitId = ""
return makeParaNodeStatusReceipt(a.fromaddr, &preStat, addrStat), nil
}
if stat.Status == pt.ParacrossNodeQuiting {
if stat.Status == pt.ParaApplyQuiting {
proposalStat, err := getNodeID(a.db, addrStat.ProposalId)
if err != nil {
return nil, errors.Wrapf(err, "nodeAddr:%s quiting wrong proposeid:%s", stat.TargetAddr, addrStat.ProposalId)
}
addrStat.Status = pt.ParacrossNodeQuited
addrStat.Status = pt.ParaApplyQuited
addrStat.QuitId = stat.Id
receipt := makeParaNodeStatusReceipt(a.fromaddr, &preStat, addrStat)
......@@ -507,15 +508,15 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
if config.Title != stat.Title {
return nil, errors.Wrapf(pt.ErrNodeNotForTheTitle, "config title:%s,id title:%s", config.Title, stat.Title)
}
if stat.Status != pt.ParacrossNodeJoining && stat.Status != pt.ParacrossNodeQuiting {
if stat.Status != pt.ParaApplyJoining && stat.Status != pt.ParaApplyQuiting {
return nil, errors.Wrapf(pt.ErrParaNodeOpStatusWrong, "config id:%s,status:%d", config.Id, stat.Status)
}
//已经被其他id pass 场景
if stat.Status == pt.ParacrossNodeJoining && validNode(stat.TargetAddr, nodes) {
if stat.Status == pt.ParaApplyJoining && validNode(stat.TargetAddr, nodes) {
return nil, errors.Wrapf(pt.ErrParaNodeAddrExisted, "config id:%s,addr:%s", config.Id, stat.TargetAddr)
}
if stat.Status == pt.ParacrossNodeQuiting && !validNode(stat.TargetAddr, nodes) {
if stat.Status == pt.ParaApplyQuiting && !validNode(stat.TargetAddr, nodes) {
return nil, errors.Wrapf(pt.ErrParaNodeAddrNotExisted, "config id:%s,addr:%s", config.Id, stat.TargetAddr)
}
......@@ -538,9 +539,9 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
}
//剔除已退出nodegroup的addr的投票
updateVotes(stat, nodes)
updateVotes(stat.Votes, nodes)
most, vote := getMostVote(stat)
most, vote := getMostVote(stat.Votes)
if !isCommitDone(nodes, most) {
superManagerPass := false
if isSuperManager(cfg, a.fromaddr) {
......@@ -555,16 +556,16 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
}
//超级用户投yes票,共识停止了一定高度就可以通过,防止当前所有授权节点都忘掉私钥场景
if !(superManagerPass && most > 0 && vote == pt.ParaNodeVoteYes) {
if !(superManagerPass && most > 0 && vote == pt.ParaVoteYes) {
return makeNodeConfigReceipt(a.fromaddr, config, &copyStat, stat), nil
}
}
clog.Info("paracross.nodeVote ----pass", "most", most, "pass", vote)
receipt := &types.Receipt{Ty: types.ExecOk}
if vote == pt.ParaNodeVoteNo {
if stat.Status == pt.ParacrossNodeJoining {
stat.Status = pt.ParacrossNodeClosed
if vote == pt.ParaVoteNo {
if stat.Status == pt.ParaApplyJoining {
stat.Status = pt.ParaApplyClosed
stat.Height = a.height
//active coins
if !cfg.IsPara() {
......@@ -574,13 +575,13 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
}
receipt = mergeReceipt(receipt, r)
}
} else if stat.Status == pt.ParacrossNodeQuiting {
stat.Status = pt.ParacrossNodeClosed
} else if stat.Status == pt.ParaApplyQuiting {
stat.Status = pt.ParaApplyClosed
stat.Height = a.height
}
} else {
if stat.Status == pt.ParacrossNodeJoining {
if stat.Status == pt.ParaApplyJoining {
r, err := unpdateNodeGroup(a.db, config.Title, stat.TargetAddr, true)
if err != nil {
return nil, err
......@@ -593,9 +594,9 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
}
receipt = mergeReceipt(receipt, r)
stat.Status = pt.ParacrossNodeClosed
stat.Status = pt.ParaApplyClosed
stat.Height = a.height
} else if stat.Status == pt.ParacrossNodeQuiting {
} else if stat.Status == pt.ParaApplyQuiting {
r, err := unpdateNodeGroup(a.db, config.Title, stat.TargetAddr, false)
if err != nil {
return nil, err
......@@ -617,7 +618,7 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
receipt = mergeReceipt(receipt, r)
}
stat.Status = pt.ParacrossNodeClosed
stat.Status = pt.ParaApplyClosed
stat.Height = a.height
}
}
......@@ -927,6 +928,13 @@ func (a *action) nodeGroupApproveApply(config *pt.ParaNodeGroupConfig, apply *pt
receipt.KV = append(receipt.KV, r.KV...)
receipt.Logs = append(receipt.Logs, r.Logs...)
if types.IsPara() && types.IsDappFork(a.height, pt.ParaX, pt.ForkParaSelfConsStages) {
//不允许主链成功平行链失败导致不一致的情况,这里如果失败则手工设置init stage
r = selfConsensInitStage(a.db)
receipt.KV = append(receipt.KV, r.KV...)
receipt.Logs = append(receipt.Logs, r.Logs...)
}
return receipt, nil
}
......@@ -977,7 +985,7 @@ func (a *action) nodeGroupCreate(status *pt.ParaNodeGroupStatus) (*types.Receipt
for i, addr := range nodes {
stat := &pt.ParaNodeIdStatus{
Id: status.Id + "-" + strconv.Itoa(i),
Status: pt.ParacrossNodeClosed,
Status: pt.ParaApplyClosed,
Title: status.Title,
TargetAddr: addr,
Votes: &pt.ParaNodeVoteDetail{Addrs: []string{a.fromaddr}, Votes: []string{"yes"}},
......@@ -1042,20 +1050,20 @@ func (a *action) NodeConfig(config *pt.ParaNodeAddrConfig) (*types.Receipt, erro
return nil, pt.ErrInvalidTitle
}
if config.Op == pt.ParaNodeJoin {
if config.Op == pt.ParaOpNewApply {
return a.nodeJoin(config)
} else if config.Op == pt.ParaNodeQuit {
} else if config.Op == pt.ParaOpQuit {
return a.nodeQuit(config)
} else if config.Op == pt.ParaNodeCancel {
} else if config.Op == pt.ParaOpCancel {
if config.Id == "" {
return nil, types.ErrInvalidParam
}
return a.nodeCancel(config)
} else if config.Op == pt.ParaNodeVote {
if config.Id == "" || config.Value >= pt.ParaNodeVoteEnd {
} else if config.Op == pt.ParaOpVote {
if config.Id == "" || config.Value >= pt.ParaVoteEnd {
return nil, types.ErrInvalidParam
}
return a.nodeVote(config)
......
......@@ -129,7 +129,7 @@ func checkJoinReceipt(suite *NodeManageTestSuite, receipt *types.Receipt) {
assert.Nil(suite.T(), err, "decode ParaNodeAddrStatus failed")
//suite.T().Log("titleHeight", titleHeight)
assert.Equal(suite.T(), int32(pt.TyLogParaNodeConfig), receipt.Logs[0].Ty)
assert.Equal(suite.T(), int32(pt.ParacrossNodeJoining), stat.Status)
assert.Equal(suite.T(), int32(pt.ParaApplyJoining), stat.Status)
assert.NotNil(suite.T(), stat.Votes)
}
......@@ -144,7 +144,7 @@ func checkQuitReceipt(suite *NodeManageTestSuite, receipt *types.Receipt) {
assert.Nil(suite.T(), err, "decode ParaNodeAddrStatus failed")
//suite.T().Log("titleHeight", titleHeight)
assert.Equal(suite.T(), int32(pt.TyLogParaNodeConfig), receipt.Logs[0].Ty)
assert.Equal(suite.T(), int32(pt.ParacrossNodeQuiting), stat.Status)
assert.Equal(suite.T(), int32(pt.ParaApplyQuiting), stat.Status)
assert.NotNil(suite.T(), stat.Votes)
}
......@@ -178,9 +178,9 @@ func voteTest(suite *NodeManageTestSuite, id string, join bool) {
var count int
config := &pt.ParaNodeAddrConfig{
Title: chain33TestCfg.GetTitle(),
Op: pt.ParaNodeVote,
Op: pt.ParaOpVote,
Id: id,
Value: pt.ParaNodeVoteYes,
Value: pt.ParaVoteYes,
}
tx, err := pt.CreateRawNodeConfigTx(config)
suite.Nil(err)
......@@ -270,7 +270,7 @@ func (suite *NodeManageTestSuite) testNodeConfig() {
//Join test
config := &pt.ParaNodeAddrConfig{
Title: chain33TestCfg.GetTitle(),
Op: pt.ParaNodeJoin,
Op: pt.ParaOpNewApply,
Addr: Account14K,
}
tx, err := pt.CreateRawNodeConfigTx(config)
......@@ -290,7 +290,7 @@ func (suite *NodeManageTestSuite) testNodeConfig() {
//Quit test
config = &pt.ParaNodeAddrConfig{
Title: chain33TestCfg.GetTitle(),
Op: pt.ParaNodeQuit,
Op: pt.ParaOpQuit,
Addr: Account14K,
}
tx, err = pt.CreateRawNodeConfigTx(config)
......@@ -354,7 +354,7 @@ func TestUpdateVotes(t *testing.T) {
nodes["BB"] = struct{}{}
nodes["CC"] = struct{}{}
updateVotes(stat, nodes)
updateVotes(stat.Votes, nodes)
assert.Equal(t, []string{"BB", "CC"}, stat.Votes.Addrs)
assert.Equal(t, []string{"no", "no"}, stat.Votes.Votes)
}
......
......@@ -191,6 +191,90 @@ message ParacrossNodeStatus {
bool isStartHeight = 15;
}
message SelfConsensStages {
repeated SelfConsensStage items = 1;
}
message SelfConsensStage {
int64 blockHeight = 1;
uint32 enable = 2;
}
message SelfConsensStageInfo {
string id = 1;
uint32 status = 2;
SelfConsensStage stage = 3;
string fromAddr = 4;
int64 execHeight = 5;
ParaNodeVoteDetail votes = 6;
}
message LocalSelfConsStageInfo {
SelfConsensStageInfo stage = 1;
string txIndex = 2;
}
message ConfigVoteInfo {
string id = 1;
// 投票值 1:ok 2:nok
uint32 value = 2;
}
message ConfigCancelInfo {
string id = 1;
}
//广义配置类型
message ParaStageConfig {
string title = 1;
// 配置类型
uint32 opTy = 2;
oneof op {
SelfConsensStage stage = 10;
ConfigVoteInfo vote = 11;
ConfigCancelInfo cancel= 12;
}
}
message ReceiptSelfConsStageConfig {
SelfConsensStageInfo prev = 1;
SelfConsensStageInfo current = 2;
}
message ReceiptSelfConsStageVoteDone {
string id = 1;
SelfConsensStage stage = 2;
int32 totalNodes = 3;
int32 totalVote = 4;
int32 mostVote = 5;
string voteRst = 6;
}
message ReceiptSelfConsStagesUpdate{
SelfConsensStages prev = 1;
SelfConsensStages current = 2;
}
// query
message ReqQuerySelfStages {
uint32 status = 1;
string id = 2;
int32 count = 3;
int32 direction = 4;
int64 height = 5;
int32 index = 6;
}
message ReplyQuerySelfStages {
repeated SelfConsensStageInfo stageInfo = 1;
}
message ParacrossCommitAction {
ParacrossNodeStatus status = 1;
}
......@@ -211,6 +295,7 @@ message ParacrossAction {
AssetsTransferToExec transferToExec = 8;
ParaNodeAddrConfig nodeConfig = 9;
ParaNodeGroupConfig nodeGroupConfig = 10;
ParaStageConfig selfStageConfig = 11;
}
......
......@@ -370,3 +370,71 @@ func (c *Jrpc) ListNodeGroupStatus(req *pt.ReqParacrossNodeInfo, result *interfa
*result = data
return err
}
// GetNodeGroupAddrs get super node group addrs
func (c *channelClient) GetSelfConsStages(ctx context.Context, req *types.ReqNil) (*pt.SelfConsensStages, error) {
r := *req
data, err := c.Query(pt.GetExecName(), "GetSelfConsStages", &r)
if err != nil {
return nil, err
}
if resp, ok := data.(*pt.SelfConsensStages); ok {
return resp, nil
}
return nil, types.ErrDecode
}
// GetNodeGroupAddrs get super node group addrs
func (c *Jrpc) GetSelfConsStages(req *types.ReqNil, result *interface{}) error {
data, err := c.cli.GetSelfConsStages(context.Background(), req)
if err != nil {
return err
}
*result = data
return err
}
// GetNodeGroupAddrs get super node group addrs
func (c *channelClient) GetSelfConsOneStage(ctx context.Context, req *types.Int64) (*pt.SelfConsensStage, error) {
r := *req
data, err := c.Query(pt.GetExecName(), "GetSelfConsOneStage", &r)
if err != nil {
return nil, err
}
if resp, ok := data.(*pt.SelfConsensStage); ok {
return resp, nil
}
return nil, types.ErrDecode
}
// GetNodeGroupAddrs get super node group addrs
func (c *Jrpc) GetSelfConsOneStage(req *types.Int64, result *interface{}) error {
data, err := c.cli.GetSelfConsOneStage(context.Background(), req)
if err != nil {
return err
}
*result = data
return err
}
func (c *channelClient) ListSelfStages(ctx context.Context, req *pt.ReqQuerySelfStages) (*pt.ReplyQuerySelfStages, error) {
r := *req
data, err := c.Query(pt.GetExecName(), "ListSelfStages", &r)
if err != nil {
return nil, err
}
if resp, ok := data.(*pt.ReplyQuerySelfStages); ok {
return resp, nil
}
return nil, types.ErrDecode
}
// ListSelfStages get paracross self consensus stage list
func (c *Jrpc) ListSelfStages(req *pt.ReqQuerySelfStages, result *interface{}) error {
data, err := c.cli.ListSelfStages(context.Background(), req)
if err != nil {
return err
}
*result = data
return err
}
......@@ -51,4 +51,8 @@ var (
ErrParaConsensStopBlocksNotReach = errors.New("ErrParaConsensStopBlocksNotReach")
//ErrForkHeightNotReach fork height not reach
ErrForkHeightNotReach = errors.New("ErrForkHeightNotReach")
//ErrHeightHasPast height has past
ErrHeightHasPast = errors.New("ErrHeightHasPast")
// ErrKeyNotExist config key not exist
ErrKeyNotExist = errors.New("ErrKeyNotExist")
)
......@@ -43,6 +43,9 @@ const (
TyLogParaNodeGroupConfig = 660
TyLogParaNodeStatusUpdate = 661
TyLogParaNodeGroupStatusUpdate = 664
TyLogParaSelfConsStageConfig = 665
TyLogParaStageVoteDone = 666
TyLogParaStageGroupUpdate = 667
)
type paracrossCommitTx struct {
......@@ -78,6 +81,8 @@ const (
ParacrossActionNodeConfig
//ParacrossActionNodeGroupApply apply for node group initially
ParacrossActionNodeGroupApply
//ParacrossActionSelfConsensStageConfig apply for self consensus stage config
ParacrossActionSelfStageConfig
)
// status
......@@ -88,42 +93,49 @@ const (
ParacrossStatusCommitDone
)
// node config op
// config op
const (
ParaNodeJoin = iota + 1
ParaNodeVote
ParaNodeQuit
ParaNodeCancel
ParaOpNewApply = iota + 1
ParaOpVote
ParaOpQuit
ParaOpCancel
)
// node vote op
const (
ParaNodeVoteInvalid = iota
ParaNodeVoteYes
ParaNodeVoteNo
ParaNodeVoteEnd
ParaVoteInvalid = iota
ParaVoteYes
ParaVoteNo
ParaVoteEnd
)
const (
ParaConfigInvalid = iota
ParaConfigYes
ParaConfigNo
)
// ParaNodeVoteStr ...
var ParaNodeVoteStr = []string{"invalid", "yes", "no"}
//针对addr申请的id的生命周期
const (
// ParacrossNodeJoined pass to add by votes
ParacrossNodeJoined = iota + 10
// ParacrossNodeQuited pass to quite by votes
ParacrossNodeQuited
// ParaApplyJoining apply for join group
ParaApplyJoining = iota + 1
// ParaApplyQuiting apply for quiting group
ParaApplyQuiting
// ParaApplyClosed id voting closed
ParaApplyClosed
// ParaApplyCanceled to cancel apply of joining or quiting
ParaApplyCanceled
)
//voting status
//针对addr本身的生命周期,addr维护了申请id和quit id,方便查询如coinfrozen等额外信息
const (
// ParacrossNodeIDJoining apply for join group
ParacrossNodeJoining = iota + 1
// ParacrossNodeIDQuiting apply for quiting group
ParacrossNodeQuiting
// ParacrossNodeIDClosed id voting closed
ParacrossNodeClosed
// ParacrossNodeCanceled to cancel apply of joining or quiting
ParacrossNodeCanceled
// ParaApplyJoined pass to add by votes
ParaApplyJoined = iota + 10
// ParaApplyQuited pass to quite by votes
ParaApplyQuited
)
const (
......@@ -232,6 +244,22 @@ func CreateRawNodeGroupApplyTx(apply *ParaNodeGroupConfig) (*types.Transaction,
}
//CreateRawSelfConsStageApplyTx create raw tx for self consens stage
func CreateRawSelfConsStageApplyTx(apply *ParaStageConfig) (*types.Transaction, error) {
action := &ParacrossAction{
Ty: ParacrossActionSelfStageConfig,
Value: &ParacrossAction_SelfStageConfig{apply},
}
tx := &types.Transaction{
Payload: types.Encode(action),
}
return tx, nil
}
// CreateRawAssetTransferTx create asset transfer tx
func CreateRawAssetTransferTx(cfg *types.Chain33Config, param *types.CreateTx) (*types.Transaction, error) {
// 跨链交易需要在主链和平行链上执行, 所以应该可以在主链和平行链上构建
......@@ -339,8 +367,6 @@ func GetDappForkHeight(cfg *types.Chain33Config, forkKey string) int64 {
forkHeight = 10
case ForkLoopCheckCommitTxDone:
forkHeight = 60
case ForkConsensSupportJump:
forkHeight = 100
}
}
}
......
......@@ -26,7 +26,9 @@ var (
MainLoopCheckCommitTxDoneForkHeight = "mainLoopCheckCommitTxDoneForkHeight"
// ForkConsensSupportJump 支持主链共识从-1开始跳跃一次
ForkConsensSupportJump = "ForkConsensSupportJump"
)
// ForkParaSelfConsStages 平行链自共识分阶段共识
ForkParaSelfConsStages = "ForkParaSelfConsStages"
)
func init() {
// init executor type
......@@ -42,6 +44,8 @@ func InitFork(cfg *types.Chain33Config) {
cfg.RegisterDappFork(ParaX, ForkCommitTx, 1850000)
cfg.RegisterDappFork(ParaX, ForkLoopCheckCommitTxDone, 3230000)
cfg.RegisterDappFork(ParaX, ForkConsensSupportJump, types.MaxHeight)
//只在平行链启用
cfg.RegisterDappFork(ParaX, ForkParaSelfConsStages, types.MaxHeight)
}
func InitExecutor(cfg *types.Chain33Config) {
......@@ -87,6 +91,9 @@ func (p *ParacrossType) GetLogMap() map[int64]*types.LogInfo {
TyLogParaNodeVoteDone: {Ty: reflect.TypeOf(ReceiptParaNodeVoteDone{}), Name: "LogParaNodeVoteDone"},
TyLogParaNodeGroupConfig: {Ty: reflect.TypeOf(ReceiptParaNodeGroupConfig{}), Name: "LogParaNodeGroupConfig"},
TyLogParaNodeGroupStatusUpdate: {Ty: reflect.TypeOf(ReceiptParaNodeGroupConfig{}), Name: "LogParaNodeGroupStatusUpdate"},
TyLogParaSelfConsStageConfig: {Ty: reflect.TypeOf(ReceiptSelfConsStageConfig{}), Name: "LogParaSelfConsStageConfig"},
TyLogParaStageVoteDone: {Ty: reflect.TypeOf(ReceiptSelfConsStageVoteDone{}), Name: "LogParaSelfConfStageVoteDoen"},
TyLogParaStageGroupUpdate: {Ty: reflect.TypeOf(ReceiptSelfConsStagesUpdate{}), Name: "LogParaSelfConfStagesUpdate"},
}
}
......@@ -102,6 +109,7 @@ func (p *ParacrossType) GetTypeMap() map[string]int32 {
"TransferToExec": ParacrossActionTransferToExec,
"NodeConfig": ParacrossActionNodeConfig,
"NodeGroupConfig": ParacrossActionNodeGroupApply,
"SelfStageConfig": ParacrossActionSelfStageConfig,
}
}
......@@ -153,7 +161,18 @@ func (p ParacrossType) CreateTx(action string, message json.RawMessage) (*types.
return nil, types.ErrInvalidParam
}
return CreateRawNodeGroupApplyTx(&param)
} else if action == "selfConsStageConfig" {
if !cfg.IsPara() {
return nil, types.ErrNotSupport
}
var param ParaStageConfig
err := types.JSONToPB(message, &param)
//err := json.Unmarshal(message, &param)
if err != nil {
glog.Error("CreateTx.selfConsStageConfig", "Error", err)
return nil, types.ErrInvalidParam
}
return CreateRawSelfConsStageApplyTx(cfg, &param)
}
return nil, types.ErrNotSupport
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment