Commit 34453ff5 authored by mdj33's avatar mdj33 Committed by vipwzw

multi download para tx

parent 995012f1
...@@ -242,7 +242,7 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) { ...@@ -242,7 +242,7 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
atomic.StoreInt32(&client.caughtUp, 1) atomic.StoreInt32(&client.caughtUp, 1)
} }
if lastSeq-currSeq > client.subCfg.BatchFetchBlockCount { if lastSeq-currSeq > client.subCfg.BatchFetchBlockCount {
return client.subCfg.BatchFetchBlockCount - 1, nil return client.subCfg.BatchFetchBlockCount, nil
} }
return 1, nil return 1, nil
} }
......
...@@ -125,7 +125,6 @@ func (j *jumpDldClient) getParaHeights(startHeight, endHeight int64) ([]*types.B ...@@ -125,7 +125,6 @@ func (j *jumpDldClient) getParaHeights(startHeight, endHeight int64) ([]*types.B
return nil, errors.New("verify fail or main thread cancel") return nil, errors.New("verify fail or main thread cancel")
} }
} }
return heightList, nil
} }
//把不连续的平行链区块高度按offset分成二维数组,方便后面处理 //把不连续的平行链区块高度按offset分成二维数组,方便后面处理
...@@ -162,7 +161,7 @@ func (j *jumpDldClient) verifyTxMerkleRoot(tx *types.ParaTxDetail, headMap map[i ...@@ -162,7 +161,7 @@ func (j *jumpDldClient) verifyTxMerkleRoot(tx *types.ParaTxDetail, headMap map[i
for _, t := range tx.TxDetails { for _, t := range tx.TxDetails {
verifyTxs = append(verifyTxs, t.Tx) verifyTxs = append(verifyTxs, t.Tx)
} }
verifyTxRoot := merkle.CalcMerkleRoot(verifyTxs) verifyTxRoot := merkle.CalcMerkleRoot(j.paraClient.GetAPI().GetConfig(), tx.Header.Height, verifyTxs)
if !bytes.Equal(verifyTxRoot, tx.ChildHash) { if !bytes.Equal(verifyTxRoot, tx.ChildHash) {
plog.Error("jumpDldClient.verifyTxMerkelHash", "height", tx.Header.Height, plog.Error("jumpDldClient.verifyTxMerkelHash", "height", tx.Header.Height,
"calcHash", common.ToHex(verifyTxRoot), "rcvHash", common.ToHex(tx.ChildHash)) "calcHash", common.ToHex(verifyTxRoot), "rcvHash", common.ToHex(tx.ChildHash))
...@@ -230,6 +229,31 @@ func (j *jumpDldClient) processTxJobs(ch chan *paraTxBlocksJob) { ...@@ -230,6 +229,31 @@ func (j *jumpDldClient) processTxJobs(ch chan *paraTxBlocksJob) {
} }
} }
//按高度list请求平行链区块,服务器有可能返回少于请求高度,少于时候需要继续请求
func (j *jumpDldClient) fetchHeightListBlocks(hlist []int64, title string) (*types.ParaTxDetails, error) {
index := 0
retBlocks := &types.ParaTxDetails{}
for {
list := hlist[index:]
req := &types.ReqParaTxByHeight{Items: list, Title: title}
blocks, err := j.paraClient.GetParaTxByHeight(req)
if err != nil {
plog.Error("jumpDld.getParaTxs fetchHeightListBlocks", "start", list[0], "end", list[len(list)-1], "title", title)
return nil, err
}
retBlocks.Items = append(retBlocks.Items, blocks.Items...)
index += len(blocks.Items)
if index == len(hlist) {
return retBlocks, nil
}
//从逻辑上应该不会有大于场景出现
if index > len(hlist) {
plog.Error("jumpDld.getParaTxs fetchHeightListBlocks len", "index", index, "len", len(hlist), "start", list[0], "end", list[len(list)-1], "title", title)
return nil, err
}
}
}
func (j *jumpDldClient) getParaTxs(startHeight, endHeight int64, heights []*types.BlockInfo, ch chan *paraTxBlocksJob) error { func (j *jumpDldClient) getParaTxs(startHeight, endHeight int64, heights []*types.BlockInfo, ch chan *paraTxBlocksJob) error {
title := j.paraClient.GetAPI().GetConfig().GetTitle() title := j.paraClient.GetAPI().GetConfig().GetTitle()
heightsArr := getHeightsArry(heights, int(types.MaxBlockCountPerTime)) heightsArr := getHeightsArry(heights, int(types.MaxBlockCountPerTime))
...@@ -239,8 +263,8 @@ func (j *jumpDldClient) getParaTxs(startHeight, endHeight int64, heights []*type ...@@ -239,8 +263,8 @@ func (j *jumpDldClient) getParaTxs(startHeight, endHeight int64, heights []*type
for _, h := range single { for _, h := range single {
hlist = append(hlist, h.Height) hlist = append(hlist, h.Height)
} }
req := &types.ReqParaTxByHeight{Items: hlist, Title: title}
blocks, err := j.paraClient.GetParaTxByHeight(req) blocks, err := j.fetchHeightListBlocks(hlist, title)
if err != nil { if err != nil {
plog.Error("jumpDld.getParaTxs getParaTx", "start", hlist[0], "end", hlist[len(hlist)-1], "title", title) plog.Error("jumpDld.getParaTxs getParaTx", "start", hlist[0], "end", hlist[len(hlist)-1], "title", title)
return err return err
......
...@@ -8,7 +8,9 @@ import ( ...@@ -8,7 +8,9 @@ import (
"testing" "testing"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
typesmocks "github.com/33cn/chain33/types/mocks"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
) )
func TestGetHeightsArry(t *testing.T) { func TestGetHeightsArry(t *testing.T) {
...@@ -49,3 +51,35 @@ func TestGetHeightsArry(t *testing.T) { ...@@ -49,3 +51,35 @@ func TestGetHeightsArry(t *testing.T) {
assert.Equal(t, h8.Height+1, s) assert.Equal(t, h8.Height+1, s)
assert.Equal(t, int64(100), e) assert.Equal(t, int64(100), e)
} }
func TestFetchHeightListBlocks(t *testing.T) {
para := &client{}
grpcClient := &typesmocks.Chain33Client{}
para.grpcClient = grpcClient
jump := &jumpDldClient{paraClient: para}
b1 := &types.ParaTxDetail{Header: &types.Header{Height: 1}}
b2 := &types.ParaTxDetail{Header: &types.Header{Height: 2}}
b3 := &types.ParaTxDetail{Header: &types.Header{Height: 3}}
b4 := &types.ParaTxDetail{Header: &types.Header{Height: 4}}
b5 := &types.ParaTxDetail{Header: &types.Header{Height: 5}}
b6 := &types.ParaTxDetail{Header: &types.Header{Height: 6}}
b7 := &types.ParaTxDetail{Header: &types.Header{Height: 7}}
b8 := &types.ParaTxDetail{Header: &types.Header{Height: 8}}
b9 := &types.ParaTxDetail{Header: &types.Header{Height: 9}}
blocks1 := &types.ParaTxDetails{Items: []*types.ParaTxDetail{b1, b2, b3}}
blocks2 := &types.ParaTxDetails{Items: []*types.ParaTxDetail{b4, b5, b6, b7}}
blocks3 := &types.ParaTxDetails{Items: []*types.ParaTxDetail{b8, b9}}
grpcClient.On("GetParaTxByHeight", mock.Anything, mock.Anything).Return(blocks1, nil).Once()
grpcClient.On("GetParaTxByHeight", mock.Anything, mock.Anything).Return(blocks2, nil).Once()
grpcClient.On("GetParaTxByHeight", mock.Anything, mock.Anything).Return(blocks3, nil).Once()
allBlocks := &types.ParaTxDetails{}
allBlocks.Items = append(allBlocks.Items, blocks1.Items...)
allBlocks.Items = append(allBlocks.Items, blocks2.Items...)
allBlocks.Items = append(allBlocks.Items, blocks3.Items...)
hlist := []int64{1, 2, 3, 4, 5, 6, 7, 8, 9}
blocks, err := jump.fetchHeightListBlocks(hlist, "title")
assert.NoError(t, err)
assert.Equal(t, allBlocks.Items, blocks.Items)
}
...@@ -156,10 +156,10 @@ func (client *client) GetParaTxByHeight(req *types.ReqParaTxByHeight) (*types.Pa ...@@ -156,10 +156,10 @@ func (client *client) GetParaTxByHeight(req *types.ReqParaTxByHeight) (*types.Pa
return nil, err return nil, err
} }
if len(req.Items) != len(blocks.Items) { //可以小于等于,不能大于
plog.Error("GetParaTxByHeight get block tx count fail", "req", len(req.Items), "rsp", len(blocks.Items)) if len(blocks.Items) > len(req.Items) {
plog.Error("GetParaTxByHeight get blocks more than req")
return nil, types.ErrInvalidParam return nil, types.ErrInvalidParam
} }
return blocks, nil return blocks, nil
} }
...@@ -408,9 +408,9 @@ paracross_testSelfConsensStages() { ...@@ -408,9 +408,9 @@ paracross_testSelfConsensStages() {
echo "send vote 1" echo "send vote 1"
chain33_SignAndSendTx "$rawtx" "$KS_PRI" "${para_ip}" chain33_SignAndSendTx "$rawtx" "$KS_PRI" "${para_ip}"
echo "send vote 2" echo "send vote 2"
chain33_SignAndSendTx "$rawtx" "$JR_PRI" "${para_ip}" "110s" chain33_SignAndSendTx "$rawtx" "$JR_PRI" "${para_ip}" "130s"
echo "send vote 3" echo "send vote 3"
chain33_SignAndSendTx "$rawtx" "$NL_PRI" "${para_ip}" "111s" chain33_SignAndSendTx "$rawtx" "$NL_PRI" "${para_ip}" "140s"
echo "query status" echo "query status"
req='"method":"Chain33.Query","params":[{ "execer":"paracross", "funcName":"ListSelfStages","payload":{"status":3,"count":1}}]' req='"method":"Chain33.Query","params":[{ "execer":"paracross", "funcName":"ListSelfStages","payload":{"status":3,"count":1}}]'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment