Commit a9674899 authored by mdj33's avatar mdj33 Committed by vipwzw

self-consens stages

parent 4e39fe06
......@@ -104,15 +104,16 @@ authAccount=""
genesisAmount=100000000
#主链支持平行链共识tx分叉高度,需要和主链保持严格一致,不可修改,2270000是bityuan主链对应高度, ycc或其他按实际修改
MainForkParacrossCommitTx=2270000
#平行链自共识开启对应的主链高度,需要大于等于MainForkParacrossCommitTx=2270000, -1 不开启
MainParaSelfConsensusForkHeight=-1
#主链开启循环检查共识交易done的fork高度,需要和主链保持严格一致,不可修改,4320000是bityuan主链对应高度, ycc或其他按实际修改
MainLoopCheckCommitTxDoneForkHeight=4320000
#主链每隔几个没有相关平行链交易的区块,平行链上打包空区块,缺省从平行链blockHeight=0开始,依次增长,空块间隔不能为0
[[consensus.sub.para.emptyBlockInterval]]
blockHeight=0
interval=50
#平行链自共识开启对应的主链高度,必须大于等于MainForkParacrossCommitTx=2270000, -1 不开启,可以多个高度开启和关闭
[[consensus.sub.para.selfConsensusEnable]]
blockHeight=0
enable=false
[store]
......
......@@ -73,27 +73,32 @@ type emptyBlockInterval struct {
Interval int64 `json:"interval,omitempty"`
}
type paraSelfConsEnable struct {
BlockHeight int64 `json:"blockHeight,omitempty"`
Enable bool `json:"enable,omitempty"`
}
type subConfig struct {
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []*emptyBlockInterval `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
MainParaSelfConsensusForkHeight int64 `json:"mainParaSelfConsensusForkHeight,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
FetchFilterParaTxsClose bool `json:"fetchFilterParaTxsClose,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
WriteBlockSeconds int64 `json:"writeBlockSeconds,omitempty"`
ParaRemoteGrpcClient string `json:"paraRemoteGrpcClient,omitempty"`
StartHeight int64 `json:"startHeight,omitempty"`
GenesisStartHeightSame bool `json:"genesisStartHeightSame,omitempty"`
EmptyBlockInterval []*emptyBlockInterval `json:"emptyBlockInterval,omitempty"`
AuthAccount string `json:"authAccount,omitempty"`
WaitBlocks4CommitMsg int32 `json:"waitBlocks4CommitMsg,omitempty"`
GenesisAmount int64 `json:"genesisAmount,omitempty"`
MainBlockHashForkHeight int64 `json:"mainBlockHashForkHeight,omitempty"`
selfConsensusEnable []*paraSelfConsEnable `json:"selfConsensusEnable,omitempty"`
WaitConsensStopTimes uint32 `json:"waitConsensStopTimes,omitempty"`
MaxCacheCount int64 `json:"maxCacheCount,omitempty"`
MaxSyncErrCount int32 `json:"maxSyncErrCount,omitempty"`
FetchFilterParaTxsClose bool `json:"fetchFilterParaTxsClose,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen bool `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
}
// New function to init paracross env
......@@ -123,8 +128,18 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
subcfg.MainBlockHashForkHeight = defaultMainBlockHashForkHeight
}
if subcfg.MainParaSelfConsensusForkHeight <= 0 {
subcfg.MainParaSelfConsensusForkHeight = mainParaSelfConsensusForkHeight
if len(subcfg.selfConsensusEnable) == 0 {
selfEnable := &paraSelfConsEnable{Enable: false}
subcfg.selfConsensusEnable = append(subcfg.selfConsensusEnable, selfEnable)
}
if subcfg.selfConsensusEnable[0].BlockHeight != 0 {
selfEnable := &paraSelfConsEnable{Enable: false}
subcfg.selfConsensusEnable = append([]*paraSelfConsEnable{selfEnable}, subcfg.selfConsensusEnable...)
}
err = checkSelfConsensEnable(subcfg.selfConsensusEnable)
if err != nil {
panic("para selfConsensusEnable config not correct")
}
if subcfg.BatchFetchBlockCount <= 0 {
......@@ -161,7 +176,8 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
waitConsensStopTimes: waitConsensStopTimes,
consensHeight: -2,
sendingHeight: -1,
consensStartHeight: -1,
consensDoneHeight: -1,
selfConsensMap: make(map[int64]bool),
resetCh: make(chan interface{}, 1),
quit: make(chan struct{}),
}
......@@ -173,10 +189,12 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
para.commitMsgClient.waitConsensStopTimes = subcfg.WaitConsensStopTimes
}
para.commitMsgClient.setSelfConsensMap(subcfg.selfConsensusEnable)
// 设置平行链共识起始高度,在共识高度为-1也就是从未共识过的环境中允许从设置的非0起始高度开始共识
//note:只有在主链LoopCheckCommitTxDoneForkHeight之后才支持设置ParaConsensStartHeight
if subcfg.ParaConsensStartHeight > 0 {
para.commitMsgClient.consensStartHeight = subcfg.ParaConsensStartHeight - 1
para.commitMsgClient.consensDoneHeight = subcfg.ParaConsensStartHeight - 1
}
para.blockSyncClient = &blockSyncClient{
......@@ -231,6 +249,17 @@ func checkEmptyBlockInterval(in []*emptyBlockInterval) error {
return nil
}
func checkSelfConsensEnable(in []*paraSelfConsEnable) error {
for i := 0; i < len(in); i++ {
if i > 0 && in[i].BlockHeight <= in[i-1].BlockHeight {
plog.Error("selfConsensEnable,blockHeight should be sequence", "preHeight", in[i-1].BlockHeight, "laterHeight", in[i].BlockHeight)
return types.ErrInvalidParam
}
}
return nil
}
//para 不检查任何的交易
func (client *client) CheckBlock(parent *types.Block, current *types.BlockDetail) error {
err := checkMinerTx(current)
......@@ -371,8 +400,13 @@ func (client *client) CreateGenesisTx() (ret []*types.Transaction) {
return
}
func (client *client) isParaSelfConsensusForked(height int64) bool {
return height > client.subCfg.MainParaSelfConsensusForkHeight
func (client *client) getSelfConsEnableStatus(height int64) *paraSelfConsEnable {
for i := len(client.subCfg.selfConsensusEnable) - 1; i >= 0; i-- {
if height >= client.subCfg.selfConsensusEnable[i].BlockHeight {
return client.subCfg.selfConsensusEnable[i]
}
}
panic(fmt.Sprintf("para selfConsensusEnable not set for height=%d", height))
}
func (client *client) ProcEvent(msg *queue.Message) bool {
......
......@@ -44,11 +44,13 @@ type commitMsgClient struct {
chainHeight int64
sendingHeight int64
consensHeight int64
consensStartHeight int64
consensDoneHeight int64
selfConsensForked int32 //自共识比主链共识更高的异常场景,需要等待自共识<=主链共识再发送
authAccountIn bool
isRollBack int32
checkTxCommitTimes int32
txFeeRate int64
selfConsensMap map[int64]bool //selfConsensEnable 分段 map
privateKey crypto.PrivKey
quit chan struct{}
mutex sync.Mutex
......@@ -153,8 +155,8 @@ func (client *commitMsgClient) sendCommitTx() {
consensHeight := client.getConsensusHeight()
//只有从未共识过,才可以设置从初始起始高度跳跃
if consensHeight == -1 && consensHeight < client.consensStartHeight {
consensHeight = client.consensStartHeight
if consensHeight == -1 && consensHeight < client.consensDoneHeight {
consensHeight = client.consensDoneHeight
}
chainHeight := atomic.LoadInt64(&client.chainHeight)
......@@ -294,6 +296,11 @@ func (client *commitMsgClient) isSync() bool {
return false
}
if atomic.LoadInt32(&client.selfConsensForked) != 0 {
plog.Info("para is not Sync", "selfConsensForked", atomic.LoadInt32(&client.selfConsensForked))
return false
}
if !client.authAccountIn {
plog.Info("para is not Sync", "authAccountIn", client.authAccountIn)
return false
......@@ -397,7 +404,8 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
cfg := client.paraClient.GetAPI().GetConfig()
for _, status := range notifications {
execName := pt.ParaX
if client.paraClient.isParaSelfConsensusForked(status.MainBlockHeight) {
stat := client.paraClient.getSelfConsEnableStatus(status.Height)
if stat.Enable {
execName = paracross.GetExecName(cfg)
}
tx, err := paracross.CreateRawCommitTx4MainChain(cfg, status, execName, feeRate)
......@@ -418,7 +426,8 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, error) {
execName := pt.ParaX
cfg := client.paraClient.GetAPI().GetConfig()
if client.paraClient.isParaSelfConsensusForked(status.MainBlockHeight) {
stat := client.paraClient.getSelfConsEnableStatus(status.Height)
if stat.Enable {
execName = paracross.GetExecName(cfg)
}
tx, err := paracross.CreateRawCommitTx4MainChain(cfg, status, execName, feeRate)
......@@ -591,6 +600,8 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
return nil, nil
}
client.setConsensStart(ret)
//clear flag
for _, v := range ret {
v.NonCommitTxCounts = 0
......@@ -600,6 +611,28 @@ func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossN
}
// 主链共识可能从-1跃迁或者自共识可能分阶段开启场景,设置起始高度标志,主链只需要根据是否起始高度来允许跃迁
// 1. 主链ParaConsensStartHeight 在selfCons设置的Disable范围没问题,只有主链做共识,平行链收不到共识交易
// 2. 主链ParaConsensStartHeight 超过selfCons StartHeight也没问题, 自共识也从ParaConsensStartHeight 开始做自共识
func (client *commitMsgClient) setConsensStart(invs []*pt.ParacrossNodeStatus) {
for _, v := range invs {
//平行链自共识
v.IsStartHeight = client.selfConsensMap[v.Height]
//主链共识起始高度,只允许从-1跃迁,否则可能存在未共识的asset withdraw交易
if v.Height == client.paraClient.subCfg.ParaConsensStartHeight {
v.IsStartHeight = true
}
}
}
func (client *commitMsgClient) setSelfConsensMap(invs []*paraSelfConsEnable) {
for _, v := range invs {
client.selfConsensMap[v.BlockHeight] = v.Enable
}
}
func (client *commitMsgClient) getGenesisNodeStatus() (*pt.ParacrossNodeStatus, error) {
var status pt.ParacrossNodeStatus
req := &types.ReqBlocks{Start: 0, End: 0}
......@@ -656,17 +689,8 @@ out:
isSync = true
}
status, err := client.getMainConsensusStatus()
if err != nil {
continue
}
//如果主链的共识高度产生了回滚,本地链也需要重新检查共识高度
if status.Height < atomic.LoadInt64(&client.consensHeight) {
atomic.StoreInt64(&client.consensHeight, status.Height)
client.resetNotify()
} else {
atomic.StoreInt64(&client.consensHeight, status.Height)
if client.paraClient.authAccount != "" {
client.GetProperFeeRate()
}
selfHeight := int64(-2)
......@@ -675,12 +699,26 @@ out:
selfHeight = selfStatus.Height
}
if client.paraClient.authAccount != "" {
client.GetProperFeeRate()
mainStatus, err := client.getMainConsensusStatus()
if err != nil {
continue
}
//如果主链的共识高度小于自共识高度,需要等待自共识回滚
if mainStatus.Height < selfHeight {
atomic.StoreInt32(&client.selfConsensForked, 1)
} else {
atomic.StoreInt32(&client.selfConsensForked, 0)
}
plog.Info("para consensusHeight", "mainHeight", status.Height, "selfHeight", selfHeight)
preHeight := atomic.LoadInt64(&client.consensHeight)
atomic.StoreInt64(&client.consensHeight, mainStatus.Height)
//如果主链的共识高度产生了回滚,本地链也需要重新检查共识高度,不然可能会一直等待共识追赶上来
if mainStatus.Height < preHeight {
client.resetNotify()
}
plog.Info("para consensusHeight", "mainHeight", mainStatus.Height, "selfHeight", selfHeight)
}
}
......@@ -708,7 +746,8 @@ func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, er
return nil, err
}
cfg := client.paraClient.GetAPI().GetConfig()
if client.paraClient.isParaSelfConsensusForked(block.MainHeight) {
selfCons := client.paraClient.getSelfConsEnableStatus(block.Height)
if selfCons.Enable {
//从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
......@@ -719,29 +758,15 @@ func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, er
plog.Error("getSelfConsensusStatus ", "err", err.Error())
return nil, err
}
resp, ok := ret.(*pt.ParacrossStatus)
status, ok := ret.(*pt.ParacrossStatus)
if !ok {
plog.Error("getSelfConsensusStatus ParacrossStatus nok")
return nil, err
}
//开启自共识后也要等到自共识真正切换之后再使用,如果本地区块已经过了自共识高度,但自共识的高度还没达成,就会导致共识机制出错
if resp.Height > -1 {
var statusMainHeight int64
if pt.IsParaForkHeight(cfg, resp.MainHeight, pt.ForkLoopCheckCommitTxDone) {
statusMainHeight = resp.MainHeight
} else {
block, err := client.paraClient.GetBlockByHeight(resp.Height)
if err != nil {
plog.Error("getSelfConsensusStatus GetBlocks", "err", err.Error())
return nil, err
}
statusMainHeight = block.MainHeight
}
//本地共识高度对应主链高度一定要高于自共识高度,为了适配平行链共识高度不连续场景
if client.paraClient.isParaSelfConsensusForked(statusMainHeight) {
return resp, nil
}
//本地共识高度一定要高于阶段共识高度的起始高度,为了适配平行链共识高度不连续场景,一定要设置新自共识起始高度高于当前主链共识高度,否则不生效
if status.Height >= selfCons.BlockHeight {
return status, nil
}
}
return nil, types.ErrNotFound
......
......@@ -369,9 +369,11 @@ func (client *blockSyncClient) addMinerTx(preStateHash []byte, block *types.Bloc
status.PreBlockHash = block.ParentHash
status.PreStateHash = preStateHash
}
selfCons := client.paraClient.getSelfConsEnableStatus(status.Height)
tx, err := pt.CreateRawMinerTx(cfg, &pt.ParacrossMinerAction{
Status: status,
IsSelfConsensus: client.paraClient.isParaSelfConsensusForked(status.MainBlockHeight),
IsSelfConsensus: selfCons.Enable,
})
if err != nil {
return err
......
......@@ -456,7 +456,7 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
if commit.Status.Height > titleStatus.Height+1 {
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
//平行链由主链共识无缝切换,即接收第一个收到的高度,可以不从0开始
allowJump, err := a.isAllowConsensJump(stat, titleStatus)
allowJump, err := a.isAllowConsensJump(commit, titleStatus)
if err != nil {
return nil, err
}
......@@ -707,29 +707,12 @@ func (a *action) isAllowMainConsensJump(commit *pt.ParacrossHeightStatus, titleS
return false, nil
}
//平行链自共识无缝切换条件:1,平行链没有共识过,2:commit高度是大于自共识分叉高度且上一次共识的主链高度小于自共识分叉高度,保证只运行一次,
// 这样在主链没有共识空洞前提下,平行链允许有条件的共识跳跃
func (a *action) isAllowParaConsensJump(commit *pt.ParacrossHeightStatus, titleStatus *pt.ParacrossStatus) (bool, error) {
if titleStatus.Height == -1 {
return true, nil
}
cfg := a.api.GetConfig()
selfConsensForkHeight := pt.GetDappForkHeight(cfg, pt.ParaSelfConsensForkHeight)
lastStatusMainHeight := int64(-1)
if titleStatus.Height > -1 {
s, err := getTitleHeight(a.db, calcTitleHeightKey(commit.Title, titleStatus.Height))
if err != nil {
clog.Error("paracross.Commit isAllowConsensJump getTitleHeight failed", "err", err.Error())
return false, err
}
lastStatusMainHeight = s.MainHeight
}
return commit.MainHeight > selfConsensForkHeight && lastStatusMainHeight < selfConsensForkHeight, nil
//平行链自共识无缝切换条件:commit height为自共识分段起始高度
func (a *action) isAllowParaConsensJump(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
return commit.Status.IsStartHeight, nil
}
func (a *action) isAllowConsensJump(commit *pt.ParacrossHeightStatus, titleStatus *pt.ParacrossStatus) (bool, error) {
func (a *action) isAllowConsensJump(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
cfg := a.api.GetConfig()
if cfg.IsPara() {
return a.isAllowParaConsensJump(commit, titleStatus)
......
......@@ -188,6 +188,7 @@ message ParacrossNodeStatus {
bytes crossTxResult = 12;
repeated bytes crossTxHashs = 13;
uint32 nonCommitTxCounts = 14;
bool isStartHeight = 15;
}
message ParacrossCommitAction {
......
......@@ -26,6 +26,8 @@ var (
ForkLoopCheckCommitTxDone = "ForkLoopCheckCommitTxDone"
// MainLoopCheckCommitTxDoneForkHeight 平行链的配置项,对应主链的ForkLoopCheckCommitTxDone高度
MainLoopCheckCommitTxDoneForkHeight = "MainLoopCheckCommitTxDoneForkHeight"
// ForkConsensSupportJump 支持主链共识从-1开始跳跃一次
ForkConsensSupportJump = "ForkConsensSupportJump"
)
func init() {
......@@ -41,6 +43,7 @@ func InitFork(cfg *types.Chain33Config) {
cfg.RegisterDappFork(ParaX, "ForkParacrossWithdrawFromParachain", 1298600)
cfg.RegisterDappFork(ParaX, ForkCommitTx, 1850000)
cfg.RegisterDappFork(ParaX, ForkLoopCheckCommitTxDone, 3230000)
cfg.RegisterDappFork(ParaX, ForkConsensSupportJump, types.MaxHeight)
}
func InitExecutor(cfg *types.Chain33Config) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment