Commit 881d1453 authored by mdj33's avatar mdj33 Committed by vipwzw

fix ut

parent a0fd675b
......@@ -5,16 +5,16 @@
package para
import (
"encoding/hex"
"fmt"
"sync"
"time"
"encoding/hex"
log "github.com/33cn/chain33/common/log/log15"
"sync/atomic"
"time"
"github.com/33cn/chain33/client/api"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/common/merkle"
......@@ -60,18 +60,17 @@ func init() {
type client struct {
*drivers.BaseClient
grpcClient types.Chain33Client
execAPI api.ExecutorAPI
isCaughtUp int32
commitMsgClient *commitMsgClient
authAccount string
privateKey crypto.PrivKey
wg sync.WaitGroup
subCfg *subConfig
mtx sync.Mutex
grpcClient types.Chain33Client
execAPI api.ExecutorAPI
isCaughtUp int32
commitMsgClient *commitMsgClient
authAccount string
privateKey crypto.PrivKey
wg sync.WaitGroup
subCfg *subConfig
syncCaughtUpAtom int32
localChangeAtom int32
quitCreate chan struct{}
}
type subConfig struct {
......@@ -165,6 +164,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
authAccount: subcfg.AuthAccount,
privateKey: priKey,
subCfg: &subcfg,
quitCreate: make(chan struct{}),
}
waitBlocks := int32(2) //最小是2
......@@ -203,6 +203,7 @@ func (client *client) CheckBlock(parent *types.Block, current *types.BlockDetail
func (client *client) Close() {
client.BaseClient.Close()
close(client.commitMsgClient.quit)
close(client.quitCreate)
client.wg.Wait()
plog.Info("consensus para closed")
}
......@@ -215,6 +216,7 @@ func (client *client) SetQueueClient(c queue.Client) {
go client.EventLoop()
client.wg.Add(1)
go client.commitMsgClient.handler()
client.wg.Add(1)
go client.CreateBlock()
go client.SyncBlocks()
}
......@@ -230,7 +232,11 @@ func (client *client) InitBlock() {
}
if block == nil {
mainHash := client.GetStartMainHash(startHeight)
if startHeight <= 0 {
panic(fmt.Sprintf("startHeight(%d) should be more than 0 in mainchain", startHeight))
}
//平行链创世区块对应主链hash为startHeight-1的那个block的hash
mainHash := client.GetStartMainHash(startHeight - 1)
// 创世区块
newblock := &types.Block{}
newblock.Height = 0
......@@ -259,41 +265,39 @@ func (client *client) InitBlock() {
}
// GetStartMainHash get StartMainHash in mainchain
// GetStartMainHash 获取start
func (client *client) GetStartMainHash(height int64) []byte {
if height <= 0 {
panic(fmt.Sprintf("startHeight(%d) should be more than 0 in mainchain", height))
}
lastHeight, err := client.GetLastHeightOnMainChain()
if err != nil {
panic(err)
}
if lastHeight < height && lastHeight > 0 {
if lastHeight < height {
panic(fmt.Sprintf("lastHeight(%d) less than startHeight(%d) in mainchain", lastHeight, height))
}
hint := time.NewTicker(5 * time.Second)
for lastHeight < height+minBlockNum {
select {
case <-hint.C:
plog.Info("Waiting lastHeight increase......", "lastHeight", lastHeight, "startHeight", height)
default:
lastHeight, err = client.GetLastHeightOnMainChain()
if err != nil {
panic(err)
if height > 0 {
hint := time.NewTicker(5 * time.Second)
for lastHeight < height+minBlockNum {
select {
case <-hint.C:
plog.Info("Waiting lastHeight increase......", "lastHeight", lastHeight, "startHeight", height)
default:
lastHeight, err = client.GetLastHeightOnMainChain()
if err != nil {
panic(err)
}
time.Sleep(time.Second)
}
time.Sleep(time.Second)
}
hint.Stop()
plog.Info(fmt.Sprintf("lastHeight more than %d blocks after startHeight", minBlockNum), "lastHeight", lastHeight, "startHeight", height)
}
hint.Stop()
plog.Info(fmt.Sprintf("lastHeight more than %d blocks after startHeight", minBlockNum), "lastHeight", lastHeight, "startHeight", height)
seq, hash, err := client.GetSeqByHeightOnMainChain(height - 1)
hash, err := client.GetHashByHeightOnMainChain(height)
if err != nil {
panic(err)
}
plog.Info("the start sequence in mainchain", "startHeight", height, "startSeq", seq)
plog.Info("the start hash in mainchain", "height", height, "hash", hex.EncodeToString(hash))
return hash
}
......
......@@ -208,20 +208,16 @@ func TestAddMinerTx(t *testing.T) {
mainForkParacrossCommitTx = 1
block := &types.Block{}
mainDetail, filterTxs, allTxs := createTestTxs(t)
mainBlock := &types.BlockSeq{
Seq: &types.BlockSequence{},
Detail: mainDetail}
_, filterTxs, _ := createTestTxs(t)
localBlock := &pt.ParaLocalDbBlock{
Height: 1,
MainHeight: 10,
MainHash: []byte("mainhash"),
Txs: filterTxs}
para := new(client)
para.privateKey = priKey
para.addMinerTx(nil, block, mainBlock, allTxs)
ret := checkTxInMainBlock(filterTxs[0], mainDetail)
assert.True(t, ret)
tx2, _ := createCrossMainTx("toA")
ret = checkTxInMainBlock(tx2, mainDetail)
assert.False(t, ret)
para.addMinerTx(nil, block, localBlock)
assert.Equal(t,1,len(block.Txs))
}
......@@ -251,16 +247,10 @@ func TestGetLastBlockInfo(t *testing.T) {
qClient.On("Wait", mock.Anything).Return(msg, nil)
api.On("GetMainSequenceByHash", mock.Anything).Return(&types.Int64{Data: int64(1)}, nil)
mainBlock := &types.Block{ParentHash: []byte("phash")}
mainDetail := &types.BlockDetail{Block: mainBlock}
blocks := &types.BlockDetails{}
blocks.Items = append(blocks.Items, mainDetail)
grpcClient.On("GetBlockByHashes", mock.Anything, mock.Anything).Return(blocks, nil)
grpcClient.On("GetSequenceByHash", mock.Anything, mock.Anything).Return(&types.Int64{Data: int64(10)}, nil)
mainSeq, hash, err := para.getLastBlockMainInfo()
mainSeq, lastBlock, err := para.getLastBlockMainInfo()
assert.NoError(t, err)
assert.Equal(t, int64(9), mainSeq)
assert.Equal(t, []byte("phash"), hash)
assert.Equal(t, int64(10), mainSeq)
assert.Equal(t, lastBlock.Height, block.Height)
}
......@@ -104,7 +104,7 @@ func (client *commitMsgClient) clearSendingTx() {
}
func (client *commitMsgClient) procSendTx() {
plog.Info("para commitMsg---send", "chainHeight", atomic.LoadInt64(&client.chainHeight), "sendingHeight", client.sendingHeight,
plog.Info("para commitMsg---status", "chainHeight", atomic.LoadInt64(&client.chainHeight), "sendingHeight", client.sendingHeight,
"consensHeight", atomic.LoadInt64(&client.consensHeight), "isSendingTx", client.isSendingCommitMsg(), "sync", client.isSync())
if client.isSendingCommitMsg() || !client.isSync() {
......@@ -156,7 +156,7 @@ func (client *commitMsgClient) isSync() bool {
}
if atomic.LoadInt32(&client.minerSwitch) != 1 {
plog.Info("para is not Sync", "minerSwitch", atomic.LoadInt32(&client.minerSwitch))
plog.Info("para is not Sync", "isMiner", atomic.LoadInt32(&client.minerSwitch))
return false
}
......@@ -170,6 +170,11 @@ func (client *commitMsgClient) isSync() bool {
return false
}
if atomic.LoadInt32(&client.paraClient.syncCaughtUpAtom) != 1 {
plog.Info("para is not Sync", "syncCaughtUpAtom", atomic.LoadInt32(&client.paraClient.syncCaughtUpAtom))
return false
}
return true
}
......
......@@ -573,43 +573,52 @@ func (client *client) CreateBlock() {
return
}
currSeq := lastSeq + 1
out:
for {
count, err := client.getBatchFetchSeqCount(currSeq)
if err != nil {
currSeq, lastSeqMainHash, err = client.processHashNotMatchError(currSeq, lastSeqMainHash, err)
if err == nil {
select {
case <-client.quitCreate:
break out
default:
count, err := client.getBatchFetchSeqCount(currSeq)
if err != nil {
currSeq, lastSeqMainHash, err = client.processHashNotMatchError(currSeq, lastSeqMainHash, err)
if err == nil {
continue
}
time.Sleep(time.Second * time.Duration(blockSec))
continue
}
time.Sleep(time.Second * time.Duration(blockSec))
continue
}
plog.Info("Parachain CreateBlock", "curSeq", currSeq, "count", count, "lastSeqMainHash", common.ToHex(lastSeqMainHash))
paraTxs, err := client.RequestTx(currSeq, count, lastSeqMainHash)
if err != nil {
currSeq, lastSeqMainHash, err = client.processHashNotMatchError(currSeq, lastSeqMainHash, err)
continue
}
plog.Info("Parachain CreateBlock", "curSeq", currSeq, "count", count, "lastSeqMainHash", common.ToHex(lastSeqMainHash))
paraTxs, err := client.RequestTx(currSeq, count, lastSeqMainHash)
if err != nil {
currSeq, lastSeqMainHash, err = client.processHashNotMatchError(currSeq, lastSeqMainHash, err)
continue
}
if count+1 != int64(len(paraTxs.Items)) {
plog.Error("para CreateBlock count not match", "count", count+1, "items", len(paraTxs.Items))
continue
}
if count+1 != int64(len(paraTxs.Items)) {
plog.Error("para CreateBlock count not match", "count", count+1, "items", len(paraTxs.Items))
continue
}
err = client.procLocalBlocks(paraTxs)
if err != nil {
//根据localblock,重新搜索匹配
lastSeqMainHash = nil
plog.Error("para CreateBlock.procLocalBlocks", "err", err.Error())
continue
}
err = client.procLocalBlocks(paraTxs)
if err != nil {
//根据localblock,重新搜索匹配
lastSeqMainHash = nil
plog.Error("para CreateBlock.procLocalBlocks", "err", err.Error())
continue
}
//重新设定seq和lastSeqMainHash
lastSeqMainHash = paraTxs.Items[count].Header.Hash
if paraTxs.Items[count].Type == delAct {
lastSeqMainHash = paraTxs.Items[count].Header.ParentHash
}
currSeq = currSeq + count + 1
//重新设定seq和lastSeqMainHash
lastSeqMainHash = paraTxs.Items[count].Header.Hash
if paraTxs.Items[count].Type == delAct {
lastSeqMainHash = paraTxs.Items[count].Header.ParentHash
}
currSeq = currSeq + count + 1
}
}
client.wg.Done()
}
......@@ -106,8 +106,8 @@ func TestSendCommitMsg(t *testing.T) {
commitCli.quit = make(chan struct{})
commitCli.paraClient.wg.Add(1)
sendMsgCh := make(chan *types.Transaction, 1)
go commitCli.sendCommitMsg(sendMsgCh)
commitCli.sendMsgCh = make(chan *types.Transaction, 1)
go commitCli.sendCommitMsg()
//reply := &types.Reply{
// IsOk: true,
......@@ -116,7 +116,7 @@ func TestSendCommitMsg(t *testing.T) {
grpcClient.On("SendTransaction", mock.Anything, mock.Anything).Return(nil, types.ErrNotFound).Twice()
tx := &types.Transaction{}
sendMsgCh <- tx
commitCli.sendMsgCh <- tx
time.Sleep(3 * time.Second)
//para.BaseClient.Close()
......
......@@ -126,7 +126,7 @@ func FilterTxsForPara(title string, main *types.BlockDetail) []*types.Transactio
return txs
}
//FilterTxsForPara include some main tx in tx group before ForkParacrossCommitTx
//FilterTxsForParaPlus include some main tx in tx group before ForkParacrossCommitTx
func FilterTxsForParaPlus(title string, main *pt.ParaTxDetail) []*types.Transaction {
var txs []*types.Transaction
forkHeight := pt.GetDappForkHeight(pt.ForkCommitTx)
......
......@@ -101,7 +101,7 @@ targetTimePerBlock = 16
#主链节点的grpc服务器ip,当前可以支持多ip负载均衡,如“101.37.227.226:8802,39.97.20.242:8802,47.107.15.126:8802,jiedian2.33.cn”
ParaRemoteGrpcClient=""
#主链指定高度的区块开始同步
startHeight=0
startHeight=1
#打包时间间隔,单位秒
writeBlockSeconds=2
#主链每隔几个没有相关交易的区块,平行链上打包空区块
......@@ -116,6 +116,7 @@ searchHashMatchedBlockDepth=10000
genesisAmount=100000000
MainBlockHashForkHeight=1
MainForkParacrossCommitTx=1
MainLoopCheckCommitTxDoneForkHeight=1
[store]
name="mavl"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment