Commit 66372814 authored by mdj33's avatar mdj33 Committed by vipwzw

fix commit notify del block seq issue

parent 022584d9
...@@ -31,7 +31,7 @@ const ( ...@@ -31,7 +31,7 @@ const (
consensusInterval = 10 //about 1 new block interval consensusInterval = 10 //about 1 new block interval
minerInterval = 10 //5s的主块间隔后分叉概率增加,10s可以消除一些分叉回退 minerInterval = 10 //5s的主块间隔后分叉概率增加,10s可以消除一些分叉回退
waitBlocks4CommitMsg int32 = 3 waitBlocks4CommitMsg int32 = 5 //commit msg共识发送后等待几个块没确认则重发
waitConsensStopTimes uint32 = 30 //30*10s = 5min waitConsensStopTimes uint32 = 30 //30*10s = 5min
) )
...@@ -42,7 +42,7 @@ type paraSelfConsEnable struct { ...@@ -42,7 +42,7 @@ type paraSelfConsEnable struct {
type commitMsgClient struct { type commitMsgClient struct {
paraClient *client paraClient *client
waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2 waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息
waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数 waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数
resetCh chan interface{} resetCh chan interface{}
sendMsgCh chan *types.Transaction sendMsgCh chan *types.Transaction
...@@ -130,8 +130,8 @@ func (client *commitMsgClient) resetNotify() { ...@@ -130,8 +130,8 @@ func (client *commitMsgClient) resetNotify() {
} }
//新的区块产生,检查是否有commitTx正在发送入口 //新的区块产生,检查是否有commitTx正在发送入口
func (client *commitMsgClient) commitTxCheckNotify(txs []*types.TxDetail) { func (client *commitMsgClient) commitTxCheckNotify(block *types.ParaTxDetail) {
if client.checkCommitTxSuccess(txs) { if client.checkCommitTxSuccess(block) {
client.sendCommitTx() client.sendCommitTx()
} }
} }
...@@ -200,11 +200,19 @@ func (client *commitMsgClient) sendCommitTx() { ...@@ -200,11 +200,19 @@ func (client *commitMsgClient) sendCommitTx() {
} }
func (client *commitMsgClient) verifyTx(curTx *types.Transaction, verifyTxs map[string]bool) bool { func (client *commitMsgClient) verifyTx(curTx *types.Transaction, verifyTxs map[string]bool, addType int64) bool {
//验证通过
if verifyTxs[string(curTx.Hash())] { if verifyTxs[string(curTx.Hash())] {
client.setCurrentTx(nil) client.setCurrentTx(nil)
return true return true
} }
//当前addType是回滚,则不计数,如果有累计则撤销上次累计次数,重新计数
if addType != types.AddBlock{
if client.checkTxCommitTimes > 0{
client.checkTxCommitTimes--
}
return false
}
client.checkTxCommitTimes++ client.checkTxCommitTimes++
if client.checkTxCommitTimes >= client.waitMainBlocks { if client.checkTxCommitTimes >= client.waitMainBlocks {
...@@ -216,7 +224,7 @@ func (client *commitMsgClient) verifyTx(curTx *types.Transaction, verifyTxs map[ ...@@ -216,7 +224,7 @@ func (client *commitMsgClient) verifyTx(curTx *types.Transaction, verifyTxs map[
} }
func (client *commitMsgClient) checkCommitTxSuccess(txs []*types.TxDetail) bool { func (client *commitMsgClient) checkCommitTxSuccess(block *types.ParaTxDetail) bool {
client.mutex.Lock() client.mutex.Lock()
defer client.mutex.Unlock() defer client.mutex.Unlock()
...@@ -225,10 +233,11 @@ func (client *commitMsgClient) checkCommitTxSuccess(txs []*types.TxDetail) bool ...@@ -225,10 +233,11 @@ func (client *commitMsgClient) checkCommitTxSuccess(txs []*types.TxDetail) bool
return false return false
} }
//使用map 比每个交易hash byte比较效率应该会高些
txMap := make(map[string]bool) txMap := make(map[string]bool)
//committx是平行链交易 //committx是平行链交易
if types.IsParaExecName(string(curTx.Execer)) { if types.IsParaExecName(string(curTx.Execer)) {
for _, tx := range txs { for _, tx := range block.TxDetails {
if bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) && tx.Receipt.Ty == types.ExecOk { if bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) && tx.Receipt.Ty == types.ExecOk {
txMap[string(tx.Tx.Hash())] = true txMap[string(tx.Tx.Hash())] = true
} }
...@@ -250,7 +259,7 @@ func (client *commitMsgClient) checkCommitTxSuccess(txs []*types.TxDetail) bool ...@@ -250,7 +259,7 @@ func (client *commitMsgClient) checkCommitTxSuccess(txs []*types.TxDetail) bool
return false return false
} }
return client.verifyTx(curTx, txMap) return client.verifyTx(curTx, txMap,block.Type)
} }
//如果共识高度一直没有追上发送高度,且当前发送高度已经上链,说明共识一直没达成,安全起见,超过停止次数后,重发 //如果共识高度一直没有追上发送高度,且当前发送高度已经上链,说明共识一直没达成,安全起见,超过停止次数后,重发
......
...@@ -244,11 +244,11 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) { ...@@ -244,11 +244,11 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
if lastSeq-currSeq > client.subCfg.BatchFetchBlockCount { if lastSeq-currSeq > client.subCfg.BatchFetchBlockCount {
return client.subCfg.BatchFetchBlockCount - 1, nil return client.subCfg.BatchFetchBlockCount - 1, nil
} }
return 0, nil return 1, nil
} }
if lastSeq == currSeq { if lastSeq == currSeq {
return 0, nil return 1, nil
} }
// lastSeq = currSeq -1 // lastSeq = currSeq -1
...@@ -341,7 +341,7 @@ func (client *client) requestTxsFromBlock(currSeq int64, preMainBlockHash []byte ...@@ -341,7 +341,7 @@ func (client *client) requestTxsFromBlock(currSeq int64, preMainBlockHash []byte
func (client *client) requestFilterParaTxs(currSeq int64, count int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) { func (client *client) requestFilterParaTxs(currSeq int64, count int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) {
cfg := client.GetAPI().GetConfig() cfg := client.GetAPI().GetConfig()
req := &types.ReqParaTxByTitle{IsSeq: true, Start: currSeq, End: currSeq + count, Title: cfg.GetTitle()} req := &types.ReqParaTxByTitle{IsSeq: true, Start: currSeq, End: currSeq + count-1, Title: cfg.GetTitle()}
details, err := client.GetParaTxByTitle(req) details, err := client.GetParaTxByTitle(req)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -350,9 +350,15 @@ func (client *client) requestFilterParaTxs(currSeq int64, count int64, preMainBl ...@@ -350,9 +350,15 @@ func (client *client) requestFilterParaTxs(currSeq int64, count int64, preMainBl
details = validMainBlocks(details) details = validMainBlocks(details)
err = verifyMainBlocks(preMainBlockHash, details) err = verifyMainBlocks(preMainBlockHash, details)
if err != nil { if err != nil {
plog.Error("requestTxsOnlyPara", "curSeq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash)) plog.Error("requestFilterParaTxs", "curSeq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, err return nil, err
} }
//至少应该返回1个
if len(details.Items) == 0{
plog.Error("requestFilterParaTxs ret nil", "curSeq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
return nil, types.ErrNotFound
}
return details, nil return details, nil
} }
...@@ -533,9 +539,9 @@ out: ...@@ -533,9 +539,9 @@ out:
plog.Debug("para CreateBlock count not match", "count", count, "items", len(paraTxs.Items)) plog.Debug("para CreateBlock count not match", "count", count, "items", len(paraTxs.Items))
count = int64(len(paraTxs.Items)) count = int64(len(paraTxs.Items))
} }
//如果超过1个block,则认为当前正在追赶,暂不处理 //如果当前正在追赶,暂不处理
if client.commitMsgClient.authAccount != "" && len(paraTxs.Items) == 1 { if client.commitMsgClient.authAccount != "" && client.isCaughtUp() && len(paraTxs.Items) > 0{
client.commitMsgClient.commitTxCheckNotify(paraTxs.Items[0].TxDetails) client.commitMsgClient.commitTxCheckNotify(paraTxs.Items[0])
} }
err = client.procLocalBlocks(paraTxs) err = client.procLocalBlocks(paraTxs)
......
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package para
import (
"testing"
"github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert"
)
func TestGetHeightsArry(t *testing.T) {
h0 := &types.BlockInfo{Height: 1}
h1 := &types.BlockInfo{Height: 3}
h2 := &types.BlockInfo{Height: 5}
h3 := &types.BlockInfo{Height: 6}
h4 := &types.BlockInfo{Height: 9}
h5 := &types.BlockInfo{Height: 15}
h6 := &types.BlockInfo{Height: 21}
h7 := &types.BlockInfo{Height: 25}
h8 := &types.BlockInfo{Height: 31}
h9 := &types.BlockInfo{Height: 41}
heights := []*types.BlockInfo{h0, h1, h2, h3, h4, h5, h6, h7, h8, h9}
hh := getHeightsArry(heights, 3)
h11 := []*types.BlockInfo{h0, h1, h2}
h12 := []*types.BlockInfo{h3, h4, h5}
h13 := []*types.BlockInfo{h6, h7, h8}
h14 := []*types.BlockInfo{h9}
expect := [][]*types.BlockInfo{h11, h12, h13, h14}
assert.Equal(t, expect, hh)
s, e := getStartEndHeight(0, 100, hh, 0)
assert.Equal(t, int64(0), s)
assert.Equal(t, h2.Height, e)
s, e = getStartEndHeight(0, 100, hh, 1)
assert.Equal(t, h2.Height+1, s)
assert.Equal(t, h5.Height, e)
s, e = getStartEndHeight(0, 100, hh, 2)
assert.Equal(t, h5.Height+1, s)
assert.Equal(t, h8.Height, e)
s, e = getStartEndHeight(0, 100, hh, 3)
assert.Equal(t, h8.Height+1, s)
assert.Equal(t, int64(100), e)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment