Commit 63d2b2cb authored by mdj33's avatar mdj33 Committed by vipwzw

add stage cmd

parent 6d6bf32e
......@@ -108,15 +108,11 @@ mainBlockHashForkHeight=209186
mainForkParacrossCommitTx=2270000
#主链开启循环检查共识交易done的fork高度,需要和主链保持严格一致,不可修改,4320000是bityuan主链对应高度, ycc或其他按实际修改
mainLoopCheckCommitTxDoneForkHeight=4320000
#精简commitMsg,删除多余的hash参数,缺省mainLoopCheckCommitTxDoneForkHeight一致,一些特殊的链如game需定制
rmCommitMsgHashParamMainHeight=4320000
#paraSelfConsInitDisable=false
#主链每隔几个没有相关平行链交易的区块,平行链上打包空区块,缺省从平行链blockHeight=0开始,依次增长,空块间隔不能为0
[[consensus.sub.para.emptyBlockInterval]]
blockHeight=0
interval=50
#平行链自共识开启对应的主链高度,必须大于等于MainForkParacrossCommitTx=2270000, -1 不开启,可以多个高度开启和关闭
#selfConsensEnablePreContract=["0-9848"]
[store]
......
......@@ -72,26 +72,26 @@ type emptyBlockInterval struct {
}
type subConfig struct {
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []*emptyBlockInterval `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
FetchFilterParaTxsClose bool `json:"fetchFilterParaTxsClose,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitMsgHashParamMainHeight int64 `json:"rmCommitMsgHashParamMainHeight,omitempty"`
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []*emptyBlockInterval `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
FetchFilterParaTxsClose bool `json:"fetchFilterParaTxsClose,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
}
// New function to init paracross env
......
......@@ -365,12 +365,17 @@ func (client *blockSyncClient) addMinerTx(preStateHash []byte, block *types.Bloc
MainBlockHash: localBlock.MainHash,
MainBlockHeight: localBlock.MainHeight,
}
if status.MainBlockHeight < client.paraClient.subCfg.RmCommitMsgHashParamMainHeight {
maxHeight := pt.GetDappForkHeight(cfg,pt.ForkLoopCheckCommitTxDone)
if maxHeight < client.paraClient.subCfg.RmCommitParamMainHeight {
maxHeight = client.paraClient.subCfg.RmCommitParamMainHeight
}
if status.MainBlockHeight < maxHeight {
status.PreBlockHash = block.ParentHash
status.PreStateHash = preStateHash
}
//selfConsensEnablePreContract 是ForkParaSelfConsStages之前对是否开启共识的设置,fork之后采用合约控制,两个高度不能重叠
tx, err := pt.CreateRawMinerTx(cfg, &pt.ParacrossMinerAction{
Status: status,
IsSelfConsensus: client.paraClient.commitMsgClient.isSelfConsEnable(status.Height),
......
......@@ -40,13 +40,9 @@ function para_set_toml() {
sed -i $xsedfix 's/^mainForkParacrossCommitTx=.*/mainForkParacrossCommitTx=10/g' "${1}"
sed -i $xsedfix 's/^mainLoopCheckCommitTxDoneForkHeight=.*/mainLoopCheckCommitTxDoneForkHeight='''$MainLoopCheckForkHeight'''/g' "${1}"
sed -i $xsedfix 's/^rmCommitMsgHashParamMainHeight=.*/rmCommitMsgHashParamMainHeight='''$MainLoopCheckForkHeight'''/g' "${1}"
sed -i $xsedfix 's/^mainBlockHashForkHeight=.*/mainBlockHashForkHeight=1/g' "${1}"
#sed -i $xsedfix '/\[\[consensus.sub.para.selfConsensusEnable\]\]/{n;n;s/enable=false/enable=true/}' "${1}"
sed -i $xsedfix '/\[\[consensus.sub.para.selfConsensusEnable\]\]/!b;n;n;cenable=true' "${1}"
# rpc
sed -i $xsedfix 's/^jrpcBindAddr=.*/jrpcBindAddr="0.0.0.0:8901"/g' "${1}"
sed -i $xsedfix 's/^grpcBindAddr=.*/grpcBindAddr="0.0.0.0:8902"/g' "${1}"
......
......@@ -1059,7 +1059,7 @@ func showSelfStages(cmd *cobra.Command, args []string) {
height, _ := cmd.Flags().GetInt64("height")
index, _ := cmd.Flags().GetInt32("index")
params := &pt.ReqQuerySelfStages{
params := pt.ReqQuerySelfStages{
Status: status,
Id: id,
Count: count,
......
......@@ -320,15 +320,14 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
cfg := a.api.GetConfig()
var stage *pt.SelfConsensStage
if types.IsPara() && types.IsDappFork(a.height, pt.ParaX, pt.ForkParaSelfConsStages) {
_, stages, err := getSelfConsensStages(a.db)
stages, err := getSelfConsensStages(a.db)
if err != nil {
return &types.Receipt{}, nil
return nil, err
}
stage = getSelfConsOneStage(a.height, stages)
clog.Info("paracross.commit", "height", a.height, "stageHeight", stage.BlockHeight, "enable", stage.Enable)
if stage.Enable == pt.ParaConfigNo {
return &types.Receipt{}, nil
return nil, pt.ErrConsensClosed
}
}
......@@ -950,17 +949,12 @@ func (a *action) Miner(miner *pt.ParacrossMinerAction) (*types.Receipt, error) {
minerReceipt := &types.Receipt{Ty: types.ExecOk, KV: nil, Logs: logs}
isSelfConsensOn := miner.IsSelfConsensus
if types.IsDappFork(a.height, pt.ParaX, pt.ForkParaSelfConsStages) {
_, stages, err := getSelfConsensStages(a.db)
if err != nil && errors.Cause(err) != pt.ErrKeyNotExist {
var err error
isSelfConsensOn, err = isSelfConsOn(a.db, miner.Status.Height)
if err != nil {
clog.Error("paracross miner getConsensus ", "height", miner.Status.Height, "err", err)
return nil, err
}
if stages != nil {
stage := getSelfConsOneStage(miner.Status.Height, stages)
isSelfConsensOn = stage.Enable == pt.ParaConfigYes
} else {
isSelfConsensOn = false
}
}
//自共识后才挖矿
......
......@@ -411,7 +411,7 @@ func (p *Paracross) paracrossGetAssetTxResult(hash []byte) (types.Message, error
//Query_GetSelfConsStages get self consensus stages configed
func (p *Paracross) Query_GetSelfConsStages(in *types.ReqNil) (types.Message, error) {
_, stages, err := getSelfConsensStages(p.GetStateDB())
stages, err := getSelfConsensStages(p.GetStateDB())
if err != nil {
return nil, errors.Cause(err)
}
......@@ -421,7 +421,7 @@ func (p *Paracross) Query_GetSelfConsStages(in *types.ReqNil) (types.Message, er
//Query_GetSelfConsOneStage get self consensus one stage
func (p *Paracross) Query_GetSelfConsOneStage(in *types.Int64) (types.Message, error) {
_, stages, err := getSelfConsensStages(p.GetStateDB())
stages, err := getSelfConsensStages(p.GetStateDB())
if err != nil {
return nil, errors.Cause(err)
}
......
......@@ -84,43 +84,53 @@ func makeStageGroupReceipt(prev, current *pt.SelfConsensStages) *types.Receipt {
}
}
func getSelfConsensStages(db dbm.KV) (map[int64]uint32, *pt.SelfConsensStages, error) {
func getSelfConsensStages(db dbm.KV) (*pt.SelfConsensStages, error) {
key := calcParaSelfConsStagesKey()
item, err := db.Get(key)
if err != nil {
if isNotFound(err) {
err = pt.ErrKeyNotExist
}
return nil, nil, errors.Wrapf(err, "getSelfConsensStages key:%s", string(key))
return nil, errors.Wrapf(err, "getSelfConsensStages key:%s", string(key))
}
var config pt.SelfConsensStages
err = types.Decode(item, &config)
if err != nil {
return nil, nil, errors.Wrap(err, "getSelfConsensStages decode config")
return nil, errors.Wrap(err, "getSelfConsensStages decode config")
}
return &config, nil
}
func getSelfConsStagesMap(stages []*pt.SelfConsensStage) map[int64]uint32 {
stagesMap := make(map[int64]uint32)
for _, v := range config.Items {
for _, v := range stages {
stagesMap[v.BlockHeight] = v.Enable
}
return stagesMap
}
return stagesMap, &config, nil
func sortStages(stages *pt.SelfConsensStages, new *pt.SelfConsensStage) {
stages.Items = append(stages.Items, new)
sort.Slice(stages.Items, func(i, j int) bool { return stages.Items[i].BlockHeight < stages.Items[j].BlockHeight })
}
func updateStages(db dbm.KV, stage *pt.SelfConsensStage) (*types.Receipt, error) {
_, stages, err := getSelfConsensStages(db)
stages, err := getSelfConsensStages(db)
if err != nil {
return nil, err
}
oldStages := *stages
stages.Items = append(stages.Items, stage)
sort.Slice(stages.Items, func(i, j int) bool { return stages.Items[i].BlockHeight < stages.Items[j].BlockHeight })
return makeStageGroupReceipt(&oldStages, stages), nil
var old pt.SelfConsensStages
err = deepCopy(&old, stages)
if err != nil {
clog.Error("updateStages deep copy fail", "copy", old, "stat", stages)
return nil, err
}
sortStages(stages, stage)
return makeStageGroupReceipt(&old, stages), nil
}
func selfConsensInitStage(db dbm.KV) *types.Receipt {
func selfConsensInitStage() *types.Receipt {
close := types.IsEnable("consensus.sub.para.paraSelfConsInitDisable")
stage := &pt.SelfConsensStage{BlockHeight: 0, Enable: pt.ParaConfigYes}
if close {
......@@ -136,23 +146,36 @@ func getSelfConsOneStage(height int64, stages *pt.SelfConsensStages) *pt.SelfCon
return stages.Items[i]
}
}
clog.Error("getSelfConsOneStage height not hit", "height", height)
return &pt.SelfConsensStage{Enable: pt.ParaConfigNo}
}
func isSelfConsOn(db dbm.KV, height int64) (bool, error) {
stages, err := getSelfConsensStages(db)
if err != nil && errors.Cause(err) != pt.ErrKeyNotExist {
return false, err
}
if stages != nil {
stage := getSelfConsOneStage(height, stages)
return stage.Enable == pt.ParaConfigYes, nil
}
return false, nil
}
func (a *action) checkValidStage(config *pt.SelfConsensStage) error {
//1. 设置高度必须大于当前区块高度
if config.BlockHeight <= a.height {
return errors.Wrapf(pt.ErrHeightHasPast, "checkValidStage config height:%d less than block height:%d", config.BlockHeight, a.height)
}
//2. 如果已经设置到stages中,简单起见,就不能更改了,应该也不会有很大影响
stages, _, err := getSelfConsensStages(a.db)
stages, err := getSelfConsensStages(a.db)
if err != nil {
return errors.Wrapf(err, "checkValidStage get stages")
}
if _, exist := stages[config.BlockHeight]; exist {
stageMap := getSelfConsStagesMap(stages.Items)
if _, exist := stageMap[config.BlockHeight]; exist {
return errors.Wrapf(err, "checkValidStage config height:%d existed", config.BlockHeight)
}
......@@ -189,7 +212,7 @@ func (a *action) stageCancel(config *pt.ConfigCancelInfo) (*types.Receipt, error
return nil, errors.Wrapf(types.ErrNotAllow, "stage id create by:%s,not by:%s", stat.FromAddr, a.fromaddr)
}
if stat.Status != pt.ParaApplyJoining {
if stat.Status != pt.ParaApplyJoining && stat.Status != pt.ParaApplyVoting {
return nil, errors.Wrapf(pt.ErrParaNodeOpStatusWrong, "stage config id:%s,status:%d", config.Id, stat.Status)
}
......@@ -216,10 +239,10 @@ func (a *action) stageVote(config *pt.ConfigVoteInfo) (*types.Receipt, error) {
stat, err := getStageID(a.db, config.Id)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "stageVote getStageID:%s", config.Id)
}
if stat.Status != pt.ParaApplyJoining {
if stat.Status != pt.ParaApplyJoining && stat.Status != pt.ParaApplyVoting {
return nil, errors.Wrapf(pt.ErrParaNodeOpStatusWrong, "config id:%s,status:%d", config.Id, stat.Status)
}
//stage blockHeight 也不能小于当前vote tx height,不然没有意义
......@@ -234,7 +257,7 @@ func (a *action) stageVote(config *pt.ConfigVoteInfo) (*types.Receipt, error) {
clog.Error("selfConsensVote deep copy fail", "copy", copyStat, "stat", stat)
return nil, err
}
stat.Status = pt.ParaApplyVoting
if stat.Votes == nil {
stat.Votes = &pt.ParaNodeVoteDetail{}
}
......@@ -253,13 +276,13 @@ func (a *action) stageVote(config *pt.ConfigVoteInfo) (*types.Receipt, error) {
if !isCommitDone(nodes, most) {
return makeStageConfigReceipt(&copyStat, stat), nil
}
clog.Info("paracross.nodeVote ----pass", "most", most, "pass", vote)
clog.Info("paracross.stageVote ----pass", "most", most, "pass", vote)
receipt := &types.Receipt{Ty: types.ExecOk}
if vote == pt.ParaVoteYes {
r, err := updateStages(a.db, stat.Stage)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "stageVote updateStages")
}
receipt = mergeReceipt(receipt, r)
}
......@@ -279,7 +302,7 @@ func (a *action) SelfStageConfig(config *pt.ParaStageConfig) (*types.Receipt, er
if config.OpTy == pt.ParaOpNewApply {
return a.stageApply(config.GetStage())
} else if config.OpTy == pt.ParaOpQuit {
} else if config.OpTy == pt.ParaOpCancel {
return a.stageCancel(config.GetCancel())
} else if config.OpTy == pt.ParaOpVote {
......
......@@ -5,6 +5,7 @@
package executor
import (
"github.com/33cn/chain33/common/db/table"
"github.com/33cn/chain33/system/dapp"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
......@@ -22,7 +23,7 @@ func (e *Paracross) execAutoLocalStage(tx *types.Transaction, receiptData *types
func (e *Paracross) execLocalStage(receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
table := NewStageTable(e.GetLocalDB())
txIndex := dapp.HeightIndexStr(e.GetHeight(), int64(index))
for _, log := range receiptData.Logs {
switch log.Ty {
case pt.TyLogParaSelfConsStageConfig:
......@@ -32,13 +33,18 @@ func (e *Paracross) execLocalStage(receiptData *types.ReceiptData, index int) (*
if err != nil {
return nil, err
}
r := &pt.LocalSelfConsStageInfo{
Stage: receipt.Current,
TxIndex: txIndex,
}
err = table.Replace(r)
if err != nil {
return nil, err
if receipt.Current.Status == pt.ParaApplyJoining {
txIndex := dapp.HeightIndexStr(e.GetHeight(), int64(index))
r := &pt.LocalSelfConsStageInfo{
Stage: receipt.Current,
TxIndex: txIndex,
}
err = table.Add(r)
if err != nil {
return nil, err
}
} else {
update(table, receipt.GetCurrent())
}
}
default:
......@@ -64,6 +70,22 @@ func (e *Paracross) execAutoDelLocal(tx *types.Transaction, receiptData *types.R
return dbSet, nil
}
func update(ldb *table.Table, stage *pt.SelfConsensStageInfo) error {
xs, err := ldb.ListIndex("id", []byte(stage.Id), nil, 1, 0)
if err != nil || len(xs) != 1 {
clog.Error("SelfStages update query List failed", "key", stage.Id, "err", err, "len", len(xs))
return nil
}
u, ok := xs[0].Data.(*pt.LocalSelfConsStageInfo)
if !ok {
clog.Error("SelfStages update decode failed", "data", xs[0].Data)
return nil
}
u.Stage = stage
return ldb.Update([]byte(u.TxIndex), u)
}
func (e *Paracross) listSelfStages(req *pt.ReqQuerySelfStages) (types.Message, error) {
if req == nil {
return nil, types.ErrInvalidParam
......@@ -80,33 +102,30 @@ func (e *Paracross) listSelfStages(req *pt.ReqQuerySelfStages) (types.Message, e
} else if req.Id != "" {
indexName = "id"
}
cur := &StageRow{
LocalSelfConsStageInfo: &pt.LocalSelfConsStageInfo{},
LocalSelfConsStageInfo: &pt.LocalSelfConsStageInfo{Stage: &pt.SelfConsensStageInfo{}},
}
cur.Stage.Status = req.Status
cur.Stage.Id = req.Id
cur.Stage.Id = calcParaSelfConsensStageIDKey(req.Id)
prefix, err := cur.Get(indexName)
if err != nil {
clog.Error("Get", "indexName", indexName, "err", err)
clog.Error("listSelfStages Get", "indexName", indexName, "err", err)
return nil, err
}
rows, err := query.ListIndex(indexName, prefix, primary, req.Count, req.Direction)
if err != nil {
clog.Error("query List failed", "indexName", indexName, "prefix", "prefix", "key", string(primary), "err", err)
clog.Error("listSelfStages query failed", "indexName", indexName, "prefix", prefix, "key", string(primary), "err", err)
return nil, err
}
if len(rows) == 0 {
return nil, types.ErrNotFound
}
var rep pt.ReplyQuerySelfStages
for _, row := range rows {
r, ok := row.Data.(*pt.LocalSelfConsStageInfo)
if !ok {
clog.Error("listProposalProject", "err", "bad row type")
clog.Error("listSelfStages", "err", "bad row type")
return nil, types.ErrDecode
}
ok, txID, _ := getRealTxHashID(r.Stage.Id)
......
......@@ -39,7 +39,7 @@ type StageRow struct {
//NewStageRow 新建一个meta 结构
func NewStageRow() *StageRow {
return &StageRow{LocalSelfConsStageInfo: nil}
return &StageRow{LocalSelfConsStageInfo: &pt.LocalSelfConsStageInfo{}}
}
//CreateRow 新建数据行(注意index 数据一定也要保存到数据中,不能就保存heightindex)
......
......@@ -930,7 +930,7 @@ func (a *action) nodeGroupApproveApply(config *pt.ParaNodeGroupConfig, apply *pt
if types.IsPara() && types.IsDappFork(a.height, pt.ParaX, pt.ForkParaSelfConsStages) {
//不允许主链成功平行链失败导致不一致的情况,这里如果失败则手工设置init stage
r = selfConsensInitStage(a.db)
r = selfConsensInitStage()
receipt.KV = append(receipt.KV, r.KV...)
receipt.Logs = append(receipt.Logs, r.Logs...)
}
......
......@@ -219,9 +219,9 @@ message LocalSelfConsStageInfo {
message ConfigVoteInfo {
string id = 1;
string id = 1;
// 投票值 1:ok 2:nok
uint32 value = 2;
uint32 value = 2;
}
message ConfigCancelInfo {
......@@ -230,9 +230,9 @@ message ConfigCancelInfo {
//广义配置类型
message ParaStageConfig {
string title = 1;
string title = 1;
// 配置类型
uint32 opTy = 2;
uint32 opTy = 2;
oneof op {
SelfConsensStage stage = 10;
ConfigVoteInfo vote = 11;
......@@ -247,12 +247,12 @@ message ReceiptSelfConsStageConfig {
}
message ReceiptSelfConsStageVoteDone {
string id = 1;
SelfConsensStage stage = 2;
int32 totalNodes = 3;
int32 totalVote = 4;
int32 mostVote = 5;
string voteRst = 6;
string id = 1;
SelfConsensStage stage = 2;
int32 totalNodes = 3;
int32 totalVote = 4;
int32 mostVote = 5;
string voteRst = 6;
}
message ReceiptSelfConsStagesUpdate{
......
......@@ -55,4 +55,6 @@ var (
ErrHeightHasPast = errors.New("ErrHeightHasPast")
// ErrKeyNotExist config key not exist
ErrKeyNotExist = errors.New("ErrKeyNotExist")
// ErrConsensClosed consensus closed
ErrConsensClosed = errors.New("ErrConsensClosed")
)
......@@ -128,6 +128,8 @@ const (
ParaApplyClosed
// ParaApplyCanceled to cancel apply of joining or quiting
ParaApplyCanceled
// ParaApplyVoting record voting status
ParaApplyVoting
)
//针对addr本身的生命周期,addr维护了申请id和quit id,方便查询如coinfrozen等额外信息
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment