Commit 29629ee7 authored by mdj33's avatar mdj33 Committed by vipwzw

node quit trigger commit done

parent e989b8fb
......@@ -113,6 +113,8 @@ genesisAmount=100000000
MainForkParacrossCommitTx=2270000
#平行链自共识开启对应的主链高度,需要大于等于MainForkParacrossCommitTx=2270000, -1 不开启
MainParaSelfConsensusForkHeight=-1
#主链开启循环检查共识交易done的fork高度
MainLoopCheckCommitTxDoneForkHeight=-1
[store]
name="kvmvccmavl"
......
......@@ -118,8 +118,8 @@ func checkCommitInfo(commit *pt.ParacrossCommitAction) error {
return nil
}
func isCommitDone(f interface{}, nodes map[string]struct{}, mostSameHash int) bool {
return float32(mostSameHash) > float32(len(nodes))*float32(2)/float32(3)
func isCommitDone(nodes map[string]struct{}, mostSame int) bool {
return float32(mostSame) > float32(len(nodes))*float32(2)/float32(3)
}
func makeCommitReceipt(addr string, commit *pt.ParacrossCommitAction, prev, current *pt.ParacrossHeightStatus) *types.Receipt {
......@@ -161,29 +161,39 @@ func makeRecordReceipt(addr string, commit *pt.ParacrossCommitAction) *types.Rec
}
}
func makeDoneReceipt(addr string, commit *pt.ParacrossCommitAction, current *pt.ParacrossHeightStatus,
most, commitCount, totalCount int32) *types.Receipt {
func makeDoneReceipt(execMainHeight int64, commit *pt.ParacrossNodeStatus,
most, commitCount, totalCount int32) *types.Receipt {
log := &pt.ReceiptParacrossDone{
TotalNodes: totalCount,
TotalCommit: commitCount,
MostSameCommit: most,
Title: commit.Status.Title,
Height: commit.Status.Height,
StateHash: commit.Status.StateHash,
TxCounts: commit.Status.TxCounts,
TxResult: commit.Status.TxResult,
}
key := calcTitleKey(commit.Status.Title)
stat := &pt.ParacrossStatus{
Title: commit.Status.Title,
Height: commit.Status.Height,
BlockHash: commit.Status.BlockHash,
Title: commit.Title,
Height: commit.Height,
StateHash: commit.StateHash,
BlockHash: commit.BlockHash,
TxCounts: commit.TxCounts,
TxResult: commit.TxResult,
TxHashs: commit.TxHashs,
CrossTxHashs: commit.CrossTxHashs,
CrossTxResult: commit.CrossTxResult,
MainBlockHeight: commit.MainBlockHeight,
MainBlockHash: commit.MainBlockHash,
}
key := calcTitleKey(commit.Title)
status := &pt.ParacrossStatus{
Title: commit.Title,
Height: commit.Height,
BlockHash: commit.BlockHash,
}
if execMainHeight >= getDappForkHeight(pt.ForkLoopCheckCommitTxDone) {
status.MainHeight = commit.MainBlockHeight
status.MainHash = commit.MainBlockHash
}
return &types.Receipt{
Ty: types.ExecOk,
KV: []*types.KeyValue{
{Key: key, Value: types.Encode(stat)},
{Key: key, Value: types.Encode(status)},
},
Logs: []*types.ReceiptLog{
{
......@@ -215,6 +225,17 @@ func getMostCommit(stat *pt.ParacrossHeightStatus) (int, string) {
return most, hash
}
//需要在ForkLoopCheckCommitTxDone后使用
func getMostResults(mostHash []byte, stat *pt.ParacrossHeightStatus) ([]byte,[]byte,[]byte,[]byte) {
for i,hash := range stat.Details.BlockHash{
if bytes.Equal(mostHash,hash){
return stat.Details.TxResult[i],stat.Details.TxHashs[i],stat.Details.CrossTxResult[i],stat.Details.CrossTxHashs[i]
}
}
return nil,nil,nil,nil
}
func hasCommited(addrs []string, addr string) (bool, int) {
for i, a := range addrs {
if a == addr {
......@@ -228,9 +249,13 @@ func getDappForkHeight(forkKey string) int64 {
var forkHeight int64
if types.IsPara() {
key := forkKey
if forkKey == pt.ForkCommitTx {
key = "MainForkParacrossCommitTx"
switch forkKey {
case pt.ForkCommitTx:
key = pt.MainForkParacrossCommitTx
case pt.ForkLoopCheckCommitTxDone:
key = pt.MainLoopCheckCommitTxDoneForkHeight
}
forkHeight = types.Conf("config.consensus.sub.para").GInt(key)
if forkHeight <= 0 {
forkHeight = types.MaxHeight
......@@ -273,12 +298,19 @@ func (a *action) getNodesGroup(title string) (map[string]struct{}, error) {
}
//根据nodes过滤掉可能退出了的addrs
func updateCommitAddrs(stat *pt.ParacrossHeightStatus, nodes map[string]struct{}) {
func updateCommitAddrs(stat *pt.ParacrossHeightStatus, nodes map[string]struct{}, execMainHeight int64) {
details := &pt.ParacrossStatusDetails{}
for i, addr := range stat.Details.Addrs {
if _, ok := nodes[addr]; ok {
details.Addrs = append(details.Addrs, addr)
details.BlockHash = append(details.BlockHash, stat.Details.BlockHash[i])
if execMainHeight >= getDappForkHeight(pt.ForkLoopCheckCommitTxDone) {
details.TxResult = append(details.TxResult, stat.Details.TxResult[i])
details.TxHashs = append(details.TxHashs, stat.Details.TxHashs[i])
details.CrossTxResult = append(details.CrossTxResult, stat.Details.CrossTxResult[i])
details.CrossTxHashs = append(details.CrossTxHashs, stat.Details.CrossTxHashs[i])
}
}
}
stat.Details = details
......@@ -371,6 +403,14 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
stat.MainHeight = commit.Status.MainBlockHeight
stat.MainHash = commit.Status.MainBlockHash
}
if a.exec.GetMainHeight() >= getDappForkHeight(pt.ForkLoopCheckCommitTxDone){
stat.Details.TxResult = append(stat.Details.TxResult, commit.Status.TxResult)
stat.Details.TxHashs = append(stat.Details.TxHashs, commit.Status.TxHashs[0])
stat.Details.CrossTxResult = append(stat.Details.CrossTxResult, commit.Status.CrossTxResult)
stat.Details.CrossTxHashs = append(stat.Details.CrossTxHashs, commit.Status.CrossTxHashs[0])
}
receipt = makeCommitReceipt(a.fromaddr, commit, nil, stat)
} else {
var copyStat pt.ParacrossHeightStatus
......@@ -383,26 +423,34 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
found, index := hasCommited(stat.Details.Addrs, a.fromaddr)
if found {
stat.Details.BlockHash[index] = commit.Status.BlockHash
if a.exec.GetMainHeight() >= getDappForkHeight(pt.ForkLoopCheckCommitTxDone){
stat.Details.TxResult[index] = commit.Status.TxResult
stat.Details.TxHashs[index] = commit.Status.TxHashs[0]
stat.Details.CrossTxResult[index] = commit.Status.CrossTxResult
stat.Details.CrossTxHashs[index] = commit.Status.CrossTxHashs[0]
}
} else {
stat.Details.Addrs = append(stat.Details.Addrs, a.fromaddr)
stat.Details.BlockHash = append(stat.Details.BlockHash, commit.Status.BlockHash)
if a.exec.GetMainHeight() >= getDappForkHeight(pt.ForkLoopCheckCommitTxDone){
stat.Details.TxResult = append(stat.Details.TxResult, commit.Status.TxResult)
stat.Details.TxHashs = append(stat.Details.TxHashs, commit.Status.TxHashs[0])
stat.Details.CrossTxResult = append(stat.Details.CrossTxResult, commit.Status.CrossTxResult)
stat.Details.CrossTxHashs = append(stat.Details.CrossTxHashs, commit.Status.CrossTxHashs[0])
}
}
receipt = makeCommitReceipt(a.fromaddr, commit, &copyStat, stat)
}
//平行链fork pt.ForkCommitTx=0,主链在ForkCommitTx后支持nodegroup,这里平行链dappFork一定为true
if types.IsDappFork(commit.Status.MainBlockHeight, pt.ParaX, pt.ForkCommitTx) {
updateCommitAddrs(stat, nodes)
}
clog.Info("paracross.Commit commit", "stat.title", stat.Title, "stat.height", stat.Height, "notes", len(nodes))
for i, v := range stat.Details.Addrs {
clog.Info("paracross.Commit commit detail", "addr", v, "hash", hex.EncodeToString(stat.Details.BlockHash[i]))
updateCommitAddrs(stat, nodes,a.exec.GetMainHeight())
}
if commit.Status.Height > titleStatus.Height+1 {
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
//平行链由主链共识无缝切换,即接收第一个收到的高度,可以不从0开始
paraSwitch, err := a.isParaSelfConsensSwitch(commit, titleStatus)
paraSwitch, err := isParaSelfConsensSwitch(a.db,stat, titleStatus)
if err != nil {
return nil, err
}
......@@ -410,68 +458,207 @@ func (a *action) Commit(commit *pt.ParacrossCommitAction) (*types.Receipt, error
return receipt, nil
}
}
return a.commitTxDone(commit.Status,stat,titleStatus,nodes,receipt)
}
func (a *action)commitTxDone(nodeStatus *pt.ParacrossNodeStatus, stat *pt.ParacrossHeightStatus,titleStatus *pt.ParacrossStatus,
nodes map[string]struct{},receipt *types.Receipt) (*types.Receipt, error){
clog.Info("paracross.Commit commit", "stat.title", stat.Title, "stat.height", stat.Height, "notes", len(nodes))
for i, v := range stat.Details.Addrs {
clog.Info("paracross.Commit commit detail", "addr", v, "hash", hex.EncodeToString(stat.Details.BlockHash[i]))
}
commitCount := len(stat.Details.Addrs)
most, mostHash := getMostCommit(stat)
if !isCommitDone(stat, nodes, most) {
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
if !isCommitDone(nodes, most) {
saveTitleHeight(a.db, calcTitleHeightKey(stat.Title, stat.Height), stat)
return receipt, nil
}
clog.Info("paracross.Commit commit ----pass", "most", most, "mostHash", hex.EncodeToString([]byte(mostHash)))
stat.Status = pt.ParacrossStatusCommitDone
saveTitleHeight(a.db, calcTitleHeightKey(stat.Title, stat.Height), stat)
//add commit done receipt
receiptDone := makeDoneReceipt(a.fromaddr, commit, stat, int32(most), int32(commitCount), int32(len(nodes)))
receiptDone := makeDoneReceipt(a.exec.GetMainHeight(), nodeStatus, int32(most), int32(commitCount), int32(len(nodes)))
receipt = mergeReceipt(receipt, receiptDone)
//平行连进行奖励分配,考虑可能的失败,需要在保存共识高度等数据之前处理
if types.IsPara() {
rewardReceipt, err := a.reward(commit.Status, stat)
//错误会导致和主链处理的共识结果不一致
if err != nil {
clog.Error("paracross mining reward err", "height", titleStatus.Height,
"blockhash", hex.EncodeToString(titleStatus.BlockHash), "err", err)
return nil, err
}
receipt = mergeReceipt(receipt, rewardReceipt)
}
return a.commitTxDoneStep2(nodeStatus,stat,titleStatus,receipt)
}
clog.Info("paracross.Commit commit ----pass", "most", most, "mostHash", hex.EncodeToString([]byte(mostHash)))
stat.Status = pt.ParacrossStatusCommitDone
saveTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, commit.Status.Height), stat)
func (a *action)commitTxDoneStep2(nodeStatus *pt.ParacrossNodeStatus, stat *pt.ParacrossHeightStatus,titleStatus *pt.ParacrossStatus,
receipt *types.Receipt) (*types.Receipt, error){
titleStatus.Title = commit.Status.Title
titleStatus.Height = commit.Status.Height
titleStatus.BlockHash = commit.Status.BlockHash
saveTitle(a.db, calcTitleKey(commit.Status.Title), titleStatus)
titleStatus.Title = nodeStatus.Title
titleStatus.Height = nodeStatus.Height
titleStatus.BlockHash = nodeStatus.BlockHash
if a.exec.GetMainHeight() >= getDappForkHeight(pt.ForkLoopCheckCommitTxDone) {
titleStatus.MainHeight = nodeStatus.MainBlockHeight
titleStatus.MainHash = nodeStatus.MainBlockHash
}
saveTitle(a.db, calcTitleKey(titleStatus.Title), titleStatus)
clog.Info("paracross.Commit commit done", "height", commit.Status.Height,
"cross tx bitmap", string(commit.Status.CrossTxResult), "statusBlockHash", hex.EncodeToString(titleStatus.BlockHash))
clog.Info("paracross.Commit commit done", "height", nodeStatus.Height, "statusBlockHash", hex.EncodeToString(nodeStatus.BlockHash))
//parallel chain not need to process cross commit tx here
if types.IsPara() {
//平行连进行奖励分配
rewardReceipt, err := a.reward(nodeStatus,stat)
//错误会导致和主链处理的共识结果不一致
if err != nil {
clog.Error("paracross mining reward err", "height", nodeStatus.Height,
"blockhash", hex.EncodeToString(nodeStatus.BlockHash), "err", err)
return nil, err
}
receipt = mergeReceipt(receipt, rewardReceipt)
return receipt, nil
}
haveCrossTxs := len(commit.Status.CrossTxHashs) > 0
if commit.Status.Height > 0 && types.IsDappFork(commit.Status.MainBlockHeight, pt.ParaX, pt.ForkCommitTx) && len(commit.Status.CrossTxHashs[0]) == 0 {
//主链,处理跨链交易
r,err := a.procCrossTxs(nodeStatus)
if err != nil{
return nil,err
}
receipt = mergeReceipt(receipt,r)
return receipt, nil
}
func (a *action) procCrossTxs(status *pt.ParacrossNodeStatus)(*types.Receipt, error){
haveCrossTxs := len(status.CrossTxHashs) > 0
if status.Height > 0 && types.IsDappFork(status.MainBlockHeight, pt.ParaX, pt.ForkCommitTx) && len(status.CrossTxHashs[0]) == 0 {
haveCrossTxs = false
}
if enableParacrossTransfer && commit.Status.Height > 0 && haveCrossTxs {
if enableParacrossTransfer && status.Height > 0 && haveCrossTxs {
clog.Debug("paracross.Commit commitDone", "do cross", "")
crossTxReceipt, err := a.execCrossTxs(commit)
crossTxReceipt, err := a.execCrossTxs(status)
if err != nil {
return nil, err
}
receipt.KV = append(receipt.KV, crossTxReceipt.KV...)
receipt.Logs = append(receipt.Logs, crossTxReceipt.Logs...)
return crossTxReceipt,nil
}
return receipt, nil
return nil, nil
}
//由于可能对当前块的共识交易进行处理,需要全部数据保存到statedb,通过tx获取数据无法处理当前块的场景
func (a *action) loopCommitTxDone(title string)(*types.Receipt, error){
receipt := &types.Receipt{}
nodes, _, err := getParacrossNodes(a.db, title)
if err != nil {
return nil, errors.Wrapf(err, "getNodes for title:%s", title)
}
//从当前共识高度开始遍历
titleStatus, err := getTitle(a.db, calcTitleKey(title))
if err != nil {
return nil, errors.Wrapf(err, "getTitle:%s", title)
}
//当前共识高度还未到分叉高度,则不处理
if titleStatus.GetMainHeight() < getDappForkHeight(pt.ForkLoopCheckCommitTxDone){
return nil,errors.Wrapf(pt.ErrForkHeightNotReach,
"titleHeight:%d,forkHeight", titleStatus.MainHeight,getDappForkHeight(pt.ForkLoopCheckCommitTxDone))
}
loopHeight := titleStatus.Height
for{
loopHeight++
stat, err := getTitleHeight(a.db, calcTitleHeightKey(title, loopHeight))
if err != nil {
clog.Error("paracross.Commit getTitleHeight failed", "err", err)
return receipt, err
}
//防止无限循环
if stat.MainHeight > a.exec.GetMainHeight(){
return receipt,nil
}
r,err := a.checkCommitTxDone(title,stat,nodes)
if err != nil{
clog.Error("paracross.Commit checkExecCommitTxDone", "para title", title, "height", stat.Height,"error", err, )
return receipt,nil
}
if r == nil{
return receipt,nil
}
receipt = mergeReceipt(receipt,r)
}
return receipt,nil
}
func (a *action) checkCommitTxDone(title string,stat *pt.ParacrossHeightStatus,nodes map[string]struct{})(*types.Receipt, error){
receipt := &types.Receipt{}
status, err := getTitle(a.db, calcTitleKey(title))
if err != nil {
return nil, errors.Wrapf(err, "getTitle:%s", a.fromaddr)
}
//如果是平行链自共识首次切换,可以在正常流程里面再触发
if stat.Height > status.Height+1 {
return nil, nil
}
updateCommitAddrs(stat, nodes,a.exec.GetMainHeight())
most, _ := getMostCommit(stat)
if !isCommitDone(nodes, most) {
return nil, nil
}
return a.commitTxDoneByStat(stat,status,nodes,receipt)
}
//只根据stat的信息在commitDone之后重构一个commitStatus做后续处理
func (a *action) commitTxDoneByStat(stat *pt.ParacrossHeightStatus,titleStatus *pt.ParacrossStatus,
nodes map[string]struct{},receipt *types.Receipt) (*types.Receipt, error){
clog.Info("paracross.Commit commit", "stat.title", stat.Title, "stat.height", stat.Height, "notes", len(nodes))
for i, v := range stat.Details.Addrs {
clog.Info("paracross.Commit commit detail", "addr", v, "hash", hex.EncodeToString(stat.Details.BlockHash[i]))
}
commitCount := len(stat.Details.Addrs)
most, mostHash := getMostCommit(stat)
if !isCommitDone(nodes, most) {
saveTitleHeight(a.db, calcTitleHeightKey(stat.Title, stat.Height), stat)
return receipt, nil
}
clog.Info("paracross.Commit commit ----pass", "most", most, "mostHash", hex.EncodeToString([]byte(mostHash)))
stat.Status = pt.ParacrossStatusCommitDone
saveTitleHeight(a.db, calcTitleHeightKey(stat.Title, stat.Height), stat)
txRst,txHash,crossTxRst,crossTxHash := getMostResults([]byte(mostHash),stat)
finalStatus := &pt.ParacrossNodeStatus{
MainBlockHash: stat.MainHash,
MainBlockHeight: stat.MainHeight,
Title: stat.Title,
Height: stat.Height,
BlockHash: []byte(mostHash),
TxResult: txRst,
TxHashs: [][]byte{txHash},
CrossTxResult: crossTxRst,
CrossTxHashs: [][]byte{crossTxHash},
}
//stat.ConsensBlockHash = []byte(mostHash)
//add commit done receipt
receiptDone := makeDoneReceipt(a.exec.GetMainHeight(), finalStatus,int32(most), int32(commitCount), int32(len(nodes)))
receipt = mergeReceipt(receipt, receiptDone)
clog.Info("paracross.Commit commit ----pass", "most", most, "mostHash", hex.EncodeToString([]byte(mostHash)))
return a.commitTxDoneStep2(finalStatus,stat,titleStatus,receipt)
}
//平行链自共识无缝切换条件:1,平行链没有共识过,2:commit高度是大于自共识分叉高度且上一次共识的主链高度小于自共识分叉高度,保证只运行一次,
// 这样在主链没有共识空洞前提下,平行链允许有条件的共识跳跃
func (a *action) isParaSelfConsensSwitch(commit *pt.ParacrossCommitAction, titleStatus *pt.ParacrossStatus) (bool, error) {
func (a *action) isParaSelfConsensSwitch(stat *pt.ParacrossHeightStatus, titleStatus *pt.ParacrossStatus) (bool, error) {
if !types.IsPara() {
return false, nil
}
......@@ -483,99 +670,101 @@ func (a *action) isParaSelfConsensSwitch(commit *pt.ParacrossCommitAction, title
selfConsensForkHeight := getDappForkHeight(pt.ParaSelfConsensForkHeight)
lastStatusMainHeight := int64(-1)
if titleStatus.Height > -1 {
stat, err := getTitleHeight(a.db, calcTitleHeightKey(commit.Status.Title, titleStatus.Height))
s, err := getTitleHeight(a.db, calcTitleHeightKey(stat.Title, titleStatus.Height))
if err != nil {
clog.Error("paracross.Commit isParaSelfConsensSwitch getTitleHeight failed", "err", err.Error())
return false, err
}
lastStatusMainHeight = stat.MainHeight
lastStatusMainHeight = s.MainHeight
}
return commit.Status.MainBlockHeight > selfConsensForkHeight && lastStatusMainHeight < selfConsensForkHeight, nil
return stat.MainHeight > selfConsensForkHeight && lastStatusMainHeight < selfConsensForkHeight, nil
}
func (a *action) execCrossTx(tx *types.TransactionDetail, commit *pt.ParacrossCommitAction, crossTxHash []byte) (*types.Receipt, error) {
func (a *action) execCrossTx(tx *types.TransactionDetail, crossTxHash []byte) (*types.Receipt, error) {
if !bytes.HasSuffix(tx.Tx.Execer, []byte(pt.ParaX)) {
return nil, nil
}
var payload pt.ParacrossAction
err := types.Decode(tx.Tx.Payload, &payload)
if err != nil {
clog.Crit("paracross.Commit Decode Tx failed", "para title", commit.Status.Title,
"para height", commit.Status.Height, "error", err, "txHash", hex.EncodeToString(crossTxHash))
clog.Crit("paracross.Commit Decode Tx failed", "error", err, "txHash", hex.EncodeToString(crossTxHash))
return nil, err
}
if payload.Ty == pt.ParacrossActionAssetWithdraw {
receiptWithdraw, err := a.assetWithdraw(payload.GetAssetWithdraw(), tx.Tx)
if err != nil {
clog.Crit("paracross.Commit Decode Tx failed", "para title", commit.Status.Title,
"para height", commit.Status.Height, "error", err, "txHash", hex.EncodeToString(crossTxHash))
clog.Crit("paracross.Commit Decode Tx failed", "error", err, "txHash", hex.EncodeToString(crossTxHash))
return nil, errors.Cause(err)
}
clog.Info("paracross.Commit WithdrawCoins", "para title", commit.Status.Title,
"para height", commit.Status.Height, "error", err, "txHash", hex.EncodeToString(crossTxHash))
clog.Info("paracross.Commit WithdrawCoins", "txHash", hex.EncodeToString(crossTxHash))
return receiptWithdraw, nil
} //else if tx.ActionName == pt.ParacrossActionAssetTransferStr {
}
return nil, nil
//}
}
func getCrossTxHashs(api client.QueueProtocolAPI, commit *pt.ParacrossCommitAction) ([][]byte, []byte, error) {
if types.IsDappFork(commit.Status.MainBlockHeight, pt.ParaX, pt.ForkCommitTx) {
if len(commit.Status.CrossTxHashs) == 0 {
clog.Error("getCrossTxHashs len=0", "paraHeight", commit.Status.Height,
"mainHeight", commit.Status.MainBlockHeight, "mainHash", hex.EncodeToString(commit.Status.MainBlockHash))
return nil, nil, types.ErrCheckTxHash
}
blockDetail, err := GetBlock(api, commit.Status.MainBlockHash)
if err != nil {
return nil, nil, err
}
//校验
paraBaseTxs := FilterTxsForPara(commit.Status.Title, blockDetail)
paraCrossHashs := FilterParaCrossTxHashes(commit.Status.Title, paraBaseTxs)
var baseHashs [][]byte
for _, tx := range paraBaseTxs {
baseHashs = append(baseHashs, tx.Hash())
}
baseCheckTxHash := CalcTxHashsHash(baseHashs)
crossCheckHash := CalcTxHashsHash(paraCrossHashs)
if !bytes.Equal(commit.Status.CrossTxHashs[0], crossCheckHash) {
clog.Error("getCrossTxHashs para hash not equal", "paraHeight", commit.Status.Height,
"mainHeight", commit.Status.MainBlockHeight, "mainHash", hex.EncodeToString(commit.Status.MainBlockHash),
"main.crossHash", hex.EncodeToString(crossCheckHash),
"commit.crossHash", hex.EncodeToString(commit.Status.CrossTxHashs[0]),
"main.baseHash", hex.EncodeToString(baseCheckTxHash), "commit.baseHash", hex.EncodeToString(commit.Status.TxHashs[0]))
for _, hash := range baseHashs {
clog.Error("getCrossTxHashs base tx hash", "txhash", hex.EncodeToString(hash))
}
for _, hash := range paraCrossHashs {
clog.Error("getCrossTxHashs paracross tx hash", "txhash", hex.EncodeToString(hash))
}
return nil, nil, types.ErrCheckTxHash
}
//只获取跨链tx
rst, err := hex.DecodeString(string(commit.Status.CrossTxResult))
if err != nil {
clog.Error("getCrossTxHashs decode string", "CrossTxResult", string(commit.Status.CrossTxResult),
"commit.height", commit.Status.Height)
return nil, nil, types.ErrInvalidParam
}
func getCrossTxHashs(api client.QueueProtocolAPI, status *pt.ParacrossNodeStatus) ([][]byte, []byte, error) {
if !types.IsDappFork(status.MainBlockHeight, pt.ParaX, pt.ForkCommitTx){
return status.CrossTxHashs, status.CrossTxResult, nil
}
return paraCrossHashs, rst, nil
if len(status.CrossTxHashs) == 0 {
clog.Error("getCrossTxHashs len=0", "paraHeight", status.Height,
"mainHeight", status.MainBlockHeight, "mainHash", hex.EncodeToString(status.MainBlockHash))
return nil, nil, types.ErrCheckTxHash
}
return commit.Status.CrossTxHashs, commit.Status.CrossTxResult, nil
blockDetail, err := GetBlock(api, status.MainBlockHash)
if err != nil {
return nil, nil, err
}
//校验
paraBaseTxs := FilterTxsForPara(status.Title, blockDetail)
paraCrossHashs := FilterParaCrossTxHashes(status.Title, paraBaseTxs)
var baseHashs [][]byte
for _, tx := range paraBaseTxs {
baseHashs = append(baseHashs, tx.Hash())
}
baseCheckTxHash := CalcTxHashsHash(baseHashs)
crossCheckHash := CalcTxHashsHash(paraCrossHashs)
if !bytes.Equal(status.CrossTxHashs[0], crossCheckHash) {
clog.Error("getCrossTxHashs para hash not equal", "paraHeight", status.Height,
"mainHeight", status.MainBlockHeight, "mainHash", hex.EncodeToString(status.MainBlockHash),
"main.crossHash", hex.EncodeToString(crossCheckHash),
"commit.crossHash", hex.EncodeToString(status.CrossTxHashs[0]),
"main.baseHash", hex.EncodeToString(baseCheckTxHash), "commit.baseHash", hex.EncodeToString(status.TxHashs[0]))
for _, hash := range baseHashs {
clog.Error("getCrossTxHashs base tx hash", "txhash", hex.EncodeToString(hash))
}
for _, hash := range paraCrossHashs {
clog.Error("getCrossTxHashs paracross tx hash", "txhash", hex.EncodeToString(hash))
}
return nil, nil, types.ErrCheckTxHash
}
//只获取跨链tx
rst, err := hex.DecodeString(string(status.CrossTxResult))
if err != nil {
clog.Error("getCrossTxHashs decode string", "CrossTxResult", string(status.CrossTxResult),
"commit.height", status.Height)
return nil, nil, types.ErrInvalidParam
}
return paraCrossHashs, rst, nil
}
func (a *action) execCrossTxs(commit *pt.ParacrossCommitAction) (*types.Receipt, error) {
func (a *action) execCrossTxs(status *pt.ParacrossNodeStatus) (*types.Receipt, error) {
var receipt types.Receipt
crossTxHashs, crossTxResult, err := getCrossTxHashs(a.api, commit)
crossTxHashs, crossTxResult, err := getCrossTxHashs(a.api, status)
if err != nil {
clog.Error("paracross.Commit getCrossTxHashs", "err", err.Error())
return nil, err
......@@ -585,25 +774,24 @@ func (a *action) execCrossTxs(commit *pt.ParacrossCommitAction) (*types.Receipt,
}
for i := 0; i < len(crossTxHashs); i++ {
clog.Info("paracross.Commit commitDone", "do cross number", i, "hash",
hex.EncodeToString(crossTxHashs[i]),
clog.Info("paracross.Commit commitDone", "do cross number", i, "hash",hex.EncodeToString(crossTxHashs[i]),
"res", util.BitMapBit(crossTxResult, uint32(i)))
if util.BitMapBit(crossTxResult, uint32(i)) {
tx, err := GetTx(a.api, crossTxHashs[i])
if err != nil {
clog.Crit("paracross.Commit Load Tx failed", "para title", commit.Status.Title,
"para height", commit.Status.Height, "para tx index", i, "error", err, "txHash",
hex.EncodeToString(crossTxHashs[i]))
clog.Crit("paracross.Commit Load Tx failed", "para title", title, "para height", status.Height,
"para tx index", i, "error", err, "txHash", hex.EncodeToString(crossTxHashs[i]))
return nil, err
}
if tx == nil {
clog.Error("paracross.Commit Load Tx failed", "para title", commit.Status.Title,
"para height", commit.Status.Height, "para tx index", i, "error", err, "txHash",
hex.EncodeToString(crossTxHashs[i]))
clog.Error("paracross.Commit Load Tx failed", "para title", title, "para height", status.Height,
"para tx index", i, "error", err, "txHash", hex.EncodeToString(crossTxHashs[i]))
return nil, types.ErrHashNotExist
}
receiptCross, err := a.execCrossTx(tx, commit, crossTxHashs[i])
receiptCross, err := a.execCrossTx(tx, crossTxHashs[i])
if err != nil {
clog.Error("paracross.Commit execCrossTx", "para title", title,"para height", status.Height,
"para tx index", i, "error", err)
return nil, errors.Cause(err)
}
if receiptCross == nil {
......
......@@ -77,6 +77,25 @@ func (e *Paracross) ExecDelLocal_NodeConfig(payload *pt.ParaNodeAddrConfig, tx *
}
key := calcLocalNodeTitleDone(g.Title, g.TargetAddr)
set.KV = append(set.KV, &types.KeyValue{Key: key, Value: nil})
}else if log.Ty == pt.TyLogParacrossCommitDone {
var g pt.ReceiptParacrossDone
types.Decode(log.Log, &g)
g.Height = g.Height - 1
key := calcLocalTitleKey(g.Title)
set.KV = append(set.KV, &types.KeyValue{Key: key, Value: types.Encode(&g)})
key = calcLocalHeightKey(g.Title, g.Height)
set.KV = append(set.KV, &types.KeyValue{Key: key, Value: nil})
if !types.IsPara() {
r, err := e.saveLocalParaTxsFork(&g, true)
if err != nil {
return nil, err
}
set.KV = append(set.KV, r.KV...)
}
}
}
return &set, nil
......
......@@ -81,6 +81,23 @@ func (e *Paracross) ExecLocal_NodeConfig(payload *pt.ParaNodeAddrConfig, tx *typ
}
key := calcLocalNodeTitleDone(g.Title, g.TargetAddr)
set.KV = append(set.KV, &types.KeyValue{Key: key, Value: types.Encode(&g)})
}else if log.Ty == pt.TyLogParacrossCommitDone {
var g pt.ReceiptParacrossDone
types.Decode(log.Log, &g)
key := calcLocalTitleKey(g.Title)
set.KV = append(set.KV, &types.KeyValue{Key: key, Value: types.Encode(&g)})
key = calcLocalHeightKey(g.Title, g.Height)
set.KV = append(set.KV, &types.KeyValue{Key: key, Value: types.Encode(&g)})
if !types.IsPara() {
r, err := e.saveLocalParaTxsFork(&g, false)
if err != nil {
return nil, err
}
set.KV = append(set.KV, r.KV...)
}
}
}
return &set, nil
......
......@@ -68,7 +68,7 @@ func (c *Paracross) checkTxGroup(tx *types.Transaction, index int) ([]*types.Tra
}
func (c *Paracross) saveLocalParaTxs(tx *types.Transaction, isDel bool) (*types.LocalDBSet, error) {
var set types.LocalDBSet
var payload pt.ParacrossAction
err := types.Decode(tx.Payload, &payload)
......@@ -80,10 +80,41 @@ func (c *Paracross) saveLocalParaTxs(tx *types.Transaction, isDel bool) (*types.
}
commit := payload.GetCommit()
crossTxHashs, crossTxResult, err := getCrossTxHashs(c.GetAPI(), commit)
crossTxHashs, crossTxResult, err := getCrossTxHashs(c.GetAPI(), commit.Status)
if err != nil {
return nil, err
}
return c.udpateLocalParaTxs(commit.Status.Title, commit.Status.Height,crossTxHashs,crossTxResult,isDel)
}
//无法获取到commit tx信息,从commitDone 结构里面构建
func (c *Paracross) saveLocalParaTxsFork(commitDone *pt.ReceiptParacrossDone,isDel bool) (*types.LocalDBSet, error) {
status := &pt.ParacrossNodeStatus{
MainBlockHash: commitDone.MainBlockHash,
MainBlockHeight: commitDone.MainBlockHeight,
Title: commitDone.Title,
Height: commitDone.Height,
BlockHash:commitDone.BlockHash,
TxResult: commitDone.TxResult,
TxHashs: commitDone.TxHashs,
CrossTxResult: commitDone.CrossTxResult,
CrossTxHashs: commitDone.CrossTxHashs,
}
crossTxHashs, crossTxResult, err := getCrossTxHashs(c.GetAPI(), status)
if err != nil {
return nil, err
}
return c.udpateLocalParaTxs(commitDone.Title, commitDone.Height,crossTxHashs,crossTxResult,isDel)
}
func (c *Paracross) udpateLocalParaTxs(paraTitle string, paraHeight int64,crossTxHashs [][]byte, crossTxResult []byte, isDel bool) (*types.LocalDBSet, error) {
var set types.LocalDBSet
if len(crossTxHashs) == 0 {
return &set, nil
}
......@@ -93,8 +124,8 @@ func (c *Paracross) saveLocalParaTxs(tx *types.Transaction, isDel bool) (*types.
paraTx, err := GetTx(c.GetAPI(), crossTxHashs[i])
if err != nil {
clog.Crit("paracross.Commit Load Tx failed", "para title", commit.Status.Title,
"para height", commit.Status.Height, "para tx index", i, "error", err, "txHash",
clog.Crit("paracross.Commit Load Tx failed", "para title", paraTitle,
"para height", paraHeight, "para tx index", i, "error", err, "txHash",
hex.EncodeToString(crossTxHashs[i]))
return nil, err
}
......@@ -102,19 +133,19 @@ func (c *Paracross) saveLocalParaTxs(tx *types.Transaction, isDel bool) (*types.
var payload pt.ParacrossAction
err = types.Decode(paraTx.Tx.Payload, &payload)
if err != nil {
clog.Crit("paracross.Commit Decode Tx failed", "para title", commit.Status.Title,
"para height", commit.Status.Height, "para tx index", i, "error", err, "txHash",
clog.Crit("paracross.Commit Decode Tx failed", "para title", paraTitle,
"para height", paraHeight, "para tx index", i, "error", err, "txHash",
hex.EncodeToString(crossTxHashs[i]))
return nil, err
}
if payload.Ty == pt.ParacrossActionAssetTransfer {
kv, err := c.updateLocalAssetTransfer(tx, paraTx.Tx, success, isDel)
kv, err := c.updateLocalAssetTransfer(paraHeight,paraTx.Tx ,success, isDel)
if err != nil {
return nil, err
}
set.KV = append(set.KV, kv)
} else if payload.Ty == pt.ParacrossActionAssetWithdraw {
kv, err := c.initLocalAssetWithdraw(tx, paraTx.Tx, true, success, isDel)
kv, err := c.initLocalAssetWithdraw(paraHeight, paraTx.Tx, true, success, isDel)
if err != nil {
return nil, err
}
......@@ -185,7 +216,7 @@ func (c *Paracross) initLocalAssetTransfer(tx *types.Transaction, success, isDel
return &types.KeyValue{Key: key, Value: types.Encode(&asset)}, nil
}
func (c *Paracross) initLocalAssetWithdraw(txCommit, tx *types.Transaction, isWithdraw, success, isDel bool) (*types.KeyValue, error) {
func (c *Paracross) initLocalAssetWithdraw(paraHeight int64, tx *types.Transaction, isWithdraw, success, isDel bool) (*types.KeyValue, error) {
key := calcLocalAssetKey(tx.Hash())
if isDel {
c.GetLocalDB().Set(key, nil)
......@@ -198,10 +229,7 @@ func (c *Paracross) initLocalAssetWithdraw(txCommit, tx *types.Transaction, isWi
if err != nil {
return nil, err
}
asset.ParaHeight, err = getCommitHeight(txCommit.Payload)
if err != nil {
return nil, err
}
asset.ParaHeight = paraHeight
var payload pt.ParacrossAction
err = types.Decode(tx.Payload, &payload)
......@@ -239,7 +267,7 @@ func (c *Paracross) initLocalAssetWithdraw(txCommit, tx *types.Transaction, isWi
return &types.KeyValue{Key: key, Value: types.Encode(&asset)}, nil
}
func (c *Paracross) updateLocalAssetTransfer(txCommit, tx *types.Transaction, success, isDel bool) (*types.KeyValue, error) {
func (c *Paracross) updateLocalAssetTransfer(paraHeight int64, tx *types.Transaction, success, isDel bool) (*types.KeyValue, error) {
clog.Debug("para execLocal", "tx hash", hex.EncodeToString(tx.Hash()))
key := calcLocalAssetKey(tx.Hash())
......@@ -253,10 +281,8 @@ func (c *Paracross) updateLocalAssetTransfer(txCommit, tx *types.Transaction, su
panic(err)
}
if !isDel {
asset.ParaHeight, err = getCommitHeight(txCommit.Payload)
if err != nil {
return nil, err
}
asset.ParaHeight = paraHeight
asset.CommitDoneHeight = c.GetHeight()
asset.Success = success
} else {
......
......@@ -545,7 +545,7 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
updateVotes(stat, nodes)
most, vote := getMostVote(stat)
if !isCommitDone(stat, nodes, most) {
if !isCommitDone(nodes, most) {
superManagerPass := false
if isSuperManager(a.fromaddr) {
//如果主链执行失败,交易不会过滤到平行链,如果主链成功,平行链直接成功
......@@ -612,6 +612,15 @@ func (a *action) nodeVote(config *pt.ParaNodeAddrConfig) (*types.Receipt, error)
}
receipt = mergeReceipt(receipt, r)
if a.exec.GetMainHeight() > getDappForkHeight(pt.ForkLoopCheckCommitTxDone){
//node quit后,如果committx满足2/3目标,自动触发commitDone
r,err = a.loopCommitTxDone(config.Title)
if err != nil{
clog.Error("unpdateNodeGroup.loopCommitTxDone", "title", title,"err",err.Error())
}
receipt = mergeReceipt(receipt, r)
}
stat.Status = pt.ParacrossNodeClosed
stat.Height = a.height
}
......@@ -664,7 +673,10 @@ func unpdateNodeGroup(db dbm.KV, title, addr string, add bool) (*types.Receipt,
}
}
}
err = db.Set(key,types.Encode(&item))
if err != nil {
return nil, errors.Wrapf(err,"unpdateNodeGroup set dbkey=%s",key)
}
return makeParaNodeGroupReceipt(title, &copyItem, &item), nil
}
......
......@@ -9,8 +9,12 @@ package types;
// stateDB
message ParacrossStatusDetails {
repeated string addrs = 1;
repeated bytes blockHash = 2;
repeated string addrs = 1;
repeated bytes blockHash = 2;
repeated bytes txResult = 3;
repeated bytes txHashs = 4;
repeated bytes crossTxResult = 5;
repeated bytes crossTxHashs = 6;
}
message ParacrossHeightStatus {
......@@ -35,9 +39,11 @@ message ParacrossHeightStatusRsp {
}
message ParacrossStatus {
string title = 1;
int64 height = 2;
bytes blockHash = 3;
string title = 1;
int64 height = 2;
bytes blockHash = 3;
int64 mainHeight = 4;
bytes mainHash = 5;
}
message ParacrossConsensusStatus {
......@@ -219,14 +225,20 @@ message ReceiptParacrossMiner {
}
message ReceiptParacrossDone {
int32 totalNodes = 1;
int32 totalCommit = 2;
int32 mostSameCommit = 3;
string title = 4;
int64 height = 5;
bytes stateHash = 6;
uint32 txCounts = 7;
bytes txResult = 8;
int32 totalNodes = 1;
int32 totalCommit = 2;
int32 mostSameCommit = 3;
string title = 4;
int64 height = 5;
bytes stateHash = 6;
uint32 txCounts = 7;
bytes txResult = 8;
bytes blockHash = 9;
repeated bytes txHashs = 10;
bytes crossTxResult = 11;
repeated bytes crossTxHashs = 12;
bytes mainBlockHash = 13;
int64 mainBlockHeight = 14;
}
message ReceiptParacrossRecord {
......
......@@ -49,4 +49,6 @@ var (
ErrParaNodeOpStatusWrong = errors.New("ErrParaNodeOpStatusWrong")
//ErrParaConsensStopBlocksNotReach consensus stop blocks not reach
ErrParaConsensStopBlocksNotReach = errors.New("ErrParaConsensStopBlocksNotReach")
//ErrForkHeightNotReach fork height not reach
ErrForkHeightNotReach = errors.New("ErrForkHeightNotReach")
)
......@@ -18,8 +18,14 @@ var (
glog = log.New("module", ParaX)
// ForkCommitTx main chain support paracross commit tx
ForkCommitTx = "ForkParacrossCommitTx"
//平行链配置项对应主链的ForkCommitTx 高度
MainForkParacrossCommitTx="MainForkParacrossCommitTx"
// ParaSelfConsensForkHeight para self consens height string
ParaSelfConsensForkHeight = "MainParaSelfConsensusForkHeight"
//ForkLoopCheckCommitTxDone 循环检查共识交易done的fork
ForkLoopCheckCommitTxDone ="ForkLoopCheckCommitTxDone"
//MainLoopCheckCommitTxDoneForkHeight 平行链的配置项,对应主链的ForkLoopCheckCommitTxDone高度
MainLoopCheckCommitTxDoneForkHeight="MainLoopCheckCommitTxDoneForkHeight"
)
func init() {
......@@ -29,6 +35,7 @@ func init() {
types.RegisterDappFork(ParaX, "Enable", 0)
types.RegisterDappFork(ParaX, "ForkParacrossWithdrawFromParachain", 1298600)
types.RegisterDappFork(ParaX, ForkCommitTx, 1850000)
types.RegisterDappFork(ParaX, ForkLoopCheckCommitTxDone, -1)
}
// GetExecName get para exec name
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment