Commit 8b282c75 authored by mdj33's avatar mdj33 Committed by vipwzw

add jump dld para block

parent 5795fe6b
......@@ -91,6 +91,7 @@ type subConfig struct {
MultiDownJobBuffNum uint32 `json:"multiDownJobBuffNum,omitempty"`
MultiDownServerRspTime uint32 `json:"multiDownServerRspTime,omitempty"`
RmCommitParamMainHeight int64 `json:"rmCommitParamMainHeight,omitempty"`
JumpDownloadClose bool `json:"jumpDownloadClose,omitempty"`
}
// New function to init paracross env
......
......@@ -21,20 +21,8 @@ import (
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleHeightKey(cfg.GetTitle(), height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV, kv)
//两个key原子操作
key = calcTitleLastHeightKey(cfg.GetTitle())
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height})}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
func (client *client) createLocalGenesisBlock(genesis *types.Block) error {
return client.alignLocalBlock2ChainBlock(genesis)
}
func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*types.Transaction, mainBlock *types.ParaTxDetail) error {
......@@ -55,80 +43,6 @@ func (client *client) createLocalBlock(lastBlock *pt.ParaLocalDbBlock, txs []*ty
return err
}
func (client *client) createLocalGenesisBlock(genesis *types.Block) error {
return client.alignLocalBlock2ChainBlock(genesis)
}
func (client *client) delLocalBlock(height int64) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleHeightKey(cfg.GetTitle(), height)
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV, kv)
//两个key原子操作
key = calcTitleLastHeightKey(cfg.GetTitle())
kv = &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: height - 1})}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
}
// localblock 设置到当前高度,当前高度后面block会被新的区块覆盖
func (client *client) removeLocalBlocks(curHeight int64) error {
cfg := client.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleLastHeightKey(cfg.GetTitle())
kv := &types.KeyValue{Key: key, Value: types.Encode(&types.Int64{Data: curHeight})}
set.KV = append(set.KV, kv)
return client.setLocalDb(set)
}
func (client *client) getLastLocalHeight() (int64, error) {
cfg := client.GetAPI().GetConfig()
key := calcTitleLastHeightKey(cfg.GetTitle())
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getLocalDb(set, len(set.Keys))
if err != nil {
return -1, err
}
if len(value) == 0 || value[0] == nil {
return -1, types.ErrNotFound
}
height := &types.Int64{}
err = types.Decode(value[0], height)
if err != nil {
return -1, err
}
return height.Data, nil
}
func (client *client) getLocalBlockByHeight(height int64) (*pt.ParaLocalDbBlock, error) {
cfg := client.GetAPI().GetConfig()
key := calcTitleHeightKey(cfg.GetTitle(), height)
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := client.getLocalDb(set, len(set.Keys))
if err != nil {
return nil, err
}
if len(value) == 0 || value[0] == nil {
return nil, types.ErrNotFound
}
var block pt.ParaLocalDbBlock
err = types.Decode(value[0], &block)
if err != nil {
return nil, err
}
return &block, nil
}
func (client *client) getLocalBlockSeq(height int64) (int64, []byte, error) {
lastBlock, err := client.getLocalBlockByHeight(height)
if err != nil {
......
......@@ -224,7 +224,7 @@ func (m *multiDldClient) tryMultiServerDownload() {
func (i *inventory) getFirstBlock(d *downloadJob) *types.ParaTxDetail {
if i.isSaveDb {
block, err := d.getBlockFromDb(i.start)
block, err := d.mDldCli.paraClient.getMainBlockFromDb(i.start)
if err != nil {
panic(err)
}
......@@ -238,7 +238,7 @@ func (i *inventory) getLastBlock(d *downloadJob) *types.ParaTxDetail {
return nil
}
if i.isSaveDb {
block, err := d.getBlockFromDb(i.end)
block, err := d.mDldCli.paraClient.getMainBlockFromDb(i.end)
if err != nil {
panic(err)
}
......@@ -257,7 +257,7 @@ func (m *multiDldClient) processDoneJobs(ch chan *downloadJob) {
func (d *downloadJob) resetInv(i *inventory) {
if i.isSaveDb {
d.rmvBatchMainBlocks(i)
d.mDldCli.paraClient.rmvBatchMainBlocks(i.start, i.curHeight)
}
i.curHeight = i.start
i.txs.Items = nil
......@@ -268,7 +268,7 @@ func (d *downloadJob) process() {
for _, inv := range d.invs {
if inv.isSaveDb {
for i := inv.start; i <= inv.end; i++ {
block, err := d.getBlockFromDb(i)
block, err := d.mDldCli.paraClient.getMainBlockFromDb(i)
if err != nil {
panic(err)
}
......@@ -292,7 +292,7 @@ func (d *downloadJob) process() {
func (d *downloadJob) getPreVerifyBlock(inv *inventory) (*types.ParaTxDetail, error) {
if inv.isSaveDb {
lastBlock, err := d.getBlockFromDb(inv.curHeight - 1)
lastBlock, err := d.mDldCli.paraClient.getMainBlockFromDb(inv.curHeight - 1)
if err != nil {
return nil, err
}
......@@ -328,64 +328,6 @@ func (d *downloadJob) verifyDownloadBlock(inv *inventory, blocks *types.ParaTxDe
return nil
}
func (d *downloadJob) saveMainBlock(height int64, block *types.ParaTxDetail) error {
cfg := d.mDldCli.paraClient.GetAPI().GetConfig()
set := &types.LocalDBSet{}
key := calcTitleMainHeightKey(cfg.GetTitle(), height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV, kv)
return d.mDldCli.paraClient.setLocalDb(set)
}
func (d *downloadJob) saveBatchMainBlocks(txs *types.ParaTxDetails) error {
cfg := d.mDldCli.paraClient.GetAPI().GetConfig()
set := &types.LocalDBSet{}
for _, block := range txs.Items {
key := calcTitleMainHeightKey(cfg.GetTitle(), block.Header.Height)
kv := &types.KeyValue{Key: key, Value: types.Encode(block)}
set.KV = append(set.KV, kv)
}
return d.mDldCli.paraClient.setLocalDb(set)
}
func (d *downloadJob) rmvBatchMainBlocks(inv *inventory) error {
cfg := d.mDldCli.paraClient.GetAPI().GetConfig()
set := &types.LocalDBSet{}
for i := inv.start; i < inv.curHeight; i++ {
key := calcTitleMainHeightKey(cfg.GetTitle(), i)
kv := &types.KeyValue{Key: key, Value: nil}
set.KV = append(set.KV, kv)
}
return d.mDldCli.paraClient.setLocalDb(set)
}
func (d *downloadJob) getBlockFromDb(height int64) (*types.ParaTxDetail, error) {
cfg := d.mDldCli.paraClient.GetAPI().GetConfig()
key := calcTitleMainHeightKey(cfg.GetTitle(), height)
set := &types.LocalDBGet{Keys: [][]byte{key}}
value, err := d.mDldCli.paraClient.getLocalDb(set, len(set.Keys))
if err != nil {
return nil, err
}
if len(value) == 0 || value[0] == nil {
return nil, types.ErrNotFound
}
var tx types.ParaTxDetail
err = types.Decode(value[0], &tx)
if err != nil {
return nil, err
}
return &tx, nil
}
func (d *downloadJob) checkInv(lastRetry, pre *types.ParaTxDetail, inv *inventory) error {
if !inv.isDone {
return types.ErrNotFound
......@@ -515,7 +457,7 @@ func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) {
//save 之前save到db,后面区块全部save到db
if inv.isSaveDb {
d.saveBatchMainBlocks(txs)
d.mDldCli.paraClient.saveBatchMainBlocks(txs)
} else {
inv.txs.Items = append(inv.txs.Items, txs.Items...)
}
......@@ -528,7 +470,7 @@ func (d *downloadJob) getInvBlocks(inv *inventory, connPool chan *connectCli) {
return
}
if !inv.isSaveDb && types.Size(inv.txs) > maxBlockSize {
d.saveBatchMainBlocks(inv.txs)
d.mDldCli.paraClient.saveBatchMainBlocks(inv.txs)
inv.txs.Items = nil
inv.isSaveDb = true
}
......
......@@ -8,54 +8,11 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/types"
)
func (client *client) setLocalDb(set *types.LocalDBSet) error {
//如果追赶上主链了,则落盘
if client.isCaughtUp() {
set.Txid = 1
client.blockSyncClient.handleLocalCaughtUpMsg()
}
msg := client.GetQueueClient().NewMessage("blockchain", types.EventSetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return err
}
if resp.GetData().(*types.Reply).IsOk {
return nil
}
return errors.New(string(resp.GetData().(*types.Reply).GetMsg()))
}
func (client *client) getLocalDb(set *types.LocalDBGet, count int) ([][]byte, error) {
msg := client.GetQueueClient().NewMessage("blockchain", types.EventGetValueByKey, set)
err := client.GetQueueClient().Send(msg, true)
if err != nil {
return nil, err
}
resp, err := client.GetQueueClient().Wait(msg)
if err != nil {
return nil, err
}
reply := resp.GetData().(*types.LocalReplyValue)
if len(reply.Values) != count {
plog.Error("Parachain getLocalDb count not match", "expert", count, "real", len(reply.Values))
return nil, types.ErrInvalidParam
}
return reply.Values, nil
}
func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
//from blockchain db
blockDetails, err := client.GetAPI().GetBlocks(&types.ReqBlocks{Start: height, End: height})
......@@ -70,6 +27,16 @@ func (client *client) GetBlockByHeight(height int64) (*types.Block, error) {
return blockDetails.Items[0].Block, nil
}
func (client *client) GetBlockHeaders(req *types.ReqBlocks) (*types.Headers, error) {
//from blockchain db
headers, err := client.grpcClient.GetHeaders(context.Background(), req)
if err != nil {
plog.Error("paracommitmsg get node status block count fail")
return nil, err
}
return headers, nil
}
// 获取当前平行链block对应主链seq,hash信息
// 对于云端主链节点,创世区块记录seq在不同主链节点上差异很大,通过记录的主链hash获取真实seq使用
func (client *client) getLastBlockMainInfo() (int64, *types.Block, error) {
......@@ -169,3 +136,30 @@ func (client *client) QueryTxOnMainByHash(hash []byte) (*types.TransactionDetail
return detail, nil
}
func (client *client) GetParaHeightsByTitle(req *types.ReqHeightByTitle) (*types.ReplyHeightByTitle, error) {
//from blockchain db
heights, err := client.grpcClient.LoadParaTxByTitle(context.Background(), req)
if err != nil {
plog.Error("paracommitmsg get node status block count fail")
return nil, err
}
return heights, nil
}
func (client *client) GetParaTxByHeight(req *types.ReqParaTxByHeight) (*types.ParaTxDetails, error) {
//from blockchain db
blocks, err := client.grpcClient.GetParaTxByHeight(context.Background(), req)
if err != nil {
plog.Error("GetParaTxByHeight get node status block count fail")
return nil, err
}
if len(req.Items) != len(blocks.Items) {
plog.Error("GetParaTxByHeight get block tx count fail", "req", len(req.Items), "rsp", len(blocks.Items))
return nil, types.ErrInvalidParam
}
return blocks, nil
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment