Commit 732471a3 authored by mdj33's avatar mdj33 Committed by vipwzw

add multi-download

parent 1ebeedb2
......@@ -14,6 +14,11 @@ func calcTitleHeightKey(title string, height int64) []byte {
return []byte(fmt.Sprintf("%s-TH-%s-%d", types.ConsensusParaTxsPrefix, title, height))
}
//temp main height tx data for big size
func calcTitleMainHeightKey(title string, height int64) []byte {
return []byte(fmt.Sprintf("%s-TMH-%s-%d", types.ConsensusParaTxsPrefix, title, height))
}
func calcTitleLastHeightKey(title string) []byte {
return []byte(fmt.Sprintf("%s-TLH-%s", types.ConsensusParaTxsPrefix, title))
}
......
......@@ -29,9 +29,6 @@ import (
)
const (
addAct int64 = 1 //add para block action
delAct int64 = 2 //reference blockstore.go, del para block action
minBlockNum = 100 //min block number startHeight before lastHeight in mainchain
genesisBlockTime int64 = 1514533390
......@@ -64,10 +61,12 @@ type client struct {
caughtUp int32
commitMsgClient *commitMsgClient
blockSyncClient *blockSyncClient
multiDldCli *multiDldClient
authAccount string
privateKey crypto.PrivKey
wg sync.WaitGroup
subCfg *subConfig
isClosed int32
quitCreate chan struct{}
}
......@@ -89,6 +88,9 @@ type subConfig struct {
FetchFilterParaTxsEnable uint32 `json:"fetchFilterParaTxsEnable,omitempty"`
BatchFetchBlockCount int64 `json:"batchFetchBlockCount,omitempty"`
ParaConsensStartHeight int64 `json:"paraConsensStartHeight,omitempty"`
MultiDownloadOpen int32 `json:"multiDownloadOpen,omitempty"`
MultiDownInvNumPerJob int64 `json:"multiDownInvNumPerJob,omitempty"`
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
}
// New function to init paracross env
......@@ -197,6 +199,21 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
para.blockSyncClient.maxSyncErrCount = subcfg.MaxSyncErrCount
}
para.multiDldCli = &multiDldClient{
paraClient: para,
invNumPerJob: defaultInvNumPerJob,
jobBufferNum: defaultJobBufferNum,
}
if subcfg.MultiDownInvNumPerJob > 0 {
para.multiDldCli.invNumPerJob = subcfg.MultiDownInvNumPerJob
}
if subcfg.MultiDownJobBuffNum > 0 {
para.multiDldCli.jobBufferNum = subcfg.MultiDownJobBuffNum
}
if subcfg.MultiDownloadOpen > 0 {
para.multiDldCli.multiDldOpen = true
}
c.SetChild(para)
return para
}
......@@ -208,6 +225,7 @@ func (client *client) CheckBlock(parent *types.Block, current *types.BlockDetail
}
func (client *client) Close() {
atomic.StoreInt32(&client.isClosed, 1)
close(client.commitMsgClient.quit)
close(client.quitCreate)
close(client.blockSyncClient.quitChan)
......@@ -218,6 +236,10 @@ func (client *client) Close() {
plog.Info("consensus para closed")
}
func (client *client) isCancel() bool {
return atomic.LoadInt32(&client.isClosed) == 1
}
func (client *client) SetQueueClient(c queue.Client) {
plog.Info("Enter SetQueueClient method of Para consensus")
client.InitClient(c, func() {
......
......@@ -93,7 +93,7 @@ func (client *client) getLastLocalHeight() (int64, error) {
if err != nil {
return -1, err
}
if len(value) == 0 {
if len(value) == 0 || value[0] == nil {
return -1, types.ErrNotFound
}
......@@ -114,7 +114,7 @@ func (client *client) getLocalBlockByHeight(height int64) (*pt.ParaLocalDbBlock,
if err != nil {
return nil, err
}
if len(value) == 0 {
if len(value) == 0 || value[0] == nil {
return nil, types.ErrNotFound
}
......@@ -264,15 +264,15 @@ func (client *client) switchMatchedBlockOnChain(startHeight int64) (int64, []byt
}
func (client *client) switchHashMatchedBlock() (int64, []byte, error) {
mainSeq, mainHash, err := client.switchLocalHashMatchedBlock()
mainSeq, localBlock, err := client.switchLocalHashMatchedBlock()
if err != nil {
return client.switchMatchedBlockOnChain(0)
}
return mainSeq, mainHash, nil
return mainSeq, localBlock.MainHash, nil
}
//
func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) {
func (client *client) switchLocalHashMatchedBlock() (int64, *pt.ParaLocalDbBlock, error) {
lastBlock, err := client.getLastLocalBlock()
if err != nil {
plog.Error("Parachain RequestLastBlock fail", "err", err)
......@@ -286,6 +286,11 @@ func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) {
}
//当前block结构已经有mainHash和MainHeight但是从blockchain获取的block还没有写入,以后如果获取到,可以替换从minerTx获取
plog.Info("switchLocalHashMatchedBlock", "height", height, "mainHeight", block.MainHeight, "mainHash", hex.EncodeToString(block.MainHash))
mainHash, err := client.GetHashByHeightOnMainChain(block.MainHeight)
if err != nil || !bytes.Equal(mainHash, block.MainHash) {
continue
}
mainSeq, err := client.GetSeqByHashOnMainChain(block.MainHash)
if err != nil {
continue
......@@ -298,8 +303,8 @@ func (client *client) switchLocalHashMatchedBlock() (int64, []byte, error) {
}
plog.Info("switchLocalHashMatchedBlock succ", "currHeight", height, "initHeight", lastBlock.Height,
"currSeq", mainSeq, "currMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block.MainHash, nil
"currSeq", mainSeq, "mainHeight", block.MainHeight, "currMainBlockHash", hex.EncodeToString(block.MainHash))
return mainSeq, block, nil
}
return -2, nil, pt.ErrParaCurHashNotMatch
}
......@@ -317,7 +322,7 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
atomic.StoreInt32(&client.caughtUp, 1)
}
if fetchFilterParaTxsEnable && lastSeq-currSeq > client.subCfg.BatchFetchBlockCount {
return client.subCfg.BatchFetchBlockCount, nil
return client.subCfg.BatchFetchBlockCount - 1, nil
}
return 0, nil
}
......@@ -337,9 +342,23 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
}
func getParentHash(block *types.ParaTxDetail) []byte {
if block.Type == types.AddBlock {
return block.Header.ParentHash
}
return block.Header.Hash
}
func getVerifyHash(block *types.ParaTxDetail) []byte {
if block.Type == types.AddBlock {
return block.Header.Hash
}
return block.Header.ParentHash
}
func verifyMainBlockHash(preMainBlockHash []byte, mainBlock *types.ParaTxDetail) error {
if (bytes.Equal(preMainBlockHash, mainBlock.Header.ParentHash) && mainBlock.Type == addAct) ||
(bytes.Equal(preMainBlockHash, mainBlock.Header.Hash) && mainBlock.Type == delAct) {
if bytes.Equal(preMainBlockHash, getParentHash(mainBlock)) {
return nil
}
plog.Error("verifyMainBlockHash", "preMainBlockHash", hex.EncodeToString(preMainBlockHash),
......@@ -355,16 +374,29 @@ func verifyMainBlocks(preMainBlockHash []byte, mainBlocks *types.ParaTxDetails)
if err != nil {
return err
}
if block.Type == addAct {
pre = block.Header.Hash
} else {
pre = block.Header.ParentHash
}
pre = getVerifyHash(block)
}
return nil
}
func verifyMainBlocksInternal(mainBlocks *types.ParaTxDetails) error {
return verifyMainBlocks(getParentHash(mainBlocks.Items[0]), mainBlocks)
}
func isValidSeqType(ty int64) bool {
return ty == types.AddBlock || ty == types.DelBlock
}
func validMainBlocks(txs *types.ParaTxDetails) *types.ParaTxDetails {
for i, item := range txs.Items {
if item == nil || !isValidSeqType(item.Type) {
txs.Items = txs.Items[:i]
return txs
}
}
return txs
}
func (client *client) requestTxsFromBlock(currSeq int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) {
blockSeq, err := client.GetBlockOnMainBySeq(currSeq)
if err != nil {
......@@ -374,6 +406,10 @@ func (client *client) requestTxsFromBlock(currSeq int64, preMainBlockHash []byte
txDetail := blockSeq.Detail.FilterParaTxsByTitle(types.GetTitle())
txDetail.Type = blockSeq.Seq.Type
if !isValidSeqType(txDetail.Type) {
return nil, types.ErrInvalidParam
}
err = verifyMainBlockHash(preMainBlockHash, txDetail)
if err != nil {
plog.Error("requestTxsFromBlock", "curr seq", currSeq, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
......@@ -383,12 +419,13 @@ func (client *client) requestTxsFromBlock(currSeq int64, preMainBlockHash []byte
}
func (client *client) requestFilterParaTxs(currSeq int64, count int64, preMainBlockHash []byte) (*types.ParaTxDetails, error) {
req := &types.ReqParaTxByTitle{Start: currSeq, End: currSeq + count, Title: types.GetTitle()}
req := &types.ReqParaTxByTitle{IsSeq: true, Start: currSeq, End: currSeq + count, Title: types.GetTitle()}
details, err := client.GetParaTxByTitle(req)
if err != nil {
return nil, err
}
details = validMainBlocks(details)
err = verifyMainBlocks(preMainBlockHash, details)
if err != nil {
plog.Error("requestTxsOnlyPara", "curSeq", currSeq, "count", count, "preMainBlockHash", hex.EncodeToString(preMainBlockHash))
......@@ -430,26 +467,24 @@ func (client *client) procLocalBlock(mainBlock *types.ParaTxDetail) (bool, error
"lastBlockMainHash", common.ToHex(lastBlock.MainHash), "currMainHeight", lastSeqMainHeight,
"curMainHash", common.ToHex(mainBlock.Header.Hash), "seqTy", mainBlock.Type)
if mainBlock.Type == delAct {
if mainBlock.Type == types.DelBlock {
if len(txs) == 0 {
if lastSeqMainHeight > lastBlock.MainHeight {
return false, nil
}
plog.Info("Delete empty block")
plog.Info("Delete empty block", "height", lastBlock.Height)
}
return true, client.delLocalBlock(lastBlock.Height)
} else if mainBlock.Type == addAct {
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < client.subCfg.EmptyBlockInterval {
return false, nil
}
plog.Info("Create empty block")
}
//AddAct
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < client.subCfg.EmptyBlockInterval {
return false, nil
}
return true, client.createLocalBlock(lastBlock, txs, mainBlock)
plog.Info("Create empty block", "newHeight", lastBlock.Height+1)
}
return false, types.ErrInvalidParam
return true, client.createLocalBlock(lastBlock, txs, mainBlock)
}
......@@ -472,6 +507,8 @@ func (client *client) procLocalBlocks(mainBlocks *types.ParaTxDetails) error {
}
func (client *client) CreateBlock() {
client.multiDldCli.tryMultiServerDownload()
lastSeq, lastSeqMainHash, err := client.getLastLocalBlockSeq()
if err != nil {
plog.Error("Parachain CreateBlock getLastLocalBlockSeq fail", "err", err.Error())
......@@ -502,9 +539,9 @@ out:
continue
}
if count+1 != int64(len(paraTxs.Items)) {
plog.Error("para CreateBlock count not match", "count", count+1, "items", len(paraTxs.Items))
continue
if count != int64(len(paraTxs.Items)) {
plog.Debug("para CreateBlock count not match", "count", count, "items", len(paraTxs.Items))
count = int64(len(paraTxs.Items))
}
err = client.procLocalBlocks(paraTxs)
......@@ -516,11 +553,11 @@ out:
}
//重新设定seq和lastSeqMainHash
lastSeqMainHash = paraTxs.Items[count].Header.Hash
if paraTxs.Items[count].Type == delAct {
lastSeqMainHash = paraTxs.Items[count].Header.ParentHash
lastSeqMainHash = paraTxs.Items[count-1].Header.Hash
if paraTxs.Items[count-1].Type == types.DelBlock {
lastSeqMainHash = paraTxs.Items[count-1].Header.ParentHash
}
currSeq = currSeq + count + 1
currSeq = currSeq + count
}
}
......
This diff is collapsed.
......@@ -150,7 +150,7 @@ func TestVerifyMainBlocks(t *testing.T) {
Hash: hash1,
}
block1 := &types.ParaTxDetail{
Type: addAct,
Type: types.AddBlock,
Header: header1,
}
......@@ -159,7 +159,7 @@ func TestVerifyMainBlocks(t *testing.T) {
Hash: hash2,
}
block2 := &types.ParaTxDetail{
Type: addAct,
Type: types.AddBlock,
Header: header2,
}
......@@ -168,7 +168,7 @@ func TestVerifyMainBlocks(t *testing.T) {
Hash: hash3,
}
block3 := &types.ParaTxDetail{
Type: addAct,
Type: types.AddBlock,
Header: header3,
}
......@@ -178,7 +178,7 @@ func TestVerifyMainBlocks(t *testing.T) {
Hash: hash3,
}
block4 := &types.ParaTxDetail{
Type: delAct,
Type: types.DelBlock,
Header: header4,
}
//del2
......@@ -187,7 +187,7 @@ func TestVerifyMainBlocks(t *testing.T) {
Hash: hash2,
}
block5 := &types.ParaTxDetail{
Type: delAct,
Type: types.DelBlock,
Header: header5,
}
......@@ -196,7 +196,7 @@ func TestVerifyMainBlocks(t *testing.T) {
Hash: hash6,
}
block6 := &types.ParaTxDetail{
Type: addAct,
Type: types.AddBlock,
Header: header6,
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment