Commit 017bcf9e authored by mdj33's avatar mdj33 Committed by vipwzw

commit review

parent 2e57d4c1
......@@ -32,26 +32,25 @@ const (
delAct int64 = 2 //reference blockstore.go, del para block action
minBlockNum = 100 //min block number startHeight before lastHeight in mainchain
)
var (
plog = log.New("module", "para")
grpcSite = "localhost:8802"
genesisBlockTime int64 = 1514533390
startHeight int64 //parachain sync from startHeight in mainchain
blockSec int64 = 5 //write block interval, second
emptyBlockInterval int64 = 4 //write empty block every interval blocks in mainchain
zeroHash [32]byte
genesisBlockTime int64 = 1514533390
//current miner tx take any privatekey for unify all nodes sign purpose, and para chain is free
minerPrivateKey = "6da92a632ab7deb67d38c0f6560bcfed28167998f6496db64c258d5e8393a81b"
searchHashMatchDepth int32 = 100
mainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain
defaultGenesisAmount int64 = 1e8
poolMainBlockSec int64 = 5
defaultEmptyBlockInterval int64 = 4 //write empty block every interval blocks in mainchain
defaultSearchMatchedBlockDepth int32 = 10000
defaultMainBlockHashForkHeight int64 = 209186 //calc block hash fork height in main chain
mainParaSelfConsensusForkHeight int64 = types.MaxHeight //para chain self consensus height switch, must >= ForkParacrossCommitTx of main
mainForkParacrossCommitTx int64 = types.MaxHeight //support paracross commit tx fork height in main chain: ForkParacrossCommitTx
fetchFilterParaTxsEnable bool
batchFetchBlockCount int64 = 128
)
var (
plog = log.New("module", "para")
zeroHash [32]byte
)
func init() {
drivers.Reg("para", New)
drivers.QueryData.Register("para", &client{})
......@@ -99,41 +98,36 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
types.MustDecode(sub, &subcfg)
}
if subcfg.GenesisAmount <= 0 {
subcfg.GenesisAmount = 1e8
}
if subcfg.ParaRemoteGrpcClient != "" {
grpcSite = subcfg.ParaRemoteGrpcClient
subcfg.GenesisAmount = defaultGenesisAmount
}
if subcfg.StartHeight > 0 {
startHeight = subcfg.StartHeight
}
if subcfg.WriteBlockSeconds > 0 {
blockSec = subcfg.WriteBlockSeconds
if subcfg.WriteBlockSeconds <= 0 {
subcfg.WriteBlockSeconds = poolMainBlockSec
}
if subcfg.EmptyBlockInterval > 0 {
emptyBlockInterval = subcfg.EmptyBlockInterval
if subcfg.EmptyBlockInterval <= 0 {
subcfg.EmptyBlockInterval = defaultEmptyBlockInterval
}
if subcfg.SearchHashMatchedBlockDepth > 0 {
searchHashMatchDepth = subcfg.SearchHashMatchedBlockDepth
if subcfg.SearchHashMatchedBlockDepth <= 0 {
subcfg.SearchHashMatchedBlockDepth = defaultSearchMatchedBlockDepth
}
if subcfg.MainBlockHashForkHeight > 0 {
mainBlockHashForkHeight = subcfg.MainBlockHashForkHeight
if subcfg.MainBlockHashForkHeight <= 0 {
subcfg.MainBlockHashForkHeight = defaultMainBlockHashForkHeight
}
if subcfg.MainParaSelfConsensusForkHeight > 0 {
mainParaSelfConsensusForkHeight = subcfg.MainParaSelfConsensusForkHeight
if subcfg.MainParaSelfConsensusForkHeight <= 0 {
subcfg.MainParaSelfConsensusForkHeight = mainParaSelfConsensusForkHeight
}
if subcfg.MainForkParacrossCommitTx > 0 {
mainForkParacrossCommitTx = subcfg.MainForkParacrossCommitTx
if subcfg.MainForkParacrossCommitTx <= 0 {
subcfg.MainForkParacrossCommitTx = mainForkParacrossCommitTx
}
if subcfg.FetchFilterParaTxsEnable > 0 {
fetchFilterParaTxsEnable = true
}
if subcfg.BatchFetchBlockCount > 0 {
batchFetchBlockCount = subcfg.BatchFetchBlockCount
if subcfg.BatchFetchBlockCount <= 0 {
subcfg.BatchFetchBlockCount = batchFetchBlockCount
}
pk, err := hex.DecodeString(minerPrivateKey)
......@@ -163,26 +157,25 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
quitCreate: make(chan struct{}),
}
waitBlocks := int32(3) //缺省是3
if subcfg.WaitBlocks4CommitMsg > 0 {
waitBlocks = subcfg.WaitBlocks4CommitMsg
}
waitConsensTimes := uint32(30) //30*10s = 5min
if subcfg.WaitConsensStopTimes > 0 {
waitConsensTimes = subcfg.WaitConsensStopTimes
}
para.commitMsgClient = &commitMsgClient{
paraClient: para,
waitMainBlocks: waitBlocks,
waitConsensStopTimes: waitConsensTimes,
waitMainBlocks: waitBlocks4CommitMsg,
waitConsensStopTimes: waitConsensStopTimes,
consensHeight: -2,
sendingHeight: -1,
consensStartHeight: -1,
resetCh: make(chan interface{}, 1),
quit: make(chan struct{}),
}
if subcfg.WaitBlocks4CommitMsg > 0 {
para.commitMsgClient.waitMainBlocks = subcfg.WaitBlocks4CommitMsg
}
if subcfg.WaitConsensStopTimes > 0 {
para.commitMsgClient.waitConsensStopTimes = subcfg.WaitConsensStopTimes
}
// 设置平行链共识起始高度,在共识高度为-1也就是从未共识过的环境中允许从设置的非0起始高度开始共识
//note:只有在主链LoopCheckCommitTxDoneForkHeight之后才支持设置ParaConsensStartHeight
if subcfg.ParaConsensStartHeight > 0 {
para.commitMsgClient.consensStartHeight = subcfg.ParaConsensStartHeight - 1
......@@ -247,18 +240,18 @@ func (client *client) InitBlock() {
}
if block == nil {
if startHeight <= 0 {
panic(fmt.Sprintf("startHeight(%d) should be more than 0 in mainchain", startHeight))
if client.subCfg.StartHeight <= 0 {
panic(fmt.Sprintf("startHeight(%d) should be more than 0 in mainchain", client.subCfg.StartHeight))
}
//平行链创世区块对应主链hash为startHeight-1的那个block的hash
mainHash := client.GetStartMainHash(startHeight - 1)
mainHash := client.GetStartMainHash(client.subCfg.StartHeight - 1)
// 创世区块
newblock := &types.Block{}
newblock.Height = 0
newblock.BlockTime = genesisBlockTime
newblock.ParentHash = zeroHash[:]
newblock.MainHash = mainHash
newblock.MainHeight = startHeight - 1
newblock.MainHeight = client.subCfg.StartHeight - 1
tx := client.CreateGenesisTx()
newblock.Txs = tx
newblock.TxHash = merkle.CalcMerkleRoot(newblock.Txs)
......@@ -275,8 +268,8 @@ func (client *client) InitBlock() {
client.SetCurrentBlock(block)
}
plog.Debug("para consensus init parameter", "mainBlockHashForkHeight", mainBlockHashForkHeight)
plog.Debug("para consensus init parameter", "mainParaSelfConsensusForkHeight", mainParaSelfConsensusForkHeight)
plog.Debug("para consensus init parameter", "mainBlockHashForkHeight", client.subCfg.MainBlockHashForkHeight)
plog.Debug("para consensus init parameter", "mainParaSelfConsensusForkHeight", client.subCfg.MainParaSelfConsensusForkHeight)
}
......@@ -291,7 +284,7 @@ func (client *client) GetStartMainHash(height int64) []byte {
}
if height > 0 {
hint := time.NewTicker(5 * time.Second)
hint := time.NewTicker(time.Second * time.Duration(client.subCfg.WriteBlockSeconds))
for lastHeight < height+minBlockNum {
select {
case <-hint.C:
......@@ -329,6 +322,10 @@ func (client *client) CreateGenesisTx() (ret []*types.Transaction) {
return
}
func (client *client) isParaSelfConsensusForked(height int64) bool {
return height > client.subCfg.MainParaSelfConsensusForkHeight
}
func (client *client) ProcEvent(msg *queue.Message) bool {
return false
}
......
......@@ -190,7 +190,6 @@ func TestAddMinerTx(t *testing.T) {
priKey, err := secp.PrivKeyFromBytes(pk)
assert.Nil(t, err)
mainForkParacrossCommitTx = 1
block := &types.Block{}
_, filterTxs, _ := createTestTxs(t)
......@@ -200,7 +199,11 @@ func TestAddMinerTx(t *testing.T) {
MainHash: []byte("mainhash"),
Txs: filterTxs}
para := new(client)
para.subCfg = new(subConfig)
para.privateKey = priKey
para.commitMsgClient = new(commitMsgClient)
para.commitMsgClient.paraClient = para
para.blockSyncClient = new(BlockSyncClient)
para.blockSyncClient.paraClient = para
para.blockSyncClient.addMinerTx(nil, block, localBlock)
......
......@@ -25,9 +25,12 @@ import (
"github.com/pkg/errors"
)
var (
const (
consensusInterval = 10 //about 1 new block interval
minerInterval = 10 //5s的主块间隔后分叉概率增加,10s可以消除一些分叉回退
waitBlocks4CommitMsg int32 = 3
waitConsensStopTimes uint32 = 30 //30*10s = 5min
)
type commitMsgClient struct {
......@@ -393,7 +396,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
var rawTxs types.Transactions
for _, status := range notifications {
execName := pt.ParaX
if isParaSelfConsensusForked(status.MainBlockHeight) {
if client.paraClient.isParaSelfConsensusForked(status.MainBlockHeight) {
execName = paracross.GetExecName()
}
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, feeRate)
......@@ -413,7 +416,7 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, error) {
execName := pt.ParaX
if isParaSelfConsensusForked(status.MainBlockHeight) {
if client.paraClient.isParaSelfConsensusForked(status.MainBlockHeight) {
execName = paracross.GetExecName()
}
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, feeRate)
......@@ -498,10 +501,6 @@ out:
client.paraClient.wg.Done()
}
func isParaSelfConsensusForked(height int64) bool {
return height > mainParaSelfConsensusForkHeight
}
//当前未考虑获取key非常多失败的场景, 如果获取height非常多,block模块会比较大,但是使用完了就释放了
//如果有必要也可以考虑每次最多取20个一个txgroup,发送共识部分循环获取发送也没问题
func (client *commitMsgClient) getNodeStatus(start, end int64) ([]*pt.ParacrossNodeStatus, error) {
......@@ -705,7 +704,7 @@ func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, er
return nil, err
}
if isParaSelfConsensusForked(block.MainHeight) {
if client.paraClient.isParaSelfConsensusForked(block.MainHeight) {
//从本地查询共识高度
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
......@@ -736,7 +735,7 @@ func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, er
}
//本地共识高度对应主链高度一定要高于自共识高度,为了适配平行链共识高度不连续场景
if isParaSelfConsensusForked(statusMainHeight) {
if client.paraClient.isParaSelfConsensusForked(statusMainHeight) {
return resp, nil
}
}
......
......@@ -19,6 +19,10 @@ import (
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
)
var (
fetchFilterParaTxsEnable bool
)
func (client *client) addLocalBlock(height int64, block *pt.ParaLocalDbBlock) error {
set := &types.LocalDBSet{}
......@@ -89,7 +93,7 @@ func (client *client) getLastLocalHeight() (int64, error) {
if err != nil {
return -1, err
}
if value[0] == nil {
if len(value) == 0 {
return -1, types.ErrNotFound
}
......@@ -110,7 +114,7 @@ func (client *client) getLocalBlockByHeight(height int64) (*pt.ParaLocalDbBlock,
if err != nil {
return nil, err
}
if value[0] == nil {
if len(value) == 0 {
return nil, types.ErrNotFound
}
......@@ -212,7 +216,7 @@ func (client *client) getMatchedBlockOnChain(startHeight int64) (int64, *types.B
startHeight = lastBlock.Height
}
depth := searchHashMatchDepth
depth := client.subCfg.SearchHashMatchedBlockDepth
for height := startHeight; height > 0 && depth > 0; height-- {
block, err := client.GetBlockByHeight(height)
if err != nil {
......@@ -227,7 +231,7 @@ func (client *client) getMatchedBlockOnChain(startHeight int64) (int64, *types.B
if depth == 0 {
plog.Error("switchHashMatchedBlock depth overflow", "last info:mainHeight", block.MainHeight,
"mainHash", hex.EncodeToString(block.MainHash), "search startHeight", lastBlock.Height, "curHeight", height,
"search depth", searchHashMatchDepth)
"search depth", client.subCfg.SearchHashMatchedBlockDepth)
panic("search HashMatchedBlock overflow, re-setting search depth and restart to try")
}
if height == 1 {
......@@ -307,13 +311,13 @@ func (client *client) getBatchSeqCount(currSeq int64) (int64, error) {
}
if lastSeq > currSeq {
if lastSeq-currSeq > emptyBlockInterval {
if lastSeq-currSeq > client.subCfg.EmptyBlockInterval {
atomic.StoreInt32(&client.caughtUp, 0)
} else {
atomic.StoreInt32(&client.caughtUp, 1)
}
if fetchFilterParaTxsEnable && lastSeq-currSeq > batchFetchBlockCount {
return batchFetchBlockCount, nil
if fetchFilterParaTxsEnable && lastSeq-currSeq > client.subCfg.BatchFetchBlockCount {
return client.subCfg.BatchFetchBlockCount, nil
}
return 0, nil
}
......@@ -437,7 +441,7 @@ func (client *client) procLocalBlock(mainBlock *types.ParaTxDetail) (bool, error
} else if mainBlock.Type == addAct {
if len(txs) == 0 {
if lastSeqMainHeight-lastBlock.MainHeight < emptyBlockInterval {
if lastSeqMainHeight-lastBlock.MainHeight < client.subCfg.EmptyBlockInterval {
return false, nil
}
plog.Info("Create empty block")
......@@ -487,7 +491,7 @@ out:
if err == nil {
continue
}
time.Sleep(time.Second * time.Duration(blockSec))
time.Sleep(time.Second * time.Duration(client.subCfg.WriteBlockSeconds))
continue
}
......
......@@ -139,9 +139,9 @@ func (client *client) GetBlockOnMainBySeq(seq int64) (*types.BlockSeq, error) {
return nil, err
}
hash := blockSeq.Detail.Block.HashByForkHeight(mainBlockHashForkHeight)
hash := blockSeq.Detail.Block.HashByForkHeight(client.subCfg.MainBlockHashForkHeight)
if !bytes.Equal(blockSeq.Seq.Hash, hash) {
plog.Error("para compare ForkBlockHash fail", "forkHeight", mainBlockHashForkHeight,
plog.Error("para compare ForkBlockHash fail", "forkHeight", client.subCfg.MainBlockHashForkHeight,
"seqHash", hex.EncodeToString(blockSeq.Seq.Hash), "calcHash", hex.EncodeToString(hash))
return nil, types.ErrBlockHashNoMatch
}
......
......@@ -355,7 +355,7 @@ func (client *BlockSyncClient) addMinerTx(preStateHash []byte, block *types.Bloc
tx, err := pt.CreateRawMinerTx(&pt.ParacrossMinerAction{
Status: status,
IsSelfConsensus: isParaSelfConsensusForked(status.MainBlockHeight),
IsSelfConsensus: client.paraClient.isParaSelfConsensusForked(status.MainBlockHeight),
})
if err != nil {
return err
......
......@@ -40,10 +40,15 @@ func getPrivKey(t *testing.T) crypto.PrivKey {
}
func TestCalcCommitMsgTxs(t *testing.T) {
para := new(client)
para.subCfg = new(subConfig)
priKey := getPrivKey(t)
client := commitMsgClient{
client := &commitMsgClient{
privateKey: priKey,
paraClient: para,
}
para.commitMsgClient = client
nt1 := &pt.ParacrossNodeStatus{
Height: 1,
Title: "user.p.para",
......@@ -71,9 +76,8 @@ func TestCalcCommitMsgTxs(t *testing.T) {
}
func TestGetConsensusStatus(t *testing.T) {
mainFork := mainParaSelfConsensusForkHeight
mainParaSelfConsensusForkHeight = 1
para := new(client)
para.subCfg = new(subConfig)
grpcClient := &typesmocks.Chain33Client{}
//grpcClient.On("GetFork", mock.Anything, &types.ReqKey{Key: []byte("ForkBlockHash")}).Return(&types.Int64{Data: 1}, errors.New("err")).Once()
para.grpcClient = grpcClient
......@@ -93,12 +97,6 @@ func TestGetConsensusStatus(t *testing.T) {
Height: 1,
}
//msgx := &types.Message{types.Encode(status)}
//msg := types.Encode(status)
//reply := &types.Reply{
// IsOk: true,
// Msg: types.Encode(status),
//}
api.On("QueryChain", mock.Anything, mock.Anything, mock.Anything).Return(status, nil).Once()
detail := &types.BlockDetail{Block: block}
details := &types.BlockDetails{Items: []*types.BlockDetail{detail}}
......@@ -108,7 +106,6 @@ func TestGetConsensusStatus(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int64(1), ret.Height)
mainParaSelfConsensusForkHeight = mainFork
}
func TestSendCommitMsg(t *testing.T) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment