Commit 94db9782 authored by mdj33's avatar mdj33 Committed by vipwzw

improve multi-download

parent 63d18523
......@@ -41,7 +41,7 @@ const (
defaultMainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain
mainParaSelfConsensusForkHeight int64 = types.MaxHeight //para chain self consensus height switch, must >= ForkParacrossCommitTx of main
mainForkParacrossCommitTx int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx
batchFetchBlockCount int64 = 128
batchFetchBlockCount int64 = 1000
)
var (
......
......@@ -10,13 +10,15 @@ import (
"strings"
"time"
"github.com/33cn/chain33/rpc/grpcclient"
"github.com/33cn/chain33/types"
)
const (
maxRollbackHeight int64 = 10000
defaultInvNumPerJob = 20 // 1000 block per inv, 20inv per job
defaultInvNumPerJob = 20 // 20inv task per job
defaultJobBufferNum = 20 // channel buffer num for done job process
maxBlockSize = 20000000 // 单次1000block size累积超过20M 需保存到localdb
downTimesFastThreshold = 600 // 单个server 下载超过600次,平均20次用20s,下载10分钟左右检查有没有差别比较大的
......@@ -67,11 +69,11 @@ type multiDldClient struct {
func (m *multiDldClient) getInvs(startHeight, endHeight int64) []*inventory {
var invs []*inventory
if endHeight > startHeight && endHeight-startHeight > maxRollbackHeight {
for i := startHeight; i < endHeight; i += types.MaxBlockCountPerTime {
for i := startHeight; i < endHeight; i += m.paraClient.subCfg.BatchFetchBlockCount {
inv := new(inventory)
inv.txs = &types.ParaTxDetails{}
inv.start = i
inv.end = i + types.MaxBlockCountPerTime - 1
inv.end = i + m.paraClient.subCfg.BatchFetchBlockCount - 1
if inv.end > endHeight {
inv.end = endHeight
invs = append(invs, inv)
......@@ -98,19 +100,21 @@ func (m *multiDldClient) tryMultiServerDownload() {
}
}
if len(conns) == 0 {
plog.Info("multiDownload not valid ips")
plog.Info("tryMultiServerDownload not valid ips")
return
}
m.conns = conns
curMainHeight, err := m.paraClient.GetLastHeightOnMainChain()
if err != nil {
plog.Error("tryMultiServerDownload getMain height", "err", err.Error())
return
}
//如果切换不成功,则不进行多服务下载
_, localBlock, err := m.paraClient.switchLocalHashMatchedBlock()
if err != nil {
plog.Error("tryMultiServerDownload switch local height", "err", err.Error())
return
}
......@@ -118,7 +122,7 @@ func (m *multiDldClient) tryMultiServerDownload() {
totalInvs := m.getInvs(localBlock.MainHeight+1, curMainHeight-maxRollbackHeight)
totalInvsNum := int64(len(totalInvs))
if totalInvsNum == 0 {
plog.Info("multiDownload no invs need download")
plog.Info("tryMultiServerDownload no invs need download")
return
}
......@@ -151,6 +155,7 @@ func (m *multiDldClient) tryMultiServerDownload() {
}
close(jobsCh)
m.wg.Wait()
plog.Info("tryMultiServerDownload done")
}
func (i *inventory) getFirstBlock(d *downloadJob) *types.ParaTxDetail {
......@@ -186,6 +191,15 @@ func (m *multiDldClient) processDoneJobs(ch chan *downloadJob) {
}
}
func (d *downloadJob) resetInv(i *inventory) {
if i.isSaveDb {
d.rmvBatchMainBlocks(i)
}
i.curHeight = i.start
i.txs.Items = nil
i.isSaveDb = false
}
func (d *downloadJob) process() {
for _, inv := range d.invs {
if inv.isSaveDb {
......@@ -198,8 +212,8 @@ func (d *downloadJob) process() {
if err != nil {
panic(err)
}
}
d.mDldCli.paraClient.blockSyncClient.handleLocalChangedMsg()
continue
}
......@@ -208,13 +222,11 @@ func (d *downloadJob) process() {
if err != nil {
panic(err)
}
}
}
func (d *downloadJob) getPreVerifyBlock(inv *inventory) (*types.ParaTxDetail, error) {
if inv.isSaveDb {
lastBlock, err := d.getBlockFromDb(inv.curHeight - 1)
if err != nil {
return nil, err
......@@ -244,10 +256,6 @@ func (d *downloadJob) verifyDownloadBlock(inv *inventory, blocks *types.ParaTxDe
if verifyBlock != nil {
err = verifyMainBlockHash(getVerifyHash(verifyBlock), blocks.Items[0])
if err != nil {
d.rmvBatchMainBlocks(inv)
inv.curHeight = inv.start
inv.txs.Items = nil
inv.isSaveDb = false
plog.Error("verifyDownloadBlock.verfiy", "ip", inv.connCli.ip)
return err
}
......@@ -280,7 +288,7 @@ func (d *downloadJob) saveBatchMainBlocks(txs *types.ParaTxDetails) error {
func (d *downloadJob) rmvBatchMainBlocks(inv *inventory) error {
set := &types.LocalDBSet{}
for i := inv.start; i <= inv.curHeight; i++ {
for i := inv.start; i < inv.curHeight; i++ {
key := calcTitleMainHeightKey(types.GetTitle(), i)
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV, kv)
......@@ -330,7 +338,7 @@ func (d *downloadJob) verifyInvs() []*inventory {
for _, inv := range d.invs {
err := d.checkInv(lastRetry, pre, inv)
if err != nil {
plog.Info("verifyInvs", "height", inv.start)
plog.Info("verifyInvs error", "height", inv.start)
retryItems = append(retryItems, inv)
lastRetry = inv.getLastBlock(d)
}
......@@ -387,7 +395,9 @@ func (d *downloadJob) requestMainBlocks(inv *inventory) (*types.ParaTxDetails, e
}
func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) {
start := time.Now()
defer func() {
connPool <- inv.connCli
d.wg.Done()
}()
......@@ -395,18 +405,16 @@ func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) {
plog.Debug("getInvBlocks begin", "start", inv.start, "end", inv.end)
for {
txs, err := d.requestMainBlocks(inv)
if err != nil {
plog.Error("getInvBlocks connect error", "err", err, "ip", inv.connCli.ip)
if err != nil || len(txs.Items) == 0 {
d.resetInv(inv)
plog.Error("getInvBlocks reqMainBlock error", "err", err, "ip", inv.connCli.ip)
return
}
if len(txs.Items) == 0 {
plog.Error("getInvBlocks not items down", "ip", inv.connCli.ip)
continue
}
err = d.verifyDownloadBlock(inv, txs)
if err != nil {
continue
d.resetInv(inv)
return
}
//save 之前save到db,后面区块全部save到db
......@@ -419,9 +427,8 @@ func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) {
//check done
if txs.Items[len(txs.Items)-1].Header.Height == inv.end {
inv.connCli.downTimes++
plog.Info("getInvs done", "start", inv.start, "end", inv.end, "downtimes", inv.connCli.downTimes, "ip", inv.connCli.ip)
plog.Info("downloadjob getInvs done", "start", inv.start, "end", inv.end, "time", time.Since(start).Nanoseconds()/1000000, "downtimes", inv.connCli.downTimes, "ip", inv.connCli.ip)
inv.isDone = true
connPool <- inv.connCli
return
}
if !inv.isSaveDb && types.Size(inv.txs) > maxBlockSize {
......@@ -441,7 +448,6 @@ func (d *downloadJob) getInvs(invs []*inventory) {
if !conn.isFail {
connPool <- conn
}
}
for _, inv := range invs {
......@@ -453,7 +459,6 @@ func (d *downloadJob) getInvs(invs []*inventory) {
go d.getInvBlocks(inv, connPool)
}
//等待下载任务
d.wg.Wait()
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment