Commit a511b559 authored by mdj33's avatar mdj33 Committed by vipwzw

adjust download strategy

parent eb87ad16
......@@ -5,9 +5,8 @@
package para
import (
"sync"
"bytes"
"sync"
"sync/atomic"
"github.com/33cn/chain33/common"
......@@ -19,64 +18,14 @@ import (
type paraTxBlocksJob struct {
start int64
end int64
txBlocks *types.ParaTxDetails //有平行链交易的blocks
paraTxBlocks map[int64]*types.ParaTxDetail //有平行链交易的blocks
headers *types.ParaTxDetails
}
type jumpDldClient struct {
paraClient *client
downFail int32
wg sync.WaitGroup
mtx sync.Mutex
}
func (j *jumpDldClient) proSaveHeaders(job *types.Headers) {
if atomic.LoadInt32(&j.downFail) != 0 || j.paraClient.isCancel() {
return
}
err := j.paraClient.saveBatchMainHeaders(job)
if err != nil {
plog.Error("getAllHeaders---saveHeaders", "err", err)
atomic.StoreInt32(&j.downFail, 1)
}
}
func (j *jumpDldClient) saveHeaderJobs(ch chan *types.Headers) {
defer j.wg.Done()
for job := range ch {
j.proSaveHeaders(job)
}
}
func (j *jumpDldClient) getAllHeaders(startHeight, endHeight int64) error {
jobsCh := make(chan *types.Headers, defaultJobBufferNum)
j.wg.Add(1)
go j.saveHeaderJobs(jobsCh)
var ret error
for i := startHeight; i <= endHeight; i += types.MaxBlockCountPerTime {
end := i + types.MaxBlockCountPerTime - 1
if end > endHeight {
end = endHeight
}
blocks := &types.ReqBlocks{Start: i, End: end}
headers, err := j.paraClient.GetBlockHeaders(blocks)
if err != nil {
ret = err
break
}
plog.Info("paraJumpDownload.getAllHeaders", "start", headers.Items[0].Height, "end", headers.Items[len(headers.Items)-1].Height)
jobsCh <- headers
if j.paraClient.isCancel() {
ret = errors.New("main thread cancel")
break
}
}
close(jobsCh)
j.wg.Wait()
return ret
}
//校验按高度获取的block hash和前一步对应高度的blockhash比对
......@@ -95,7 +44,7 @@ func verifyBlockHahs(heights []*types.BlockInfo, blocks []*types.ParaTxDetail) e
return nil
}
func (j *jumpDldClient) getParaHeights(startHeight, endHeight int64) ([]*types.BlockInfo, error) {
func (j *jumpDldClient) getParaHeightList(startHeight, endHeight int64) ([]*types.BlockInfo, error) {
var heightList []*types.BlockInfo
title := j.paraClient.GetAPI().GetConfig().GetTitle()
lastHeight := int64(-1)
......@@ -103,7 +52,7 @@ func (j *jumpDldClient) getParaHeights(startHeight, endHeight int64) ([]*types.B
req := &types.ReqHeightByTitle{Height: lastHeight, Count: int32(types.MaxBlockCountPerTime), Direction: 1, Title: title}
heights, err := j.paraClient.GetParaHeightsByTitle(req)
if err != nil && err != types.ErrNotFound {
plog.Error("jumpDld.getParaTxs getHeights", "start", lastHeight, "count", req.Count, "title", title, "err", err)
plog.Error("jumpDld.getParaHeightList", "start", lastHeight, "count", req.Count, "title", title, "err", err)
return heightList, err
}
if err == types.ErrNotFound || heights == nil || len(heights.Items) <= 0 {
......@@ -128,7 +77,7 @@ func (j *jumpDldClient) getParaHeights(startHeight, endHeight int64) ([]*types.B
}
//把不连续的平行链区块高度按offset分成二维数组,方便后面处理
func getHeightsArry(heights []*types.BlockInfo, offset int) [][]*types.BlockInfo {
func splitHeights2Rows(heights []*types.BlockInfo, offset int) [][]*types.BlockInfo {
var ret [][]*types.BlockInfo
for i := 0; i < len(heights); i += offset {
end := i + offset
......@@ -142,7 +91,7 @@ func getHeightsArry(heights []*types.BlockInfo, offset int) [][]*types.BlockInfo
//按高度每次获取实际1000个有平行链交易的区块,这些区块并不一定连续,为了连续处理有交易和没有交易的区块,需要特殊设置起始结束高度,
//但每次处理的起始高度和结束高度都包含了有交易的1000个平行链高度
func getStartEndHeight(startHeight, endHeight int64, arr [][]*types.BlockInfo, i int) (int64, int64) {
func getHeaderStartEndRange(startHeight, endHeight int64, arr [][]*types.BlockInfo, i int) (int64, int64) {
single := arr[i]
s := startHeight
e := single[len(single)-1].Height
......@@ -182,17 +131,20 @@ func (j *jumpDldClient) process(job *paraTxBlocksJob) {
return
}
headMap := make(map[int64]*types.ParaTxDetail)
headers, err := j.paraClient.getBatchMainHeadersFromDb(job.start, job.end)
if err != nil {
plog.Error("jumpDldClient.process getBatchHeader", "start", job.start, "end", job.end)
atomic.StoreInt32(&j.downFail, 1)
return
}
for _, h := range headers.Items {
for _, h := range job.headers.Items {
headMap[h.Header.Height] = h
}
if job.txBlocks != nil {
for _, tx := range job.txBlocks.Items {
//收集header头尾区间有没有paraTxBlocks
txBlocks := &types.ParaTxDetails{}
for i := job.start; i <= job.end; i++ {
if job.paraTxBlocks[i] != nil {
txBlocks.Items = append(txBlocks.Items, job.paraTxBlocks[i])
}
}
if len(txBlocks.Items) > 0 {
for _, tx := range txBlocks.Items {
// 1. 校验平行链交易的区块头hash 和之前读取的主链头对应高度的块hash
if !bytes.Equal(tx.Header.Hash, headMap[tx.Header.Height].Header.Hash) {
plog.Error("jumpDldClient.process verifyhash", "height", tx.Header.Height,
......@@ -212,12 +164,11 @@ func (j *jumpDldClient) process(job *paraTxBlocksJob) {
headMap[tx.Header.Height].TxDetails = tx.TxDetails
}
}
err = j.paraClient.procLocalAddBlocks(headers)
err := j.paraClient.procLocalAddBlocks(job.headers)
if err != nil {
atomic.StoreInt32(&j.downFail, 1)
plog.Error("jumpDldClient.process procLocalAddBlocks", "start", job.start, "end", job.end, "err", err)
}
j.paraClient.rmvBatchMainBlocks(job.start, job.end)
}
......@@ -254,31 +205,78 @@ func (j *jumpDldClient) fetchHeightListBlocks(hlist []int64, title string) (*typ
}
}
func (j *jumpDldClient) getParaTxs(startHeight, endHeight int64, heights []*types.BlockInfo, ch chan *paraTxBlocksJob) error {
title := j.paraClient.GetAPI().GetConfig().GetTitle()
heightsArr := getHeightsArry(heights, int(types.MaxBlockCountPerTime))
for i, single := range heightsArr {
func (j *jumpDldClient) getParaTxsBlocks(blocksList []*types.BlockInfo, title string) (map[int64]*types.ParaTxDetail, error) {
var hlist []int64
for _, h := range single {
for _, h := range blocksList {
hlist = append(hlist, h.Height)
}
blocks, err := j.fetchHeightListBlocks(hlist, title)
if err != nil {
plog.Error("jumpDld.getParaTxs getParaTx", "start", hlist[0], "end", hlist[len(hlist)-1], "title", title)
return err
plog.Error("jumpDld.getParaTxsBlocks", "start", hlist[0], "end", hlist[len(hlist)-1], "title", title)
return nil, err
}
err = verifyBlockHahs(blocksList, blocks.Items)
if err != nil {
plog.Error("jumpDld.getParaTxsBlocks verifyTx", "start", hlist[0], "end", hlist[len(hlist)-1], "title", title)
return nil, err
}
blocksMap := make(map[int64]*types.ParaTxDetail)
for _, b := range blocks.Items {
blocksMap[b.Header.Height] = b
}
return blocksMap, nil
}
func (j *jumpDldClient) getHeaders(start, end int64) (*types.ParaTxDetails, error) {
blocks := &types.ReqBlocks{Start: start, End: end}
headers, err := j.paraClient.GetBlockHeaders(blocks)
if err != nil {
plog.Error("paraJumpDownload.getHeaders", "start", start, "end", end, "error", err)
return nil, err
}
plog.Debug("paraJumpDownload.getHeaders", "start", start, "end", end)
paraTxHeaders := &types.ParaTxDetails{}
for _, header := range headers.Items {
paraTxHeaders.Items = append(paraTxHeaders.Items, &types.ParaTxDetail{Type: types.AddBlock, Header: header})
}
return paraTxHeaders, nil
}
err = verifyBlockHahs(single, blocks.Items)
//每1000header执行一次比全部获取出来更有效率,可以和同步层更好并行处理,节约时间,1000paraTxBlocks花时间很少,相比headers获取,串行获取时间可以忽略
func (j *jumpDldClient) getParaTxs(startHeight, endHeight int64, heights []*types.BlockInfo, ch chan *paraTxBlocksJob) error {
title := j.paraClient.GetAPI().GetConfig().GetTitle()
heightsRows := splitHeights2Rows(heights, int(types.MaxBlockCountPerTime))
for i, row := range heightsRows {
//获取每一排1000个paraTxBlocks
paraBlocks, err := j.getParaTxsBlocks(row, title)
if err != nil {
return err
}
//根据1000个paraTxBlocks的头尾高度获取header的头尾高度,header的高度要包含paraTxBlocks高度
headerStart, headerEnd := getHeaderStartEndRange(startHeight, endHeight, heightsRows, i)
plog.Debug("paraJumpDownload.getHeaders", "headerStart", headerStart, "headerEnd", headerEnd, "i", i)
for s := headerStart; s <= headerEnd; s += types.MaxBlockCountPerTime {
end := s + types.MaxBlockCountPerTime - 1
if end > headerEnd {
end = headerEnd
}
headers, err := j.getHeaders(s, end)
if err != nil {
plog.Error("jumpDld.getParaTxs verifyTx", "start", hlist[0], "end", hlist[len(hlist)-1], "title", title)
plog.Error("jumpDld.getParaTxs headers", "start", headerStart, "end", headerEnd, "title", title, "err", err)
return err
}
s, e := getStartEndHeight(startHeight, endHeight, heightsArr, i)
plog.Info("jumpDld.getParaTxs fillTxJob", "start", s, "end", e, "i", i, "len", len(single))
paraTxs := &paraTxBlocksJob{start: s, end: e, txBlocks: blocks}
ch <- paraTxs
//每1000个header同步一次,这样可以更快更小粒度的使同步层获取区块执行
job := &paraTxBlocksJob{start: s, end: end, paraTxBlocks: paraBlocks, headers: headers}
ch <- job
if atomic.LoadInt32(&j.downFail) != 0 || j.paraClient.isCancel() {
return errors.New("verify fail or main thread cancel")
}
}
if atomic.LoadInt32(&j.downFail) != 0 || j.paraClient.isCancel() {
return errors.New("verify fail or main thread cancel")
......@@ -290,9 +288,8 @@ func (j *jumpDldClient) getParaTxs(startHeight, endHeight int64, heights []*type
//Jump Download 是选择有平行链交易的区块跳跃下载的功能,分为三个步骤:
//0. 只获取当前主链高度1w高度前的区块,默认没有分叉,都是addType block
//1. 获取完整的主链header,为了后面平行链交易的校验和平行链的空块产生,云节点主链1000个header大概30ms,固态硬盘更快
//2. 获取所有平行链交易的高度列表,大概5s以内
//3. 按高度列表获取平行链区块并获取一段执行一段
//1. 获取所有平行链交易的高度列表,大概5s以内
//2. 按高度列表获取含平行链交易的主链区块每次获取一段1000高度,并获取相关范围的主链headers,一起执行,单独获取headers和同步处理不能并行
func (j *jumpDldClient) tryJumpDownload() {
curMainHeight, err := j.paraClient.GetLastHeightOnMainChain()
if err != nil {
......@@ -314,28 +311,20 @@ func (j *jumpDldClient) tryJumpDownload() {
return
}
plog.Info("tryJumpDownload", "start", startHeight, "end", endHeight)
t1 := types.Now()
//1. get all main headers
err = j.getAllHeaders(startHeight, endHeight)
if err != nil {
plog.Error("JumpDld.getAllHeaders", "err", err)
return
}
plog.Info("tryJumpDownload.getAllHeaders", "time", types.Since(t1))
//2. 获取有平行链交易的块高度列表
t1 = types.Now()
heights, err := j.getParaHeights(startHeight, endHeight)
//1. 获取有平行链交易的块高度列表
t1 := types.Now()
heights, err := j.getParaHeightList(startHeight, endHeight)
if err != nil {
plog.Error("JumpDld.getParaHeights", "err", err)
plog.Error("JumpDld.getParaHeightList", "err", err)
}
if len(heights) == 0 {
plog.Error("JumpDld.getParaHeights no height found")
plog.Error("JumpDld.getParaHeightList no height found")
return
}
plog.Info("tryJumpDownload.getParaHeights", "time", types.Since(t1))
plog.Info("tryJumpDownload.getParaHeightList", "time", types.Since(t1))
//3. 按有平行链交易的高度列表获取平行链块
//2. 按有平行链交易的高度列表获取平行链块
jobsCh := make(chan *paraTxBlocksJob, defaultJobBufferNum)
j.wg.Add(1)
go j.processTxJobs(jobsCh)
......@@ -349,6 +338,5 @@ func (j *jumpDldClient) tryJumpDownload() {
close(jobsCh)
j.wg.Wait()
plog.Info("tryJumpDownload.getParaTxs", "time", types.Since(t1))
plog.Info("tryJumpDownload done")
plog.Info("tryJumpDownload.getParaTxs done", "time", types.Since(t1))
}
......@@ -27,7 +27,7 @@ func TestGetHeightsArry(t *testing.T) {
heights := []*types.BlockInfo{h0, h1, h2, h3, h4, h5, h6, h7, h8, h9}
hh := getHeightsArry(heights, 3)
hh := splitHeights2Rows(heights, 3)
h11 := []*types.BlockInfo{h0, h1, h2}
h12 := []*types.BlockInfo{h3, h4, h5}
h13 := []*types.BlockInfo{h6, h7, h8}
......@@ -35,19 +35,19 @@ func TestGetHeightsArry(t *testing.T) {
expect := [][]*types.BlockInfo{h11, h12, h13, h14}
assert.Equal(t, expect, hh)
s, e := getStartEndHeight(0, 100, hh, 0)
s, e := getHeaderStartEndRange(0, 100, hh, 0)
assert.Equal(t, int64(0), s)
assert.Equal(t, h2.Height, e)
s, e = getStartEndHeight(0, 100, hh, 1)
s, e = getHeaderStartEndRange(0, 100, hh, 1)
assert.Equal(t, h2.Height+1, s)
assert.Equal(t, h5.Height, e)
s, e = getStartEndHeight(0, 100, hh, 2)
s, e = getHeaderStartEndRange(0, 100, hh, 2)
assert.Equal(t, h5.Height+1, s)
assert.Equal(t, h8.Height, e)
s, e = getStartEndHeight(0, 100, hh, 3)
s, e = getHeaderStartEndRange(0, 100, hh, 3)
assert.Equal(t, h8.Height+1, s)
assert.Equal(t, int64(100), e)
}
......
......@@ -17,11 +17,11 @@ func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
//from blockchain db
blockDetails, err := client.GetAPI().GetBlocks(&types.ReqBlocks{Start: height, End: height})
if err != nil {
plog.Error("paracommitmsg get node status block count fail")
plog.Error("GetBlockByHeight fail", "err", err)
return nil, err
}
if 1 != int64(len(blockDetails.Items)) {
plog.Error("paracommitmsg get node status block count fail")
plog.Error("GetBlockByHeight count fail", "len", len(blockDetails.Items))
return nil, types.ErrInvalidParam
}
return blockDetails.Items[0].Block, nil
......@@ -31,9 +31,16 @@ func (client *client) GetBlockHeaders(req *types.ReqBlocks) (*types.Headers, err
//from blockchain db
headers, err := client.grpcClient.GetHeaders(context.Background(), req)
if err != nil {
plog.Error("paracommitmsg get node status block count fail")
plog.Error("GetBlockHeaders fail", "err", err)
return nil, err
}
count := req.End - req.Start + 1
if int64(len(headers.Items)) != count {
plog.Error("GetBlockHeaders", "start", req.Start, "end", req.End, "reals", headers.Items[0].Height, "reale", headers.Items[len(headers.Items)-1].Height,
"len", len(headers.Items), "count", count)
return nil, types.ErrBlockHeightNoMatch
}
return headers, nil
}
......@@ -55,7 +62,7 @@ func (client *client) getLastBlockMainInfo() (int64, *types.Block, error) {
func (client *client) getLastBlockInfo() (*types.Block, error) {
lastBlock, err := client.RequestLastBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
plog.Error("Parachain getLastBlockInfo fail", "err", err)
return nil, err
}
......@@ -141,7 +148,7 @@ func (client *client) GetParaHeightsByTitle(req *types.ReqHeightByTitle) (*types
//from blockchain db
heights, err := client.grpcClient.LoadParaTxByTitle(context.Background(), req)
if err != nil {
plog.Error("paracommitmsg get node status block count fail")
plog.Error("GetParaHeightsByTitle fail", "err", err)
return nil, err
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment