Commit 04edebce authored by mdj33's avatar mdj33 Committed by vipwzw

adjust commit msg ok

parent 02443aa3
...@@ -163,9 +163,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -163,9 +163,10 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
paraClient: para, paraClient: para,
waitMainBlocks: waitBlocks, waitMainBlocks: waitBlocks,
waitConsensStopTimes: waitConsensTimes, waitConsensStopTimes: waitConsensTimes,
commitNotify: make(chan int64, 1), commitCh: make(chan int64, 1),
resetNotify: make(chan int64, 1), resetCh: make(chan int64, 1),
chainHeight: -1, consensHeight: -2,
sendingHeight: -1,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
c.SetChild(para) c.SetChild(para)
...@@ -246,7 +247,7 @@ func (client *client) GetStartSeq(height int64) (int64, []byte) { ...@@ -246,7 +247,7 @@ func (client *client) GetStartSeq(height int64) (int64, []byte) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
if lastHeight < height { if lastHeight < height && lastHeight > 0 {
panic(fmt.Sprintf("lastHeight(%d) less than startHeight(%d) in mainchain", lastHeight, height)) panic(fmt.Sprintf("lastHeight(%d) less than startHeight(%d) in mainchain", lastHeight, height))
} }
...@@ -369,6 +370,16 @@ func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transacti ...@@ -369,6 +370,16 @@ func (client *client) createBlock(lastBlock *types.Block, txs []*types.Transacti
return err return err
} }
func (client *client) createBlockTemp(txs []*types.Transaction, mainBlock *types.BlockSeq) error{
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
return err
}
return client.createBlock(lastBlock,txs,0,mainBlock)
}
// 向blockchain写区块 // 向blockchain写区块
func (client *client) WriteBlock(prev []byte, paraBlock *types.Block, seq int64) error { func (client *client) WriteBlock(prev []byte, paraBlock *types.Block, seq int64) error {
//共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail //共识模块不执行block,统一由blockchain模块执行block并做去重的处理,返回执行后的blockdetail
......
...@@ -30,8 +30,8 @@ type commitMsgClient struct { ...@@ -30,8 +30,8 @@ type commitMsgClient struct {
paraClient *client paraClient *client
waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2 waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数 waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数
commitNotify chan int64 commitCh chan int64
resetNotify chan int64 resetCh chan int64
sendMsgCh chan *types.Transaction sendMsgCh chan *types.Transaction
minerSwitch int32 minerSwitch int32
currentTx unsafe.Pointer currentTx unsafe.Pointer
...@@ -68,15 +68,15 @@ out: ...@@ -68,15 +68,15 @@ out:
for { for {
select { select {
//正常的触发检查 //正常的触发检查
case <-client.commitNotify: case <-client.commitCh:
//回滚场景 //回滚场景
if atomic.LoadInt64(&client.chainHeight) < client.sendingHeight { if atomic.LoadInt64(&client.chainHeight) < client.sendingHeight {
client.resetSendTx() client.clearSendingTx()
} }
client.procSendTx() client.procSendTx()
//发送出错场景,需要reset 重发 //发送出错场景,需要reset 重发
case <-client.resetNotify: case <-client.resetCh:
client.resetSendTx() client.clearSendingTx()
client.procSendTx() client.procSendTx()
//例行检查发送 //例行检查发送
case <-readTick: case <-readTick:
...@@ -91,14 +91,22 @@ out: ...@@ -91,14 +91,22 @@ out:
client.paraClient.wg.Done() client.paraClient.wg.Done()
} }
func (client *commitMsgClient) resetSendTx() { func (client *commitMsgClient) commitNotify() {
client.sendingHeight = 0 client.commitCh <- 1
}
func (client *commitMsgClient) resetNotify() {
client.resetCh <- 1
}
func (client *commitMsgClient) clearSendingTx() {
client.sendingHeight = -1
client.setCurrentTx(nil) client.setCurrentTx(nil)
} }
func (client *commitMsgClient) procSendTx() { func (client *commitMsgClient) procSendTx() {
plog.Debug("para readTick", "notify", atomic.LoadInt64(&client.chainHeight), plog.Info("para procSendTx ---send", "consensHeight",atomic.LoadInt64(&client.consensHeight),
"finishHeight", client.sendingHeight, "txIsNil", client.currentTx == nil, "sync", client.isSync()) "chainHeight", atomic.LoadInt64(&client.chainHeight),
"sendingHeight", client.sendingHeight, "isSendingTx", client.isSendingCommitMsg(), "sync", client.isSync())
if client.isSendingCommitMsg() || !client.isSync() { if client.isSendingCommitMsg() || !client.isSync() {
return return
} }
...@@ -116,7 +124,7 @@ func (client *commitMsgClient) procSendTx() { ...@@ -116,7 +124,7 @@ func (client *commitMsgClient) procSendTx() {
} }
//已发送,未共识场景 //已发送,未共识场景
if client.sendingHeight > consensHeight { if client.sendingHeight > -1 && client.sendingHeight > consensHeight {
return return
} }
...@@ -134,16 +142,25 @@ func (client *commitMsgClient) procSendTx() { ...@@ -134,16 +142,25 @@ func (client *commitMsgClient) procSendTx() {
} }
func (client *commitMsgClient) isSync() bool { func (client *commitMsgClient) isSync() bool {
chainHeight := atomic.LoadInt64(&client.chainHeight) height := atomic.LoadInt64(&client.chainHeight)
if chainHeight < 0 { if height <= 0 {
plog.Info("para isSync", "chainHeight",height)
return false
}
height = atomic.LoadInt64(&client.consensHeight)
if height == -2 {
plog.Info("para isSync", "consensHeight",height)
return false return false
} }
if atomic.LoadInt32(&client.authAccountIn) != 1 { if atomic.LoadInt32(&client.authAccountIn) != 1 {
plog.Info("para isSync ", "authAccountIn",atomic.LoadInt32(&client.authAccountIn))
return false return false
} }
if atomic.LoadInt32(&client.minerSwitch) == 0 { if atomic.LoadInt32(&client.minerSwitch) != 1 {
plog.Info("para isSync ", "minerSwitch",atomic.LoadInt32(&client.minerSwitch))
return false return false
} }
...@@ -175,9 +192,9 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type ...@@ -175,9 +192,9 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
} }
sendingMsgs := status[:count] sendingMsgs := status[:count]
plog.Debug("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer)) plog.Info("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
for i, msg := range sendingMsgs { for i, msg := range sendingMsgs {
plog.Debug("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight, plog.Info("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
"blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash), "blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
"from", client.paraClient.authAccount) "from", client.paraClient.authAccount)
} }
...@@ -205,6 +222,7 @@ func (client *commitMsgClient) updateChainHeight(height int64, isDel bool) { ...@@ -205,6 +222,7 @@ func (client *commitMsgClient) updateChainHeight(height int64, isDel bool) {
} }
atomic.StoreInt64(&client.chainHeight, height) atomic.StoreInt64(&client.chainHeight, height)
client.commitNotify()
} }
...@@ -219,14 +237,15 @@ func (client *commitMsgClient) checkSendingTxDone(txs map[string]bool) { ...@@ -219,14 +237,15 @@ func (client *commitMsgClient) checkSendingTxDone(txs map[string]bool) {
client.setCurrentTx(nil) client.setCurrentTx(nil)
atomic.StoreInt32(&client.checkTxCommitTimes, 0) atomic.StoreInt32(&client.checkTxCommitTimes, 0)
//继续处理 //继续处理
client.commitNotify <- 1 client.commitNotify()
return return
} }
atomic.AddInt32(&client.checkTxCommitTimes, 1) atomic.AddInt32(&client.checkTxCommitTimes, 1)
if atomic.LoadInt32(&client.checkTxCommitTimes) >= client.waitMainBlocks { if atomic.LoadInt32(&client.checkTxCommitTimes) >= client.waitMainBlocks {
atomic.StoreInt32(&client.checkTxCommitTimes, 0)
//重新发送 //重新发送
client.resetNotify <- 1 client.resetNotify()
} }
} }
...@@ -491,7 +510,7 @@ func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint3 ...@@ -491,7 +510,7 @@ func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint3
if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() { if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() {
consensStopTimes++ consensStopTimes++
if consensStopTimes > client.waitConsensStopTimes { if consensStopTimes > client.waitConsensStopTimes {
client.resetSendTx() client.clearSendingTx()
return 0 return 0
} }
return consensStopTimes return consensStopTimes
...@@ -553,6 +572,7 @@ out: ...@@ -553,6 +572,7 @@ out:
} else { } else {
atomic.StoreInt32(&client.authAccountIn, 0) atomic.StoreInt32(&client.authAccountIn, 0)
} }
plog.Info("para getConsensusHeight", "height",status.Height,"AccoutIn",authExist)
} }
} }
......
...@@ -74,7 +74,7 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er ...@@ -74,7 +74,7 @@ func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) er
return client.setLocalDb(set) return client.setLocalDb(set)
} }
func (client *client) checkTxInMainBlock(detail *types.BlockDetail) { func (client *client) checkCommitTxSuccess(detail *types.BlockDetail) {
if !client.isCaughtUp { if !client.isCaughtUp {
return return
} }
...@@ -107,8 +107,9 @@ func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*ty ...@@ -107,8 +107,9 @@ func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*ty
if err != nil { if err != nil {
return err return err
} }
client.checkTxInMainBlock(mainBlock.Detail) client.checkCommitTxSuccess(mainBlock.Detail)
return nil err = client.createBlockTemp(txs,mainBlock)
return err
} }
func (client *client) createLocalGenesisBlock(genesis *types.Block) error { func (client *client) createLocalGenesisBlock(genesis *types.Block) error {
......
...@@ -34,7 +34,7 @@ function para_set_toml() { ...@@ -34,7 +34,7 @@ function para_set_toml() {
sed -i $xsedfix 's/^Title.*/Title="user.p.'''$PARANAME'''."/g' "${1}" sed -i $xsedfix 's/^Title.*/Title="user.p.'''$PARANAME'''."/g' "${1}"
sed -i $xsedfix 's/^# TestNet=.*/TestNet=true/g' "${1}" sed -i $xsedfix 's/^# TestNet=.*/TestNet=true/g' "${1}"
sed -i $xsedfix 's/^startHeight=.*/startHeight=0/g' "${1}" sed -i $xsedfix 's/^startHeight=.*/startHeight=1/g' "${1}"
sed -i $xsedfix 's/^emptyBlockInterval=.*/emptyBlockInterval=4/g' "${1}" sed -i $xsedfix 's/^emptyBlockInterval=.*/emptyBlockInterval=4/g' "${1}"
sed -i $xsedfix '/^emptyBlockInterval=.*/a MainBlockHashForkHeight=1' "${1}" sed -i $xsedfix '/^emptyBlockInterval=.*/a MainBlockHashForkHeight=1' "${1}"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment