Commit b8d027e4 authored by kingwang's avatar kingwang Committed by 33cn

update chain33 07/31

parent 23143d09
......@@ -14,6 +14,7 @@ import (
"github.com/33cn/chain33/types"
jsty "github.com/33cn/plugin/plugin/dapp/js/types"
"github.com/33cn/plugin/plugin/dapp/js/types/jsproto"
//"github.com/gojson"
"github.com/spf13/cobra"
)
......
......@@ -189,12 +189,16 @@ func (chain *BlockChain) FetchBlock(start int64, end int64, pid []string, syncOr
requestblock.End = end
}
var cb func()
var timeoutcb func(height int64)
if syncOrfork {
//还有区块需要请求,挂接钩子回调函数
if requestblock.End < chain.downLoadInfo.EndHeight {
cb = func() {
chain.ReqDownLoadBlocks()
}
timeoutcb = func(height int64) {
chain.DownLoadTimeOutProc(height)
}
chain.UpdateDownLoadStartHeight(requestblock.End + 1)
//快速下载时需要及时更新bestpeer,防止下载侧链的block
if chain.GetDownloadSyncStatus() {
......@@ -203,7 +207,7 @@ func (chain *BlockChain) FetchBlock(start int64, end int64, pid []string, syncOr
} else { // 所有DownLoad block已请求结束,恢复DownLoadInfo为默认值
chain.DefaultDownLoadInfo()
}
err = chain.downLoadTask.Start(requestblock.Start, requestblock.End, cb)
err = chain.downLoadTask.Start(requestblock.Start, requestblock.End, cb, timeoutcb)
if err != nil {
return err
}
......@@ -213,7 +217,7 @@ func (chain *BlockChain) FetchBlock(start int64, end int64, pid []string, syncOr
chain.SynBlocksFromPeers()
}
}
err = chain.syncTask.Start(requestblock.Start, requestblock.End, cb)
err = chain.syncTask.Start(requestblock.Start, requestblock.End, cb, timeoutcb)
if err != nil {
return err
}
......@@ -570,6 +574,12 @@ func (chain *BlockChain) SynBlocksFromPeers() {
synlog.Info("chain syncTask InProgress")
return
}
//如果此时系统正在处理回滚,不启动同步的任务。
//等分叉回滚处理结束之后再启动同步任务继续同步
if chain.downLoadTask.InProgress() {
synlog.Info("chain downLoadTask InProgress")
return
}
//获取peers的最新高度.处理没有收到广播block的情况
if curheight+1 < peerMaxBlkHeight {
synlog.Info("SynBlocksFromPeers", "curheight", curheight, "LastCastBlkHeight", RcvLastCastBlkHeight, "peerMaxBlkHeight", peerMaxBlkHeight)
......@@ -590,7 +600,6 @@ func (chain *BlockChain) SynBlocksFromPeers() {
//请求bestchain.Height -BackBlockNum -- bestchain.Height的header
//需要考虑收不到分叉之后的第一个广播block,这样就会导致后面的广播block都在孤儿节点中了。
func (chain *BlockChain) CheckHeightNoIncrease() {
synlog.Debug("CheckHeightNoIncrease")
defer chain.tickerwg.Done()
//获取当前主链的最新高度
......@@ -609,8 +618,9 @@ func (chain *BlockChain) CheckHeightNoIncrease() {
chain.UpdatesynBlkHeight(tipheight)
return
}
//一个检测周期bestchain的tip高度没有变化。并且远远落后于peer的最新高度
//本节点可能在侧链上,需要从最新的peer上向后取BackBlockNum个headers
//一个检测周期发现本节点bestchain的tip高度没有变化。
//远远落后于高度的peer节点并且最高peer节点不是最优链,本节点可能在侧链上,
//需要从最新的peer上向后取BackBlockNum个headers
maxpeer := chain.GetMaxPeerInfo()
if maxpeer == nil {
......@@ -620,8 +630,9 @@ func (chain *BlockChain) CheckHeightNoIncrease() {
peermaxheight := maxpeer.Height
pid := maxpeer.Name
var err error
if peermaxheight > tipheight && (peermaxheight-tipheight) > BackwardBlockNum {
//从指定peer 请求BackBlockNum个blockheaders
if peermaxheight > tipheight && (peermaxheight-tipheight) > BackwardBlockNum && !chain.isBestChainPeer(pid) {
//从指定peer向后请求BackBlockNum个blockheaders
synlog.Debug("CheckHeightNoIncrease", "tipheight", tipheight, "pid", pid)
if tipheight > BackBlockNum {
err = chain.FetchBlockHeaders(tipheight-BackBlockNum, tipheight, pid)
} else {
......@@ -765,7 +776,13 @@ func (chain *BlockChain) ProcBlockHeaders(headers *types.Headers, pid string) er
return nil
}
//在快速下载block阶段不处理fork的处理
//如果在普通同步阶段出现了分叉
//需要暂定同步解决分叉回滚之后再继续开启普通同步
if !chain.GetDownloadSyncStatus() {
if chain.syncTask.InProgress() {
err = chain.syncTask.Cancel()
synlog.Info("ProcBlockHeaders: cancel syncTask start fork process downLoadTask!", "err", err)
}
go chain.ProcDownLoadBlocks(ForkHeight, peermaxheight, []string{pid})
}
return nil
......@@ -951,6 +968,17 @@ func (chain *BlockChain) GetBestChainPeer(pid string) *BestPeerInfo {
return chain.bestChainPeerList[pid]
}
//isBestChainPeer 指定peer是不是最优链
func (chain *BlockChain) isBestChainPeer(pid string) bool {
chain.bestpeerlock.Lock()
defer chain.bestpeerlock.Unlock()
peer := chain.bestChainPeerList[pid]
if peer != nil && peer.IsBestChain {
return true
}
return false
}
//GetBestChainPids 定时确保本节点在最优链上,定时向peer请求指定高度的header
func (chain *BlockChain) GetBestChainPids() []string {
var PeerPids []string
......
......@@ -1088,6 +1088,7 @@ func testReadBlockToExec(t *testing.T, chain *blockchain.BlockChain) {
chainlog.Info("testReadBlockToExec begin ---------------------")
curheight := chain.GetBlockHeight()
chain.ReadBlockToExec(curheight+1, false)
chain.DownLoadTimeOutProc(curheight - 1)
chainlog.Info("testReadBlockToExec end ---------------------")
}
func testWriteBlockToDbTemp(t *testing.T, chain *blockchain.BlockChain) {
......
......@@ -331,3 +331,22 @@ func (chain *BlockChain) ReqDownLoadBlocks() {
}
}
}
//DownLoadTimeOutProc 快速下载模式下载区块超时的处理函数
func (chain *BlockChain) DownLoadTimeOutProc(height int64) {
info := chain.GetDownLoadInfo()
synlog.Info("DownLoadTimeOutProc", "timeoutheight", height, "StartHeight", info.StartHeight, "EndHeight", info.EndHeight)
if info.StartHeight != -1 && info.EndHeight != -1 && info.Pids != nil {
//从超时的高度继续下载区块
if info.StartHeight > height {
chain.UpdateDownLoadStartHeight(height)
info.StartHeight = height
}
synlog.Info("DownLoadTimeOutProc:FetchBlock", "StartHeight", info.StartHeight, "EndHeight", info.EndHeight, "pids", len(info.Pids))
err := chain.FetchBlock(info.StartHeight, info.EndHeight, info.Pids, true)
if err != nil {
synlog.Error("DownLoadTimeOutProc:FetchBlock", "err", err)
}
}
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blockchain
import (
"strings"
"github.com/33cn/chain33/types"
)
var (
filterlog = chainlog.New("submodule", "filter")
)
//GetParaTxByTitle 通过seq以及title获取对应平行连的交易
func (chain *BlockChain) GetParaTxByTitle(seq *types.ReqParaTxByTitle) (*types.ParaTxDetails, error) {
//入参数校验
err := chain.checkInputParam(seq)
if err != nil {
return nil, err
}
//获取区块的seq信息
req := &types.ReqBlocks{Start: seq.Start, End: seq.End, IsDetail: false, Pid: []string{}}
sequences, err := chain.GetBlockSequences(req)
if err != nil {
filterlog.Error("GetParaTxByTitle:GetBlockSequences", "err", err.Error())
return nil, err
}
//通过区块hash获取区块信息
var reqHashes types.ReqHashes
for _, item := range sequences.Items {
if item != nil {
reqHashes.Hashes = append(reqHashes.Hashes, item.GetHash())
}
}
blocks, err := chain.GetBlockByHashes(reqHashes.Hashes)
if err != nil {
filterlog.Error("GetParaTxByTitle:GetBlockByHashes", "err", err)
return nil, err
}
//通过指定的title过滤对应平行链的交易
var paraTxs types.ParaTxDetails
var paraTx *types.ParaTxDetail
for i, block := range blocks.Items {
if block != nil {
paraTx = block.FilterParaTxsByTitle(seq.Title)
paraTx.Type = sequences.Items[i].GetType()
} else {
paraTx = nil
}
paraTxs.Items = append(paraTxs.Items, paraTx)
}
return &paraTxs, err
}
//checkInputParam 入参检测,主要检测seq的end的值已经title是否合法
func (chain *BlockChain) checkInputParam(seq *types.ReqParaTxByTitle) error {
//入参数校验
blockLastSeq, err := chain.blockStore.LoadBlockLastSequence()
if err != nil || seq.End > blockLastSeq || blockLastSeq < 0 || !strings.HasPrefix(seq.Title, types.ParaKeyX) {
filterlog.Error("checkInputParam", "blockLastSeq", blockLastSeq, "seq", seq, "err", err)
return types.ErrInvalidParam
}
return nil
}
// Copyright Fuzamei Corp. 2018 All Rights Reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package blockchain_test
import (
"bytes"
"errors"
"strings"
"testing"
"time"
"github.com/33cn/chain33/blockchain"
"github.com/33cn/chain33/client"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/merkle"
_ "github.com/33cn/chain33/system"
"github.com/33cn/chain33/types"
"github.com/33cn/chain33/util"
"github.com/33cn/chain33/util/testnode"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func init() {
types.Init("local", nil)
}
func addMainTx(priv crypto.PrivKey, api client.QueueProtocolAPI) (string, error) {
txs := util.GenCoinsTxs(priv, 1)
hash := common.ToHex(txs[0].Hash())
reply, err := api.SendTx(txs[0])
if err != nil {
return hash, err
}
if !reply.GetIsOk() {
return hash, errors.New("sendtx unknow error")
}
return hash, nil
}
//构造单笔para交易
func addSingleParaTx(priv crypto.PrivKey, api client.QueueProtocolAPI) (string, error) {
tx := util.CreateTxWithExecer(priv, "user.p.hyb.none")
hash := common.ToHex(tx.Hash())
reply, err := api.SendTx(tx)
if err != nil {
return hash, err
}
if !reply.GetIsOk() {
return hash, errors.New("sendtx unknow error")
}
return hash, nil
}
//构造para交易组
func addGroupParaTx(priv crypto.PrivKey, api client.QueueProtocolAPI, haveMainTx bool) (string, *types.ReplyStrings, error) {
var tx0 *types.Transaction
if haveMainTx {
tx0 = util.CreateTxWithExecer(priv, "coins")
} else {
tx0 = util.CreateTxWithExecer(priv, "user.p.hyb.coins")
}
tx1 := util.CreateTxWithExecer(priv, "user.p.hyb.token")
tx2 := util.CreateTxWithExecer(priv, "user.p.hyb.trade")
tx3 := util.CreateTxWithExecer(priv, "user.p.hyb.evm")
tx4 := util.CreateTxWithExecer(priv, "user.p.hyb.none")
var txs types.Transactions
txs.Txs = append(txs.Txs, tx0)
txs.Txs = append(txs.Txs, tx1)
txs.Txs = append(txs.Txs, tx2)
txs.Txs = append(txs.Txs, tx3)
txs.Txs = append(txs.Txs, tx4)
feeRate := types.GInt("MinFee")
group, err := types.CreateTxGroup(txs.Txs, feeRate)
if err != nil {
chainlog.Error("addGroupParaTx", "err", err.Error())
return "", nil, err
}
var txHashs types.ReplyStrings
for i, tx := range group.Txs {
group.SignN(i, int32(types.SECP256K1), priv)
txhash := common.ToHex(tx.Hash())
txHashs.Datas = append(txHashs.Datas, txhash)
}
newtx := group.Tx()
hash := common.ToHex(newtx.Hash())
reply, err := api.SendTx(newtx)
if err != nil {
return "", nil, err
}
if !reply.GetIsOk() {
return "", nil, errors.New("sendtx unknow error")
}
return hash, &txHashs, nil
}
func TestGetParaTxByTitle(t *testing.T) {
//log.SetLogLevel("crit")
mock33 := testnode.New("", nil)
defer mock33.Close()
blockchain := mock33.GetBlockChain()
chainlog.Info("TestGetParaTxByTitle begin --------------------")
//构造十个区块
curheight := blockchain.GetBlockHeight()
addblockheight := curheight + 10
_, err := blockchain.GetBlock(curheight)
if err != nil {
require.NoError(t, err)
}
for {
_, err = addMainTx(mock33.GetGenesisKey(), mock33.GetAPI())
require.NoError(t, err)
_, err = addSingleParaTx(mock33.GetGenesisKey(), mock33.GetAPI())
require.NoError(t, err)
//_, _, err = addGroupParaTx(mock33.GetGenesisKey(), mock33.GetAPI(), true)
//require.NoError(t, err)
_, _, err = addGroupParaTx(mock33.GetGenesisKey(), mock33.GetAPI(), false)
require.NoError(t, err)
curheight = blockchain.GetBlockHeight()
_, err = blockchain.GetBlock(curheight)
require.NoError(t, err)
if curheight >= addblockheight {
break
}
time.Sleep(sendTxWait)
}
var req types.ReqParaTxByTitle
req.Start = 0
req.End = curheight
req.Title = "user.p.hyb."
testgetParaTxByTitle(t, blockchain, &req, false, false, nil)
chainlog.Info("TestGetParaTxByTitle end --------------------")
}
func testgetParaTxByTitle(t *testing.T, blockchain *blockchain.BlockChain, req *types.ReqParaTxByTitle, isGroup bool, haveMainTx bool, hashs []string) {
ParaTxDetails, err := blockchain.GetParaTxByTitle(req)
require.NoError(t, err)
for i, txDetail := range ParaTxDetails.Items {
if txDetail != nil {
assert.Equal(t, txDetail.Header.Height, req.Start+int64(i))
//chainlog.Info("testgetParaTxByTitle:", "Height", txDetail.Header.Height)
for _, tx := range txDetail.TxDetails {
if tx != nil {
execer := string(tx.Tx.Execer)
if !strings.HasPrefix(execer, "user.p.hyb.") && tx.Tx.GetGroupCount() != 0 {
//chainlog.Info("testgetParaTxByTitle:maintxingroup", "tx", tx)
assert.Equal(t, tx.Receipt.Ty, int32(types.ExecOk))
} else {
assert.Equal(t, tx.Receipt.Ty, int32(types.ExecPack))
}
if tx.Proofs != nil {
roothash := merkle.GetMerkleRootFromBranch(tx.Proofs, tx.Tx.Hash(), tx.Index)
ok := bytes.Equal(roothash, txDetail.Header.GetHash())
assert.Equal(t, ok, false)
}
}
}
}
}
}
......@@ -101,6 +101,9 @@ func (chain *BlockChain) ProcRecvMsg() {
case types.EventGetValueByKey:
go chain.processMsg(msg, reqnum, chain.getValueByKey)
//通过平行链title获取平行链的交易
case types.EventGetParaTxByTitle:
go chain.processMsg(msg, reqnum, chain.getParaTxByTitle)
default:
go chain.processMsg(msg, reqnum, chain.unknowMsg)
}
......@@ -586,3 +589,15 @@ func (chain *BlockChain) getValueByKey(msg *queue.Message) {
values := chain.GetValueByKey(keys)
msg.Reply(chain.client.NewMessage("", types.EventLocalReplyValue, values))
}
//getParaTxByTitle //通过平行链title获取平行链的交易
func (chain *BlockChain) getParaTxByTitle(msg *queue.Message) {
req := (msg.Data).(*types.ReqParaTxByTitle)
reply, err := chain.GetParaTxByTitle(req)
if err != nil {
chainlog.Error("getParaTxByTitle", "req", req, "err", err.Error())
msg.Reply(chain.client.NewMessage("", types.EventReplyParaTxByTitle, err))
return
}
msg.Reply(chain.client.NewMessage("", types.EventReplyParaTxByTitle, reply))
}
......@@ -15,14 +15,15 @@ import (
//Task 任务
type Task struct {
sync.Mutex
cond *sync.Cond
start int64
end int64
isruning bool
ticker *time.Timer
timeout time.Duration
cb func()
donelist map[int64]struct{}
cond *sync.Cond
start int64
end int64
isruning bool
ticker *time.Timer
timeout time.Duration
cb func()
donelist map[int64]struct{}
timeoutcb func(height int64)
}
func newTask(timeout time.Duration) *Task {
......@@ -43,12 +44,15 @@ func (t *Task) tick() {
t.cond.L.Unlock()
_, ok := <-t.ticker.C
if !ok {
chainlog.Error("task is done", "timer is stop", t.start)
chainlog.Error("task is done", "timer ticker is stop", t.start)
continue
}
t.Lock()
if err := t.stop(false); err == nil {
chainlog.Debug("task is done", "timer is stop", t.start)
if t.timeoutcb != nil {
go t.timeoutcb(t.start)
}
chainlog.Debug("task is done", "timer timeout is stop", t.start)
}
t.Unlock()
}
......@@ -78,7 +82,7 @@ func (t *Task) TimerStop() {
}
//Start 计时器启动
func (t *Task) Start(start, end int64, cb func()) error {
func (t *Task) Start(start, end int64, cb func(), timeoutcb func(height int64)) error {
t.Lock()
defer t.Unlock()
if t.isruning {
......@@ -93,6 +97,7 @@ func (t *Task) Start(start, end int64, cb func()) error {
t.start = start
t.end = end
t.cb = cb
t.timeoutcb = timeoutcb
t.donelist = make(map[int64]struct{})
t.cond.Signal()
return nil
......
......@@ -17,7 +17,7 @@ func TestTask(t *testing.T) {
t.Log("task not start")
return
}
task.Start(1, 10, nil)
task.Start(1, 10, nil, nil)
perm := rand.Perm(10)
for i := 0; i < len(perm); i++ {
time.Sleep(time.Millisecond * 5)
......@@ -43,7 +43,7 @@ func TestTasks(t *testing.T) {
t.Log("task not start")
return
}
task.Start(1, 10, nil)
task.Start(1, 10, nil, nil)
perm := rand.Perm(10)
for i := 0; i < len(perm); i++ {
time.Sleep(time.Millisecond / 10)
......@@ -61,3 +61,35 @@ func TestTasks(t *testing.T) {
}
}
}
func TestTaskTimeOut(t *testing.T) {
task := newTask(time.Millisecond * 10)
if task.InProgress() {
task.Cancel()
t.Log("task not start")
return
}
timeoutcb := func(height int64) {
timeOutProc(height)
}
task.Start(1, 10, nil, timeoutcb)
perm := rand.Perm(10)
for i := 0; i < len(perm); i++ {
time.Sleep(time.Millisecond * 10)
task.Done(int64(perm[i]) + 1)
if i < len(perm)-1 && !task.InProgress() {
task.Cancel()
t.Log("task not done, but InProgress is false")
return
}
if i == len(perm)-1 && task.InProgress() {
task.Cancel()
t.Log("task is done, but InProgress is true")
return
}
}
}
func timeOutProc(height int64) {
chainlog.Info("timeOutProc", "height", height)
}
......@@ -3,7 +3,7 @@ FROM ubuntu:16.04
WORKDIR /root
COPY chain33 chain33
COPY chain33-cli chain33-cli
COPY chain33.toml ./
COPY chain33.toml chain33-solo.toml ./
RUN ./chain33-cli cert --host=127.0.0.1
......
......@@ -70,6 +70,7 @@ echo "CLI=$CLI"
####################
testtoml=chain33.toml
testtomlsolo=chain33-solo.toml
function base_init() {
......@@ -93,6 +94,7 @@ function base_init() {
# wallet
sed -i $sedfix 's/^minerdisable=.*/minerdisable=false/g' ${testtoml}
cp ${testtoml} ${testtomlsolo}
#consens
consens_init "solo"
......@@ -103,6 +105,9 @@ function consens_init() {
if [ "$1" == "solo" ]; then
sed -i $sedfix 's/^name="ticket"/name="solo"/g' ${testtoml}
sed -i $sedfix 's/^singleMode=false/singleMode=true/g' ${testtoml}
# only one node miner for solo miner
sed -i $sedfix 's/^minerstart=true/minerstart=false/g' ${testtomlsolo}
fi
}
......
......@@ -8,19 +8,24 @@ services:
chain32:
build:
context: .
command: ["/root/chain33", "-f", "/root/chain33-solo.toml"]
chain31:
build:
context: .
command: ["/root/chain33", "-f", "/root/chain33-solo.toml"]
chain30:
build:
context: .
command: ["/root/chain33", "-f", "/root/chain33-solo.toml"]
chain29:
build:
context: .
command: ["/root/chain33", "-f", "/root/chain33-solo.toml"]
chain28:
build:
context: .
command: ["/root/chain33", "-f", "/root/chain33-solo.toml"]
......@@ -174,6 +174,17 @@ func (m *mockBlockChain) SetQueueClient(q queue.Queue) {
} else {
msg.ReplyErr("request must be nil", types.ErrInvalidParam)
}
case types.EventGetParaTxByTitle:
if req, ok := msg.GetData().(*types.ReqParaTxByTitle); ok {
// just for cover
if req.Title == "user" {
msg.Reply(client.NewMessage(blockchainKey, types.EventReplyParaTxByTitle, &types.Reply{IsOk: false, Msg: []byte("not support")}))
} else {
msg.Reply(client.NewMessage(blockchainKey, types.EventReplyParaTxByTitle, &types.ParaTxDetails{}))
}
}
default:
msg.ReplyErr("Do not support", types.ErrNotSupport)
}
......
......@@ -522,6 +522,29 @@ func (_m *QueueProtocolAPI) GetNetInfo() (*types.NodeNetInfo, error) {
return r0, r1
}
// GetParaTxByTitle provides a mock function with given fields: param
func (_m *QueueProtocolAPI) GetParaTxByTitle(param *types.ReqParaTxByTitle) (*types.ParaTxDetails, error) {
ret := _m.Called(param)
var r0 *types.ParaTxDetails
if rf, ok := ret.Get(0).(func(*types.ReqParaTxByTitle) *types.ParaTxDetails); ok {
r0 = rf(param)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ParaTxDetails)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(*types.ReqParaTxByTitle) error); ok {
r1 = rf(param)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetProperFee provides a mock function with given fields: req
func (_m *QueueProtocolAPI) GetProperFee(req *types.ReqProperFee) (*types.ReplyProperFee, error) {
ret := _m.Called(req)
......
......@@ -1277,3 +1277,22 @@ func (q *QueueProtocol) GetMainSequenceByHash(param *types.ReqHash) (*types.Int6
}
return nil, types.ErrTypeAsset
}
//GetParaTxByTitle 通过seq以及title获取对应平行连的交易
func (q *QueueProtocol) GetParaTxByTitle(param *types.ReqParaTxByTitle) (*types.ParaTxDetails, error) {
if param == nil {
err := types.ErrInvalidParam
log.Error("GetParaTxByTitle", "Error", err)
return nil, err
}
msg, err := q.send(blockchainKey, types.EventGetParaTxByTitle, param)
if err != nil {
log.Error("GetParaTxByTitle", "Error", err.Error())
return nil, err
}
if reply, ok := msg.GetData().(*types.ParaTxDetails); ok {
return reply, nil
}
return nil, types.ErrTypeAsset
}
......@@ -938,6 +938,7 @@ func TestGRPC(t *testing.T) {
testIsSyncGRPC(t, &grpcMock)
testIsNtpClockSyncGRPC(t, &grpcMock)
testNetInfoGRPC(t, &grpcMock)
testGetParaTxByTitleGRPC(t, &grpcMock)
}
......@@ -1302,3 +1303,25 @@ func TestGetMainSeq(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int64(9999), seq1.Data)
}
func testGetParaTxByTitleGRPC(t *testing.T, rpc *mockGRPCSystem) {
var res types.ParaTxDetails
var req types.ReqParaTxByTitle
req.Start = 0
req.End = 0
req.Title = "user"
err := rpc.newRpcCtx("GetParaTxByTitle", &req, &res)
assert.NotNil(t, err)
req.Title = "user.p.para."
err = rpc.newRpcCtx("GetParaTxByTitle", &req, &res)
assert.Nil(t, err)
}
func TestGetParaTxByTitle(t *testing.T) {
q := client.QueueProtocol{}
_, err := q.GetParaTxByTitle(nil)
assert.NotNil(t, err)
}
......@@ -158,4 +158,6 @@ type QueueProtocolAPI interface {
ListSeqCallBack() (*types.BlockSeqCBs, error)
// types.EventGetSeqCBLastNum
GetSeqCallBackLastNum(param *types.ReqString) (*types.Int64, error)
// types.EventGetParaTxByTitle
GetParaTxByTitle(param *types.ReqParaTxByTitle) (*types.ParaTxDetails, error)
}
......@@ -327,6 +327,13 @@ func (c *GrpcCtx) Run() (err error) {
*c.Res.(*types.BlockSeq) = *reply
}
errRet = err
case "GetParaTxByTitle":
reply, err := rpc.GetParaTxByTitle(context.Background(), c.Params.(*types.ReqParaTxByTitle))
if err == nil {
*c.Res.(*types.ParaTxDetails) = *reply
}
errRet = err
default:
errRet = errors.New(fmt.Sprintf("Unsupport method %v", c.Method))
}
......
......@@ -233,8 +233,11 @@ func (d *DownloadJob) syncDownloadBlock(peer *Peer, inv *pb.Inventory, bchan cha
var p2pdata pb.P2PGetData
p2pdata.Version = d.p2pcli.network.node.nodeInfo.channelVersion
p2pdata.Invs = []*pb.Inventory{inv}
ctx, cancel := context.WithCancel(context.Background())
//主动取消grpc流, 即时释放资源
defer cancel()
beg := pb.Now()
resp, err := peer.mconn.gcli.GetData(context.Background(), &p2pdata, grpc.FailFast(true))
resp, err := peer.mconn.gcli.GetData(ctx, &p2pdata, grpc.FailFast(true))
P2pComm.CollectPeerStat(err, peer)
if err != nil {
log.Error("syncDownloadBlock", "GetData err", err.Error())
......
......@@ -116,8 +116,8 @@ func Test_processP2P(t *testing.T) {
<-subChan //query block
for !ltBlockCache.contains(blockHash) {
}
ltBlock := ltBlockCache.get(blockHash).(*types.Block)
assert.True(t, bytes.Equal(rootHash, merkle.CalcMerkleRoot(ltBlock.Txs)))
cpBlock := *ltBlockCache.get(blockHash).(*types.Block)
assert.True(t, bytes.Equal(rootHash, merkle.CalcMerkleRoot(cpBlock.Txs)))
//query tx
sendChan <- &versionData{rawData: &types.P2PQueryData{Value: &types.P2PQueryData_TxReq{TxReq: &types.P2PTxReq{TxHash: tx.Hash()}}}}
......@@ -147,7 +147,7 @@ func Test_processP2P(t *testing.T) {
<-subChan
assert.True(t, ltBlockCache.contains(blockHash))
ltBlock.TxHash = rootHash
cpBlock.TxHash = rootHash
sendChan <- &versionData{rawData: &types.P2PBlockTxReply{
BlockHash: blockHash,
Txs: txList[0:],
......
......@@ -386,3 +386,8 @@ func (g *Grpc) GetFork(ctx context.Context, in *pb.ReqKey) (*pb.Int64, error) {
}
return &pb.Int64{Data: pb.GetFork(string(in.Key))}, nil
}
// GetParaTxByTitle 通过seq以及title获取对应平行连的交易
func (g *Grpc) GetParaTxByTitle(ctx context.Context, in *pb.ReqParaTxByTitle) (*pb.ParaTxDetails, error) {
return g.cli.GetParaTxByTitle(in)
}
......@@ -5,6 +5,7 @@
package types
import (
"bytes"
"runtime"
"sync"
......@@ -63,6 +64,7 @@ func (block *Block) GetHeader() *Header {
head.Difficulty = block.Difficulty
head.StateHash = block.StateHash
head.TxCount = int64(len(block.Txs))
head.Hash = block.Hash()
return head
}
......@@ -186,3 +188,59 @@ func CheckSign(data []byte, execer string, sign *Signature) bool {
}
return pub.VerifyBytes(data, signbytes)
}
//FilterParaTxsByTitle 过滤指定title的平行链交易
//1,单笔平行连交易
//2,交易组中的平行连交易,需要将整个交易组都过滤出来
//目前暂时不返回单个交易的proof证明路径,
//后面会将平行链的交易组装到一起,构成一个子roothash。会返回子roothash的proof证明路径
func (blockDetail *BlockDetail) FilterParaTxsByTitle(title string) *ParaTxDetail {
var paraTx ParaTxDetail
paraTx.Header = blockDetail.Block.GetHeader()
for i := 0; i < len(blockDetail.Block.Txs); i++ {
tx := blockDetail.Block.Txs[i]
if IsSpecificParaExecName(title, string(tx.Execer)) {
//过滤交易组中的para交易,需要将整个交易组都过滤出来
if tx.GroupCount >= 2 {
txDetails, endIdx := blockDetail.filterParaTxGroup(tx, i)
paraTx.TxDetails = append(paraTx.TxDetails, txDetails...)
i = endIdx - 1
continue
}
//单笔para交易
var txDetail TxDetail
txDetail.Tx = tx
txDetail.Receipt = blockDetail.Receipts[i]
txDetail.Index = uint32(i)
paraTx.TxDetails = append(paraTx.TxDetails, &txDetail)
}
}
return &paraTx
}
//filterParaTxGroup 获取para交易所在交易组信息
func (blockDetail *BlockDetail) filterParaTxGroup(tx *Transaction, index int) ([]*TxDetail, int) {
var headIdx int
var txDetails []*TxDetail
for i := index; i >= 0; i-- {
if bytes.Equal(tx.Header, blockDetail.Block.Txs[i].Hash()) {
headIdx = i
break
}
}
endIdx := headIdx + int(tx.GroupCount)
for i := headIdx; i < endIdx; i++ {
var txDetail TxDetail
txDetail.Tx = blockDetail.Block.Txs[i]
txDetail.Receipt = blockDetail.Receipts[i]
txDetail.Index = uint32(i)
txDetails = append(txDetails, &txDetail)
}
return txDetails, endIdx
}
......@@ -2,9 +2,12 @@ package types
import (
"encoding/hex"
"strings"
"testing"
"github.com/33cn/chain33/common/address"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBlock(t *testing.T) {
......@@ -28,3 +31,84 @@ func TestBlock(t *testing.T) {
b.Txs = append(b.Txs, &Transaction{})
assert.Equal(t, false, b.CheckSign())
}
func TestFilterParaTxsByTitle(t *testing.T) {
to := "14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
//构造一个主链交易
maintx := &Transaction{Execer: []byte("coins"), Payload: []byte("none")}
maintx.To = to
maintx, err := FormatTx("coins", maintx)
require.NoError(t, err)
//构造一个平行链交易
execer := "user.p.hyb.none"
paratx := &Transaction{Execer: []byte(execer), Payload: []byte("none")}
paratx.To = address.ExecAddress(execer)
paratx, err = FormatTx(execer, paratx)
require.NoError(t, err)
//构造一个平行链交易组
execer1 := "user.p.hyb.coins"
tx1 := &Transaction{Execer: []byte(execer1), Payload: []byte("none")}
tx1.To = address.ExecAddress(execer1)
tx1, err = FormatTx(execer1, tx1)
require.NoError(t, err)
execer2 := "user.p.hyb.token"
tx2 := &Transaction{Execer: []byte(execer2), Payload: []byte("none")}
tx2.To = address.ExecAddress(execer2)
tx2, err = FormatTx(execer2, tx2)
require.NoError(t, err)
execer3 := "user.p.hyb.trade"
tx3 := &Transaction{Execer: []byte(execer3), Payload: []byte("none")}
tx3.To = address.ExecAddress(execer3)
tx3, err = FormatTx(execer3, tx3)
require.NoError(t, err)
var txs Transactions
txs.Txs = append(txs.Txs, tx1)
txs.Txs = append(txs.Txs, tx2)
txs.Txs = append(txs.Txs, tx3)
feeRate := GInt("MinFee")
group, err := CreateTxGroup(txs.Txs, feeRate)
require.NoError(t, err)
//构造一个有平行链交易的区块
block := &Block{}
block.Version = 0
block.Height = 0
block.BlockTime = 1
block.Difficulty = 1
block.Txs = append(block.Txs, maintx)
block.Txs = append(block.Txs, paratx)
block.Txs = append(block.Txs, group.Txs...)
blockdetal := &BlockDetail{}
blockdetal.Block = block
maintxreceipt := &ReceiptData{Ty: ExecOk}
paratxreceipt := &ReceiptData{Ty: ExecPack}
grouppara1receipt := &ReceiptData{Ty: ExecPack}
grouppara2receipt := &ReceiptData{Ty: ExecPack}
grouppara3receipt := &ReceiptData{Ty: ExecPack}
blockdetal.Receipts = append(blockdetal.Receipts, maintxreceipt)
blockdetal.Receipts = append(blockdetal.Receipts, paratxreceipt)
blockdetal.Receipts = append(blockdetal.Receipts, grouppara1receipt)
blockdetal.Receipts = append(blockdetal.Receipts, grouppara2receipt)
blockdetal.Receipts = append(blockdetal.Receipts, grouppara3receipt)
txDetail := blockdetal.FilterParaTxsByTitle("user.p.hyb.")
for _, tx := range txDetail.TxDetails {
if tx != nil {
execer := string(tx.Tx.Execer)
if !strings.HasPrefix(execer, "user.p.hyb.") && tx.Tx.GetGroupCount() != 0 {
assert.Equal(t, tx.Receipt.Ty, int32(ExecOk))
} else {
assert.Equal(t, tx.Receipt.Ty, int32(ExecPack))
}
}
}
}
......@@ -165,6 +165,9 @@ const (
//其他模块读写blockchain db事件
EventSetValueByKey = 304
EventGetValueByKey = 305
//通过平行链title获取平行链的交易
EventGetParaTxByTitle = 306
EventReplyParaTxByTitle = 307
)
var eventName = map[int]string{
......@@ -318,4 +321,6 @@ var eventName = map[int]string{
EventReplyMainSeqByHash: "EventReplyMainSeqByHash",
EventSetValueByKey: "EventSetValueByKey",
EventGetValueByKey: "EventGetValueByKey",
EventGetParaTxByTitle: "EventGetParaTxByTitle",
EventReplyParaTxByTitle: "EventReplyParaTxByTitle",
}
......@@ -732,6 +732,36 @@ func (_m *Chain33Client) GetMemPool(ctx context.Context, in *types.ReqGetMempool
return r0, r1
}
// GetParaTxByTitle provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetParaTxByTitle(ctx context.Context, in *types.ReqParaTxByTitle, opts ...grpc.CallOption) (*types.ParaTxDetails, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *types.ParaTxDetails
if rf, ok := ret.Get(0).(func(context.Context, *types.ReqParaTxByTitle, ...grpc.CallOption) *types.ParaTxDetails); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*types.ParaTxDetails)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *types.ReqParaTxByTitle, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetPeerInfo provides a mock function with given fields: ctx, in, opts
func (_m *Chain33Client) GetPeerInfo(ctx context.Context, in *types.ReqNil, opts ...grpc.CallOption) (*types.PeerList, error) {
_va := make([]interface{}, len(opts))
......
......@@ -206,4 +206,37 @@ message BlockSequences {
message ParaChainBlockDetail {
BlockDetail blockdetail = 1;
int64 sequence = 2;
}
\ No newline at end of file
}
// 定义para交易结构
message ParaTxDetails {
repeated ParaTxDetail items = 1;
}
// type:平行链交易所在区块add/del操作,方便平行链回滚
// header:平行链交易所在区块头信息
// txDetails:本区块中指定title平行链的所有交易
message ParaTxDetail {
int64 type = 1;
Header header = 2;
repeated TxDetail txDetails = 3;
}
//交易的详情:
// index:本交易在block中索引值,用于proof的证明
// tx:本交易内容
// receipt:本交易在主链的执行回执
// proofs:本交易hash在block中merkel中的路径
message TxDetail {
uint32 index = 1;
Transaction tx = 2;
ReceiptData receipt = 3;
repeated bytes proofs = 4;
}
//通过seq区间和title请求平行链的交易
message ReqParaTxByTitle {
int64 start = 1;
int64 end = 2;
string title = 3;
}
......@@ -147,4 +147,6 @@ service chain33 {
// 获取是否达到fork高度
rpc GetFork(ReqKey) returns (Int64) {}
//通过seq以及title获取对应平行连的交易
rpc GetParaTxByTitle(ReqParaTxByTitle) returns (ParaTxDetails) {}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment