Unverified Commit 9c8dae93 authored by vipwzw's avatar vipwzw Committed by GitHub

Merge pull request #534 from mdj33/issue530_para_node_in_group

平行链账户更新挖矿
parents 25b3c68b d317fb1c
......@@ -147,12 +147,17 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
privateKey: priKey,
subCfg: &subcfg,
}
if subcfg.WaitBlocks4CommitMsg < 2 {
waitBlocks := int32(2) //最小是2
if subcfg.WaitBlocks4CommitMsg > 0 {
if subcfg.WaitBlocks4CommitMsg < waitBlocks {
panic("config WaitBlocks4CommitMsg should not less 2")
}
waitBlocks = subcfg.WaitBlocks4CommitMsg
}
para.commitMsgClient = &commitMsgClient{
paraClient: para,
waitMainBlocks: subcfg.WaitBlocks4CommitMsg,
waitMainBlocks: waitBlocks,
commitMsgNotify: make(chan int64, 1),
delMsgNotify: make(chan int64, 1),
mainBlockAdd: make(chan *types.BlockDetail, 1),
......@@ -583,9 +588,10 @@ func (client *client) CreateBlock() {
//system startup, take the last added block's seq is ok
currSeq, lastSeqMainHash, err := client.getLastBlockMainInfo()
if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err.Error())
plog.Error("Parachain CreateBlock getLastBlockMainInfo fail", "err", err.Error())
return
}
for {
//should be lastSeq but not LastBlockSeq as del block case the seq is not equal
lastSeq, err := client.GetLastSeq()
......
......@@ -9,6 +9,8 @@ import (
"context"
"time"
"strings"
"github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types"
......@@ -24,7 +26,7 @@ var (
type commitMsgClient struct {
paraClient *client
waitMainBlocks int32
waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
commitMsgNotify chan int64
delMsgNotify chan int64
mainBlockAdd chan *types.BlockDetail
......@@ -35,19 +37,24 @@ type commitMsgClient struct {
quit chan struct{}
}
type commitConsensRsp struct {
status *pt.ParacrossStatus
authAccountIn bool //是否授权账户包含在node group addrs
}
func (client *commitMsgClient) handler() {
var isSync bool
var isRollback bool
var notification []int64 //记录每次系统重启后 min and current height
var finishHeight int64
var consensHeight int64
var finishHeight int64 = -1
var consensHeight int64 = -1
var sendingHeight int64 //当前发送的最大高度
var sendingMsgs []*pt.ParacrossNodeStatus
var readTick <-chan time.Time
var ticker *time.Ticker
client.paraClient.wg.Add(1)
consensusCh := make(chan *pt.ParacrossStatus, 1)
consensusCh := make(chan *commitConsensRsp, 1)
go client.getConsensusHeight(consensusCh)
client.paraClient.wg.Add(1)
......@@ -92,7 +99,6 @@ out:
if height <= finishHeight {
finishHeight = notification[0] - 1
}
isSync = false
isRollback = true
plog.Debug("para del block", "delHeight", height)
......@@ -145,9 +151,9 @@ out:
client.checkTxCommitTimes = 0
sendMsgCh <- client.currentTx
plog.Info("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
plog.Debug("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
for i, msg := range sendingMsgs {
plog.Info("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
plog.Debug("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
"blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
"from", client.paraClient.authAccount)
}
......@@ -156,11 +162,13 @@ out:
//获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度
//一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1]
case rsp := <-consensusCh:
consensHeight = rsp.Height
consensHeight = rsp.status.Height
plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs),
"consensHeigt", rsp.Height, "finished", finishHeight, "sync", isSync, "miner", readTick != nil, "consensBlockHash", common.ToHex(rsp.BlockHash))
"consensHeight", rsp.status.Height, "finishHeight", finishHeight, "authIn", rsp.authAccountIn, "sync", isSync, "miner", readTick != nil)
plog.Debug("para consensus rcv", "consensBlockHash", common.ToHex(rsp.status.BlockHash))
if notification == nil || isRollback {
if notification == nil || isRollback || !rsp.authAccountIn {
isSync = false
continue
}
......@@ -169,7 +177,7 @@ out:
isSync = true
}
// 共识高度追赶上完成高度之后再发,不然分叉节点继续发浪费手续费
// 共识高度追赶上完成高度之后再发,不然继续发浪费手续费
if finishHeight > consensHeight {
isSync = false
}
......@@ -180,20 +188,14 @@ out:
//而分叉高度是交易组里面的某个高度
if finishHeight < consensHeight {
finishHeight = consensHeight
sendingMsgs = nil
client.currentTx = nil
}
//系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要把从当前共识高度到完成的
//最大高度重发一遍,直到确认收到,发过的最小到最大高度也要重发是因为之前空洞原因共识不连续,即便满足2/3节点也不会增长,需要重发来触发commit
//此处也整合了当前consensus height=-1 场景
//系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要重发
// 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
nextConsensHeight := consensHeight + 1
if nextConsensHeight < notification[0] {
notification[0] = nextConsensHeight
finishHeight = consensHeight
sendingMsgs = nil
client.currentTx = nil
}
case miner := <-client.minerSwitch:
......@@ -511,7 +513,7 @@ func (client *commitMsgClient) mainSync() error {
}
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *pt.ParacrossStatus) {
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *commitConsensRsp) {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false
defer ticker.Stop()
......@@ -547,7 +549,16 @@ out:
if err != nil {
continue
}
consensusRst <- status
authExist := false
if client.paraClient.authAccount != "" {
nodes, err := client.getNodeGroupAddrs()
if err != nil {
continue
}
authExist = strings.Contains(nodes, client.paraClient.authAccount)
}
consensusRst <- &commitConsensRsp{status: status, authAccountIn: authExist}
}
}
......@@ -573,9 +584,18 @@ func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.Parac
}
//开启自共识后也要等到自共识真正切换之后再使用,如果本地区块已经过了自共识高度,但自共识的高度还没达成,就会导致共识机制出错
if resp.Height > -1 {
req := &types.ReqBlocks{Start: resp.Height, End: resp.Height}
v, err := client.paraClient.GetAPI().GetBlocks(req)
if err != nil {
plog.Error("getConsensusHeight GetBlocks", "err", err.Error())
return nil, err
}
//本地共识高度对应主链高度一定要高于自共识高度,为了适配平行链共识高度不连续场景
if isParaSelfConsensusForked(v.Items[0].Block.MainHeight) {
return resp, nil
}
}
}
//去主链获取共识高度
reply, err := client.paraClient.grpcClient.QueryChain(context.Background(), &types.ChainExecutor{
......@@ -601,6 +621,26 @@ func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.Parac
}
//node group会在主链和平行链都同时配置,只本地查询就可以
func (client *commitMsgClient) getNodeGroupAddrs() (string, error) {
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
FuncName: "GetNodeGroupAddrs",
Param: types.Encode(&pt.ReqParacrossNodeInfo{Title: types.GetTitle()}),
})
if err != nil {
plog.Error("commitmsg.getNodeGroupAddrs ", "err", err.Error())
return "", err
}
resp, ok := ret.(*types.ReplyConfig)
if !ok {
plog.Error("commitmsg.getNodeGroupAddrs rsp nok")
return "", err
}
return resp.Value, nil
}
func (client *commitMsgClient) onWalletStatus(status *types.WalletStatus) {
if status == nil || client.paraClient.authAccount == "" {
return
......
......@@ -105,8 +105,6 @@ writeBlockSeconds=2
emptyBlockInterval=50
#验证账户,验证节点需要配置自己的账户,并且钱包导入对应种子,非验证节点留空
authAccount=""
#等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitBlocks4CommitMsg=2
#云端主链节点切换后,平行链适配新主链节点block,回溯查找和自己记录的相同blockhash的深度
searchHashMatchedBlockDepth=10000
#创世地址额度
......
......@@ -241,6 +241,23 @@ func getDappForkHeight(forkKey string) int64 {
return forkHeight
}
func getConfigNodes(db dbm.KV, title string) (map[string]struct{}, []byte, error) {
key := calcParaNodeGroupAddrsKey(title)
nodes, _, err := getNodes(db, key)
if err != nil {
if errors.Cause(err) != pt.ErrTitleNotExist {
return nil, nil, errors.Wrapf(err, "getNodes para for title:%s", title)
}
key = calcManageConfigNodesKey(title)
nodes, _, err = getNodes(db, key)
if err != nil {
return nil, nil, errors.Wrapf(err, "getNodes manager for title:%s", title)
}
}
return nodes, key, nil
}
func (a *action) getNodesGroup(title string) (map[string]struct{}, error) {
if a.exec.GetMainHeight() < getDappForkHeight(pt.ForkCommitTx) {
nodes, _, err := getConfigManageNodes(a.db, title)
......@@ -250,18 +267,9 @@ func (a *action) getNodesGroup(title string) (map[string]struct{}, error) {
return nodes, nil
}
nodes, _, err := getParacrossNodes(a.db, title)
if err != nil {
if errors.Cause(err) != pt.ErrTitleNotExist {
return nil, errors.Wrapf(err, "getNodes para for title:%s", title)
}
nodes, _, err = getConfigManageNodes(a.db, title)
if err != nil {
return nil, errors.Wrapf(err, "getNodes manager for title:%s", title)
}
}
nodes, _, err := getConfigNodes(a.db, title)
return nodes, err
return nodes, nil
}
//根据nodes过滤掉可能退出了的addrs
......@@ -394,7 +402,11 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
if commit.Status.Height > titleStatus.Height+1 {
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
//平行链由主链共识无缝切换,即接收第一个收到的高度,可以不从0开始
if !(types.IsPara() && titleStatus.Height == -1) {
paraSwitch, err := a.isParaSelfConsensSwitch(commit, titleStatus)
if err != nil {
return nil, err
}
if !paraSwitch {
return receipt, nil
}
}
......@@ -457,6 +469,32 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
return receipt, nil
}
//平行链自共识无缝切换条件:1,平行链没有共识过,2:commit高度是大于自共识分叉高度且上一次共识的主链高度小于自共识分叉高度,保证只运行一次,
// 这样在主链没有共识空洞前提下,平行链允许有条件的共识跳跃
func (a *action) isParaSelfConsensSwitch(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
if !types.IsPara() {
return false, nil
}
if titleStatus.Height == -1 {
return true, nil
}
selfConsensForkHeight := getDappForkHeight(pt.ParaSelfConsensForkHeight)
lastStatusMainHeight := int64(-1)
if titleStatus.Height > -1 {
stat, err := getTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, titleStatus.Height))
if err != nil {
clog.Error("paracross.Commit isParaSelfConsensSwitch getTitleHeight failed", "err", err.Error())
return false, err
}
lastStatusMainHeight = stat.MainHeight
}
return commit.Status.MainBlockHeight > selfConsensForkHeight && lastStatusMainHeight < selfConsensForkHeight, nil
}
func (a *action) execCrossTx(tx *types.TransactionDetail, commit *pt.ParacrossCommitAction, crossTxHash []byte) (*types.Receipt, error) {
if !bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) {
return nil, nil
......
......@@ -63,10 +63,11 @@ func (p *Paracross) Query_GetTitleByHash(in *pt.ReqParacrossTitleHash) (types.Me
//Query_GetNodeGroupAddrs get node group addrs
func (p *Paracross) Query_GetNodeGroupAddrs(in *pt.ReqParacrossNodeInfo) (types.Message, error) {
if in == nil {
if in == nil || in.GetTitle() == "" {
return nil, types.ErrInvalidParam
}
ret, _, err := getParacrossNodes(p.GetStateDB(), in.GetTitle())
ret, key, err := getConfigNodes(p.GetStateDB(), in.GetTitle())
if err != nil {
return nil, errors.Cause(err)
}
......@@ -75,7 +76,7 @@ func (p *Paracross) Query_GetNodeGroupAddrs(in *pt.ReqParacrossNodeInfo) (types.
nodes = append(nodes, k)
}
var reply types.ReplyConfig
reply.Key = string(calcParaNodeGroupAddrsKey(in.GetTitle()))
reply.Key = string(key)
reply.Value = fmt.Sprint(nodes)
return &reply, nil
}
......
......@@ -18,6 +18,8 @@ var (
glog = log.New("module", ParaX)
// ForkCommitTx main chain support paracross commit tx
ForkCommitTx = "ForkParacrossCommitTx"
// ParaSelfConsensForkHeight para self consens height string
ParaSelfConsensForkHeight = "MainParaSelfConsensusForkHeight"
)
func init() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment