Commit 0055832c authored by mdj33's avatar mdj33

self consense fork

parent c2820b66
...@@ -147,12 +147,17 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -147,12 +147,17 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
privateKey: priKey, privateKey: priKey,
subCfg: &subcfg, subCfg: &subcfg,
} }
if subcfg.WaitBlocks4CommitMsg < 2 {
waitBlocks := int32(2) //最小是2
if subcfg.WaitBlocks4CommitMsg > 0 {
if subcfg.WaitBlocks4CommitMsg < waitBlocks {
panic("config WaitBlocks4CommitMsg should not less 2") panic("config WaitBlocks4CommitMsg should not less 2")
} }
waitBlocks = subcfg.WaitBlocks4CommitMsg
}
para.commitMsgClient = &commitMsgClient{ para.commitMsgClient = &commitMsgClient{
paraClient: para, paraClient: para,
waitMainBlocks: subcfg.WaitBlocks4CommitMsg, waitMainBlocks: waitBlocks,
commitMsgNotify: make(chan int64, 1), commitMsgNotify: make(chan int64, 1),
delMsgNotify: make(chan int64, 1), delMsgNotify: make(chan int64, 1),
mainBlockAdd: make(chan *types.BlockDetail, 1), mainBlockAdd: make(chan *types.BlockDetail, 1),
...@@ -583,9 +588,10 @@ func (client *client) CreateBlock() { ...@@ -583,9 +588,10 @@ func (client *client) CreateBlock() {
//system startup, take the last added block's seq is ok //system startup, take the last added block's seq is ok
currSeq, lastSeqMainHash, err := client.getLastBlockMainInfo() currSeq, lastSeqMainHash, err := client.getLastBlockMainInfo()
if err != nil { if err != nil {
plog.Error("Parachain getLastBlockInfo fail", "err", err.Error()) plog.Error("Parachain CreateBlock getLastBlockMainInfo fail", "err", err.Error())
return return
} }
for { for {
//should be lastSeq but not LastBlockSeq as del block case the seq is not equal //should be lastSeq but not LastBlockSeq as del block case the seq is not equal
lastSeq, err := client.GetLastSeq() lastSeq, err := client.GetLastSeq()
......
...@@ -9,6 +9,8 @@ import ( ...@@ -9,6 +9,8 @@ import (
"context" "context"
"time" "time"
"strings"
"github.com/33cn/chain33/common" "github.com/33cn/chain33/common"
"github.com/33cn/chain33/common/crypto" "github.com/33cn/chain33/common/crypto"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
...@@ -24,7 +26,7 @@ var ( ...@@ -24,7 +26,7 @@ var (
type commitMsgClient struct { type commitMsgClient struct {
paraClient *client paraClient *client
waitMainBlocks int32 waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
commitMsgNotify chan int64 commitMsgNotify chan int64
delMsgNotify chan int64 delMsgNotify chan int64
mainBlockAdd chan *types.BlockDetail mainBlockAdd chan *types.BlockDetail
...@@ -35,19 +37,24 @@ type commitMsgClient struct { ...@@ -35,19 +37,24 @@ type commitMsgClient struct {
quit chan struct{} quit chan struct{}
} }
type commitConsensRsp struct {
status *pt.ParacrossStatus
authAccountIn bool //是否授权账户包含在node group addrs
}
func (client *commitMsgClient) handler() { func (client *commitMsgClient) handler() {
var isSync bool var isSync bool
var isRollback bool var isRollback bool
var notification []int64 //记录每次系统重启后 min and current height var notification []int64 //记录每次系统重启后 min and current height
var finishHeight int64 var finishHeight int64 = -1
var consensHeight int64 var consensHeight int64 = -1
var sendingHeight int64 //当前发送的最大高度 var sendingHeight int64 //当前发送的最大高度
var sendingMsgs []*pt.ParacrossNodeStatus var sendingMsgs []*pt.ParacrossNodeStatus
var readTick <-chan time.Time var readTick <-chan time.Time
var ticker *time.Ticker var ticker *time.Ticker
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
consensusCh := make(chan *pt.ParacrossStatus, 1) consensusCh := make(chan *commitConsensRsp, 1)
go client.getConsensusHeight(consensusCh) go client.getConsensusHeight(consensusCh)
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
...@@ -92,7 +99,6 @@ out: ...@@ -92,7 +99,6 @@ out:
if height <= finishHeight { if height <= finishHeight {
finishHeight = notification[0] - 1 finishHeight = notification[0] - 1
} }
isSync = false
isRollback = true isRollback = true
plog.Debug("para del block", "delHeight", height) plog.Debug("para del block", "delHeight", height)
...@@ -145,9 +151,9 @@ out: ...@@ -145,9 +151,9 @@ out:
client.checkTxCommitTimes = 0 client.checkTxCommitTimes = 0
sendMsgCh <- client.currentTx sendMsgCh <- client.currentTx
plog.Info("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer)) plog.Debug("paracommitmsg sending", "txhash", common.ToHex(signTx.Hash()), "exec", string(signTx.Execer))
for i, msg := range sendingMsgs { for i, msg := range sendingMsgs {
plog.Info("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight, plog.Debug("paracommitmsg sending", "idx", i, "height", msg.Height, "mainheight", msg.MainBlockHeight,
"blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash), "blockhash", common.HashHex(msg.BlockHash), "mainHash", common.HashHex(msg.MainBlockHash),
"from", client.paraClient.authAccount) "from", client.paraClient.authAccount)
} }
...@@ -156,11 +162,13 @@ out: ...@@ -156,11 +162,13 @@ out:
//获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度 //获取正在共识的高度,同步有两层意思,一个是主链跟其他节点完成了同步,另一个是当前平行链节点的高度追赶上了共识高度
//一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1] //一般来说高度增长从小到大: notifiy[0] -- selfConsensusHeight(mainHeight) -- finishHeight -- sendingHeight -- notify[1]
case rsp := <-consensusCh: case rsp := <-consensusCh:
consensHeight = rsp.Height consensHeight = rsp.status.Height
plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs), plog.Info("para consensus rcv", "notify", notification, "sending", len(sendingMsgs),
"consensHeigt", rsp.Height, "finished", finishHeight, "sync", isSync, "miner", readTick != nil, "consensBlockHash", common.ToHex(rsp.BlockHash)) "consensHeight", rsp.status.Height, "finishHeight", finishHeight, "authIn", rsp.authAccountIn, "sync", isSync, "miner", readTick != nil)
plog.Debug("para consensus rcv", "consensBlockHash", common.ToHex(rsp.status.BlockHash))
if notification == nil || isRollback { if notification == nil || isRollback || !rsp.authAccountIn {
isSync = false
continue continue
} }
...@@ -169,7 +177,7 @@ out: ...@@ -169,7 +177,7 @@ out:
isSync = true isSync = true
} }
// 共识高度追赶上完成高度之后再发,不然分叉节点继续发浪费手续费 // 共识高度追赶上完成高度之后再发,不然继续发浪费手续费
if finishHeight > consensHeight { if finishHeight > consensHeight {
isSync = false isSync = false
} }
...@@ -180,20 +188,14 @@ out: ...@@ -180,20 +188,14 @@ out:
//而分叉高度是交易组里面的某个高度 //而分叉高度是交易组里面的某个高度
if finishHeight < consensHeight { if finishHeight < consensHeight {
finishHeight = consensHeight finishHeight = consensHeight
sendingMsgs = nil
client.currentTx = nil
} }
//系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要把从当前共识高度到完成的 //系统每次重启都有检查一次共识,如果共识高度落后于系统起来后完成的第一个高度或最小高度,说明可能有共识空洞,需要重发
//最大高度重发一遍,直到确认收到,发过的最小到最大高度也要重发是因为之前空洞原因共识不连续,即便满足2/3节点也不会增长,需要重发来触发commit
//此处也整合了当前consensus height=-1 场景
// 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的 // 需要是<而不是<=, 因为notification[0]被认为是系统起来后已经发送过的
nextConsensHeight := consensHeight + 1 nextConsensHeight := consensHeight + 1
if nextConsensHeight < notification[0] { if nextConsensHeight < notification[0] {
notification[0] = nextConsensHeight notification[0] = nextConsensHeight
finishHeight = consensHeight finishHeight = consensHeight
sendingMsgs = nil
client.currentTx = nil
} }
case miner := <-client.minerSwitch: case miner := <-client.minerSwitch:
...@@ -511,7 +513,7 @@ func (client *commitMsgClient) mainSync() error { ...@@ -511,7 +513,7 @@ func (client *commitMsgClient) mainSync() error {
} }
func (client *commitMsgClient) getConsensusHeight(consensusRst chan *pt.ParacrossStatus) { func (client *commitMsgClient) getConsensusHeight(consensusRst chan *commitConsensRsp) {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval)) ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false isSync := false
defer ticker.Stop() defer ticker.Stop()
...@@ -547,7 +549,16 @@ out: ...@@ -547,7 +549,16 @@ out:
if err != nil { if err != nil {
continue continue
} }
consensusRst <- status
authExist := false
if client.paraClient.authAccount != "" {
nodes, err := client.getNodeGroupAddrs()
if err != nil {
continue
}
authExist = strings.Contains(nodes, client.paraClient.authAccount)
}
consensusRst <- &commitConsensRsp{status: status, authAccountIn: authExist}
} }
} }
...@@ -573,9 +584,18 @@ func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.Parac ...@@ -573,9 +584,18 @@ func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.Parac
} }
//开启自共识后也要等到自共识真正切换之后再使用,如果本地区块已经过了自共识高度,但自共识的高度还没达成,就会导致共识机制出错 //开启自共识后也要等到自共识真正切换之后再使用,如果本地区块已经过了自共识高度,但自共识的高度还没达成,就会导致共识机制出错
if resp.Height > -1 { if resp.Height > -1 {
req := &types.ReqBlocks{Start: resp.Height, End: resp.Height}
v, err := client.paraClient.GetAPI().GetBlocks(req)
if err != nil {
plog.Error("getConsensusHeight GetBlocks", "err", err.Error())
return nil, err
}
//本地共识高度对应主链高度一定要高于自共识高度,为了适配平行链共识高度不连续场景
if isParaSelfConsensusForked(v.Items[0].Block.MainHeight) {
return resp, nil return resp, nil
} }
} }
}
//去主链获取共识高度 //去主链获取共识高度
reply, err := client.paraClient.grpcClient.QueryChain(context.Background(), &types.ChainExecutor{ reply, err := client.paraClient.grpcClient.QueryChain(context.Background(), &types.ChainExecutor{
...@@ -601,6 +621,26 @@ func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.Parac ...@@ -601,6 +621,26 @@ func (client *commitMsgClient) getConsensusStatus(block *types.Block) (*pt.Parac
} }
//node group会在主链和平行链都同时配置,只本地查询就可以
func (client *commitMsgClient) getNodeGroupAddrs() (string, error) {
ret, err := client.paraClient.GetAPI().QueryChain(&types.ChainExecutor{
Driver: "paracross",
FuncName: "GetNodeGroupAddrs",
Param: types.Encode(&pt.ReqParacrossNodeInfo{Title: types.GetTitle()}),
})
if err != nil {
plog.Error("commitmsg.getNodeGroupAddrs ", "err", err.Error())
return "", err
}
resp, ok := ret.(*types.ReplyConfig)
if !ok {
plog.Error("commitmsg.getNodeGroupAddrs rsp nok")
return "", err
}
return resp.Value, nil
}
func (client *commitMsgClient) onWalletStatus(status *types.WalletStatus) { func (client *commitMsgClient) onWalletStatus(status *types.WalletStatus) {
if status == nil || client.paraClient.authAccount == "" { if status == nil || client.paraClient.authAccount == "" {
return return
......
...@@ -105,8 +105,6 @@ writeBlockSeconds=2 ...@@ -105,8 +105,6 @@ writeBlockSeconds=2
emptyBlockInterval=50 emptyBlockInterval=50
#验证账户,验证节点需要配置自己的账户,并且钱包导入对应种子,非验证节点留空 #验证账户,验证节点需要配置自己的账户,并且钱包导入对应种子,非验证节点留空
authAccount="" authAccount=""
#等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitBlocks4CommitMsg=2
#云端主链节点切换后,平行链适配新主链节点block,回溯查找和自己记录的相同blockhash的深度 #云端主链节点切换后,平行链适配新主链节点block,回溯查找和自己记录的相同blockhash的深度
searchHashMatchedBlockDepth=10000 searchHashMatchedBlockDepth=10000
#创世地址额度 #创世地址额度
......
...@@ -241,6 +241,23 @@ func getDappForkHeight(forkKey string) int64 { ...@@ -241,6 +241,23 @@ func getDappForkHeight(forkKey string) int64 {
return forkHeight return forkHeight
} }
func getConfigNodes(db dbm.KV, title string) (map[string]struct{}, []byte, error) {
key := calcParaNodeGroupAddrsKey(title)
nodes, _, err := getNodes(db, key)
if err != nil {
if errors.Cause(err) != pt.ErrTitleNotExist {
return nil, nil, errors.Wrapf(err, "getNodes para for title:%s", title)
}
key = calcManageConfigNodesKey(title)
nodes, _, err = getNodes(db, key)
if err != nil {
return nil, nil, errors.Wrapf(err, "getNodes manager for title:%s", title)
}
}
return nodes, key, nil
}
func (a *action) getNodesGroup(title string) (map[string]struct{}, error) { func (a *action) getNodesGroup(title string) (map[string]struct{}, error) {
if a.exec.GetMainHeight() < getDappForkHeight(pt.ForkCommitTx) { if a.exec.GetMainHeight() < getDappForkHeight(pt.ForkCommitTx) {
nodes, _, err := getConfigManageNodes(a.db, title) nodes, _, err := getConfigManageNodes(a.db, title)
...@@ -250,18 +267,9 @@ func (a *action) getNodesGroup(title string) (map[string]struct{}, error) { ...@@ -250,18 +267,9 @@ func (a *action) getNodesGroup(title string) (map[string]struct{}, error) {
return nodes, nil return nodes, nil
} }
nodes, _, err := getParacrossNodes(a.db, title) nodes, _, err := getConfigNodes(a.db, title)
if err != nil { return nodes, err
if errors.Cause(err) != pt.ErrTitleNotExist {
return nil, errors.Wrapf(err, "getNodes para for title:%s", title)
}
nodes, _, err = getConfigManageNodes(a.db, title)
if err != nil {
return nil, errors.Wrapf(err, "getNodes manager for title:%s", title)
}
}
return nodes, nil
} }
//根据nodes过滤掉可能退出了的addrs //根据nodes过滤掉可能退出了的addrs
...@@ -394,7 +402,11 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error ...@@ -394,7 +402,11 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
if commit.Status.Height > titleStatus.Height+1 { if commit.Status.Height > titleStatus.Height+1 {
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat) saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
//平行链由主链共识无缝切换,即接收第一个收到的高度,可以不从0开始 //平行链由主链共识无缝切换,即接收第一个收到的高度,可以不从0开始
if !(types.IsPara() && titleStatus.Height == -1) { paraSwitch, err := a.isParaSelfConsensSwitch(commit, titleStatus)
if err != nil {
return nil, err
}
if !paraSwitch {
return receipt, nil return receipt, nil
} }
} }
...@@ -457,6 +469,32 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error ...@@ -457,6 +469,32 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
return receipt, nil return receipt, nil
} }
//平行链自共识无缝切换条件:1,平行链没有共识过,2:commit高度是大于自共识分叉高度且上一次共识的主链高度小于自共识分叉高度,保证只运行一次,
// 这样在主链没有共识空洞前提下,平行链允许有条件的共识跳跃
func (a *action) isParaSelfConsensSwitch(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
if !types.IsPara() {
return false, nil
}
if titleStatus.Height == -1 {
return true, nil
}
selfConsensForkHeight := getDappForkHeight(pt.ParaSelfConsensForkHeight)
lastStatusMainHeight := int64(-1)
if titleStatus.Height > -1 {
stat, err := getTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, titleStatus.Height))
if err != nil {
clog.Error("paracross.Commit isParaSelfConsensSwitch getTitleHeight failed", "err", err.Error())
return false, err
}
lastStatusMainHeight = stat.MainHeight
}
return commit.Status.MainBlockHeight > selfConsensForkHeight && lastStatusMainHeight < selfConsensForkHeight, nil
}
func (a *action) execCrossTx(tx *types.TransactionDetail, commit *pt.ParacrossCommitAction, crossTxHash []byte) (*types.Receipt, error) { func (a *action) execCrossTx(tx *types.TransactionDetail, commit *pt.ParacrossCommitAction, crossTxHash []byte) (*types.Receipt, error) {
if !bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) { if !bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) {
return nil, nil return nil, nil
......
...@@ -63,10 +63,11 @@ func (p *Paracross) Query_GetTitleByHash(in *pt.ReqParacrossTitleHash) (types.Me ...@@ -63,10 +63,11 @@ func (p *Paracross) Query_GetTitleByHash(in *pt.ReqParacrossTitleHash) (types.Me
//Query_GetNodeGroupAddrs get node group addrs //Query_GetNodeGroupAddrs get node group addrs
func (p *Paracross) Query_GetNodeGroupAddrs(in *pt.ReqParacrossNodeInfo) (types.Message, error) { func (p *Paracross) Query_GetNodeGroupAddrs(in *pt.ReqParacrossNodeInfo) (types.Message, error) {
if in == nil { if in == nil || in.GetTitle() == "" {
return nil, types.ErrInvalidParam return nil, types.ErrInvalidParam
} }
ret, _, err := getParacrossNodes(p.GetStateDB(), in.GetTitle())
ret, key, err := getConfigNodes(p.GetStateDB(), in.GetTitle())
if err != nil { if err != nil {
return nil, errors.Cause(err) return nil, errors.Cause(err)
} }
...@@ -75,7 +76,7 @@ func (p *Paracross) Query_GetNodeGroupAddrs(in *pt.ReqParacrossNodeInfo) (types. ...@@ -75,7 +76,7 @@ func (p *Paracross) Query_GetNodeGroupAddrs(in *pt.ReqParacrossNodeInfo) (types.
nodes = append(nodes, k) nodes = append(nodes, k)
} }
var reply types.ReplyConfig var reply types.ReplyConfig
reply.Key = string(calcParaNodeGroupAddrsKey(in.GetTitle())) reply.Key = string(key)
reply.Value = fmt.Sprint(nodes) reply.Value = fmt.Sprint(nodes)
return &reply, nil return &reply, nil
} }
......
...@@ -18,6 +18,7 @@ var ( ...@@ -18,6 +18,7 @@ var (
glog = log.New("module", ParaX) glog = log.New("module", ParaX)
// ForkCommitTx main chain support paracross commit tx // ForkCommitTx main chain support paracross commit tx
ForkCommitTx = "ForkParacrossCommitTx" ForkCommitTx = "ForkParacrossCommitTx"
ParaSelfConsensForkHeight = "MainParaSelfConsensusForkHeight"
) )
func init() { func init() {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment