Commit f500379f authored by mdj33's avatar mdj33 Committed by vipwzw

commit msg get local consens height

parent c6fa4e0b
......@@ -162,7 +162,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
quitCreate: make(chan struct{}),
}
waitBlocks := int32(2) //最小是2
waitBlocks := int32(3) //最小是2
if subcfg.WaitBlocks4CommitMsg > 0 {
if subcfg.WaitBlocks4CommitMsg < waitBlocks {
panic("config WaitBlocks4CommitMsg should not less 2")
......@@ -181,7 +181,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
waitConsensStopTimes: waitConsensTimes,
commitCh: make(chan int64, 1),
resetCh: make(chan int64, 1),
verifyCh: make(chan []byte, 1),
verifyCh: make(chan bool, 1),
consensHeight: -2,
sendingHeight: -1,
quit: make(chan struct{}),
......
......@@ -13,8 +13,6 @@ import (
"sync/atomic"
"unsafe"
"bytes"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types"
......@@ -24,7 +22,7 @@ import (
)
var (
consensusInterval = 10 //about 1 new block interval
consensusInterval = 5 //about 1 new block interval
minerInterval = 10 //5s的主块间隔后分叉概率增加,10s可以消除一些分叉回退
)
......@@ -34,28 +32,32 @@ type commitMsgClient struct {
waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数
commitCh chan int64
resetCh chan int64
verifyCh chan []byte
verifyCh chan bool
sendMsgCh chan *types.Transaction
minerSwitch int32
currentTx unsafe.Pointer
chainHeight int64
sendingHeight int64
consensHeight int64
authAccountIn int32
authAccountIn bool
isRollBack int32
checkTxCommitTimes int32
privateKey crypto.PrivKey
quit chan struct{}
}
type commitCheckParams struct {
consensStopTimes uint32
}
// 1. 链高度回滚,低于当前发送高度,需要重新计算当前发送高度,不然不会重新发送回滚的高度
// 2. 定时轮询是在比如锁定解锁钱包这类外部条件变化时候,其他输入条件不会触发时候及时响应,不然任何一个外部条件变化都触发一下发送,可能条件比较多
func (client *commitMsgClient) handler() {
var readTick <-chan time.Time
var consensStopTimes uint32
checkParams := &commitCheckParams{}
client.paraClient.wg.Add(1)
go client.getConsensusHeight()
go client.getMainConsensusHeight()
if client.paraClient.authAccount != "" {
client.paraClient.wg.Add(1)
......@@ -74,14 +76,14 @@ out:
case height := <-client.commitCh:
//如果回滚高度小于发送高度,需要reset发送参数,回滚完成后重新发送
if height < client.sendingHeight {
client.procResetSendTx()
continue
client.resetSendEnv()
}
client.procSendTx()
//出错场景入口,需要reset 重发
case <-client.resetCh:
client.procResetSendTx()
client.resetSendEnv()
client.procSendTx()
//发送成功后,验证是否commitTx上链
case verifyTx := <-client.verifyCh:
......@@ -89,7 +91,7 @@ out:
//例行检查发送入口
case <-readTick:
consensStopTimes = client.checkConsensusStop(consensStopTimes)
client.procChecks(checkParams)
client.procSendTx()
case <-client.quit:
......@@ -110,8 +112,8 @@ func (client *commitMsgClient) resetNotify() {
client.resetCh <- 1
}
func (client *commitMsgClient) verifyNotify(verifyTx []byte) {
client.verifyCh <- verifyTx
func (client *commitMsgClient) verifyNotify(verified bool) {
client.verifyCh <- verified
}
func (client *commitMsgClient) resetSendEnv() {
......@@ -123,8 +125,18 @@ func (client *commitMsgClient) clearCurrentTx() {
client.setCurrentTx(nil)
}
//自共识后直接从本地获取最新共识高度,没有自共识,获取主链的共识高度
func (client *commitMsgClient) getConsensusHeight() int64 {
status, err := client.getSelfConsensusStatus()
if err != nil {
return atomic.LoadInt64(&client.consensHeight)
}
return status.Height
}
func (client *commitMsgClient) procSendTx() {
consensHeight := atomic.LoadInt64(&client.consensHeight)
consensHeight := client.getConsensusHeight()
chainHeight := atomic.LoadInt64(&client.chainHeight)
sendingHeight := client.sendingHeight
plog.Info("para commitMsg---status", "chainHeight", chainHeight, "sendingHeight", sendingHeight,
......@@ -157,35 +169,32 @@ func (client *commitMsgClient) procSendTx() {
}
func (client *commitMsgClient) procVerifyTx(verifyTx []byte) {
func (client *commitMsgClient) procVerifyTx(verified bool) {
curTx := client.getCurrentTx()
if curTx == nil {
return
}
if len(verifyTx) == 0 {
client.checkTxCommitTimes++
if client.checkTxCommitTimes >= client.waitMainBlocks {
client.checkTxCommitTimes = 0
client.procResetSendTx()
}
if verified {
client.clearCurrentTx()
client.procSendTx()
return
}
if bytes.Equal(curTx.Hash(), verifyTx) {
client.clearCurrentTx()
client.checkTxCommitTimes++
if client.checkTxCommitTimes >= client.waitMainBlocks {
client.checkTxCommitTimes = 0
client.resetSendEnv()
client.procSendTx()
}
}
func (client *commitMsgClient) procResetSendTx() {
client.resetSendEnv()
client.procSendTx()
}
func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint32 {
if client.sendingHeight > atomic.LoadInt64(&client.consensHeight) && !client.isSendingCommitMsg() {
consensHeight := client.getConsensusHeight()
if client.sendingHeight > consensHeight && !client.isSendingCommitMsg() {
if consensStopTimes > client.waitConsensStopTimes {
plog.Debug("para commitMsg-checkConsensusStop", "times", consensStopTimes)
client.resetSendEnv()
return 0
}
......@@ -195,6 +204,26 @@ func (client *commitMsgClient) checkConsensusStop(consensStopTimes uint32) uint3
return 0
}
func (client *commitMsgClient) checkAuthAccountIn() {
nodes, err := client.getNodeGroupAddrs()
if err != nil {
return
}
authExist := strings.Contains(nodes, client.paraClient.authAccount)
//如果授权节点重新加入,需要从当前共识高度重新发送
if !client.authAccountIn && authExist {
client.resetSendEnv()
}
client.authAccountIn = authExist
}
func (client *commitMsgClient) procChecks(checks *commitCheckParams) {
checks.consensStopTimes = client.checkConsensusStop(checks.consensStopTimes)
client.checkAuthAccountIn()
}
func (client *commitMsgClient) isSync() bool {
height := atomic.LoadInt64(&client.chainHeight)
if height <= 0 {
......@@ -208,8 +237,8 @@ func (client *commitMsgClient) isSync() bool {
return false
}
if atomic.LoadInt32(&client.authAccountIn) != 1 {
plog.Info("para is not Sync", "authAccountIn", atomic.LoadInt32(&client.authAccountIn))
if !client.authAccountIn {
plog.Info("para is not Sync", "authAccountIn", client.authAccountIn)
return false
}
......@@ -548,7 +577,7 @@ func (client *commitMsgClient) mainSync() error {
}
func (client *commitMsgClient) getConsensusHeight() {
func (client *commitMsgClient) getMainConsensusHeight() {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false
defer ticker.Stop()
......@@ -567,32 +596,26 @@ out:
isSync = true
}
block, err := client.paraClient.getLastBlockInfo()
status, err := client.getMainConsensusStatus()
if err != nil {
continue
}
status, err := client.getConsensusStatus(block)
if err != nil {
continue
}
atomic.StoreInt64(&client.consensHeight, status.Height)
authExist := false
if client.paraClient.authAccount != "" {
nodes, err := client.getNodeGroupAddrs()
if err != nil {
continue
}
authExist = strings.Contains(nodes, client.paraClient.authAccount)
}
if authExist {
atomic.StoreInt32(&client.authAccountIn, 1)
//如果主链的共识高度产生了回滚,本地链也需要重新检查共识高度
if status.Height < atomic.LoadInt64(&client.consensHeight) {
plog.Debug("para getMainConsensusStatus rollback", "height", status.Height, "lastHeight", atomic.LoadInt64(&client.consensHeight))
atomic.StoreInt64(&client.consensHeight, status.Height)
client.resetNotify()
} else {
atomic.StoreInt32(&client.authAccountIn, 0)
atomic.StoreInt64(&client.consensHeight, status.Height)
}
plog.Debug("para getConsensusHeight", "height", status.Height, "AccoutIn", authExist)
selfHeight := int64(-2)
selfStatus, _ := client.getSelfConsensusStatus()
if selfStatus != nil {
selfHeight = selfStatus.Height
}
plog.Debug("para consensusHeight", "mainHeight", status.Height, "selfHeight", selfHeight)
}
}
......@@ -600,7 +623,12 @@ out:
client.paraClient.wg.Done()
}
func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.ParacrossStatus, error) {
func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, error) {
block, err := client.paraClient.getLastBlockInfo()
if err != nil {
return nil, err
}
if isParaSelfConsensusForked(block.MainHeight) {
//从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
......@@ -609,28 +637,44 @@ func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.Parac
Param: types.Encode(&types.ReqString{Data: types.GetTitle()}),
})
if err != nil {
plog.Error("getConsensusHeight ", "err", err.Error())
plog.Error("getSelfConsensusStatus ", "err", err.Error())
return nil, err
}
resp, ok := ret.(*pt.ParacrossStatus)
if !ok {
plog.Error("getConsensusHeight ParacrossStatus nok")
plog.Error("getSelfConsensusStatus ParacrossStatus nok")
return nil, err
}
//开启自共识后也要等到自共识真正切换之后再使用,如果本地区块已经过了自共识高度,但自共识的高度还没达成,就会导致共识机制出错
if resp.Height > -1 {
req := &types.ReqBlocks{Start: resp.Height, End: resp.Height}
v, err := client.paraClient.GetAPI().GetBlocks(req)
if err != nil {
plog.Error("getConsensusHeight GetBlocks", "err", err.Error())
return nil, err
var statusMainHeight int64
if pt.IsParaForkHeight(resp.MainHeight, pt.ForkLoopCheckCommitTxDone) {
plog.Error("getSelfConsensusStatus ForkLoopCheckCommitTxDone reach")
statusMainHeight = resp.MainHeight
} else {
block, err := client.paraClient.GetBlockByHeight(resp.Height)
if err != nil {
plog.Error("getSelfConsensusStatus GetBlocks", "err", err.Error())
return nil, err
}
statusMainHeight = block.MainHeight
}
//本地共识高度对应主链高度一定要高于自共识高度,为了适配平行链共识高度不连续场景
if isParaSelfConsensusForked(v.Items[0].Block.MainHeight) {
if isParaSelfConsensusForked(statusMainHeight) {
return resp, nil
}
}
}
return nil, types.ErrNotFound
}
//通过grpc获取主链状态可能耗时,放在定时器里面处理
func (client *commitMsgClient) getMainConsensusStatus() (*pt.ParacrossStatus, error) {
block, err := client.paraClient.getLastBlockInfo()
if err != nil {
return nil, err
}
//去主链获取共识高度
reply, err := client.paraClient.grpcClient.QueryChain(context.Background(), &types.ChainExecutor{
......
......@@ -59,11 +59,8 @@ func (client *client) checkCommitTxSuccess(txs []*pt.TxDetail) {
}
}
if txMap[string(curTx.Hash())] {
client.commitMsgClient.verifyNotify(curTx.Hash())
} else {
client.commitMsgClient.verifyNotify(nil)
}
client.commitMsgClient.verifyNotify(txMap[string(curTx.Hash())])
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *pt.ParaTxDetail) error {
......
......@@ -69,32 +69,32 @@ func TestCalcCommitMsgTxs(t *testing.T) {
}
func TestGetConsensusStatus(t *testing.T) {
para := new(client)
grpcClient := &typesmocks.Chain33Client{}
//grpcClient.On("GetFork", mock.Anything, &types.ReqKey{Key: []byte("ForkBlockHash")}).Return(&types.Int64{Data: 1}, errors.New("err")).Once()
para.grpcClient = grpcClient
commitCli := new(commitMsgClient)
commitCli.paraClient = para
block := &types.Block{
Height: 1,
MainHeight: 10,
}
status := &pt.ParacrossStatus{
Height: 1,
}
reply := &types.Reply{
IsOk: true,
Msg: types.Encode(status),
}
grpcClient.On("QueryChain", mock.Anything, mock.Anything).Return(reply, nil).Once()
ret, err := commitCli.getConsensusStatus(block)
assert.Nil(t, err)
assert.Equal(t, int64(1), ret.Height)
}
//func TestGetConsensusStatus(t *testing.T) {
// para := new(client)
// grpcClient := &typesmocks.Chain33Client{}
// //grpcClient.On("GetFork", mock.Anything, &types.ReqKey{Key: []byte("ForkBlockHash")}).Return(&types.Int64{Data: 1}, errors.New("err")).Once()
// para.grpcClient = grpcClient
// commitCli := new(commitMsgClient)
// commitCli.paraClient = para
//
// block := &types.Block{
// Height: 1,
// MainHeight: 10,
// }
//
// status := &pt.ParacrossStatus{
// Height: 1,
// }
// reply := &types.Reply{
// IsOk: true,
// Msg: types.Encode(status),
// }
// grpcClient.On("QueryChain", mock.Anything, mock.Anything).Return(reply, nil).Once()
// ret, err := commitCli.getSelfConsensusStatus()
//
// assert.Nil(t, err)
// assert.Equal(t, int64(1), ret.Height)
//}
func TestSendCommitMsg(t *testing.T) {
para := new(client)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment