Commit 508227d0 authored by mdj33's avatar mdj33 Committed by vipwzw

add feeRate

parent 0f3cc074
...@@ -178,6 +178,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -178,6 +178,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
waitConsensStopTimes: waitConsensTimes, waitConsensStopTimes: waitConsensTimes,
consensHeight: -2, consensHeight: -2,
sendingHeight: -1, sendingHeight: -1,
resetCh: make(chan interface{}, 1),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
......
...@@ -34,6 +34,7 @@ type commitMsgClient struct { ...@@ -34,6 +34,7 @@ type commitMsgClient struct {
paraClient *client paraClient *client
waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2 waitMainBlocks int32 //等待平行链共识消息在主链上链并成功的块数,超出会重发共识消息,最小是2
waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数 waitConsensStopTimes uint32 //共识高度低于完成高度, reset高度重发等待的次数
resetCh chan interface{}
sendMsgCh chan *types.Transaction sendMsgCh chan *types.Transaction
minerSwitch int32 minerSwitch int32
currentTx unsafe.Pointer currentTx unsafe.Pointer
...@@ -43,6 +44,7 @@ type commitMsgClient struct { ...@@ -43,6 +44,7 @@ type commitMsgClient struct {
authAccountIn bool authAccountIn bool
isRollBack int32 isRollBack int32
checkTxCommitTimes int32 checkTxCommitTimes int32
txFeeRate int64
privateKey crypto.PrivKey privateKey crypto.PrivKey
quit chan struct{} quit chan struct{}
mutex sync.Mutex mutex sync.Mutex
...@@ -59,7 +61,7 @@ func (client *commitMsgClient) handler() { ...@@ -59,7 +61,7 @@ func (client *commitMsgClient) handler() {
checkParams := &commitCheckParams{} checkParams := &commitCheckParams{}
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
go client.getMainConsensusHeight() go client.getMainConsensusInfo()
if client.paraClient.authAccount != "" { if client.paraClient.authAccount != "" {
client.paraClient.wg.Add(1) client.paraClient.wg.Add(1)
...@@ -74,6 +76,10 @@ func (client *commitMsgClient) handler() { ...@@ -74,6 +76,10 @@ func (client *commitMsgClient) handler() {
out: out:
for { for {
select { select {
//出错场景入口,需要reset 重发
case <-client.resetCh:
client.resetSend()
client.sendCommitTx()
//例行检查发送入口 //例行检查发送入口
case <-readTick: case <-readTick:
client.procChecks(checkParams) client.procChecks(checkParams)
...@@ -98,16 +104,15 @@ func (client *commitMsgClient) updateChainHeightNotify(height int64, isDel bool) ...@@ -98,16 +104,15 @@ func (client *commitMsgClient) updateChainHeightNotify(height int64, isDel bool)
atomic.StoreInt64(&client.chainHeight, height) atomic.StoreInt64(&client.chainHeight, height)
client.checkRollback(height) client.checkRollback(height)
client.sendCommitTx() if !client.isSendingCommitMsg() {
client.sendCommitTx()
}
} }
// reset notify 提供重设发送参数,发送tx的入口 // reset notify 提供重设发送参数,发送tx的入口
func (client *commitMsgClient) resetNotify() { func (client *commitMsgClient) resetNotify() {
client.mutex.Lock() client.resetCh <- 1
defer client.mutex.Unlock()
client.resetSendEnv()
} }
//新的区块产生,检查是否有commitTx正在发送入口 //新的区块产生,检查是否有commitTx正在发送入口
...@@ -121,6 +126,12 @@ func (client *commitMsgClient) resetSendEnv() { ...@@ -121,6 +126,12 @@ func (client *commitMsgClient) resetSendEnv() {
client.sendingHeight = -1 client.sendingHeight = -1
client.setCurrentTx(nil) client.setCurrentTx(nil)
} }
func (client *commitMsgClient) resetSend() {
client.mutex.Lock()
defer client.mutex.Unlock()
client.resetSendEnv()
}
//自共识后直接从本地获取最新共识高度,没有自共识,获取主链的共识高度 //自共识后直接从本地获取最新共识高度,没有自共识,获取主链的共识高度
func (client *commitMsgClient) getConsensusHeight() int64 { func (client *commitMsgClient) getConsensusHeight() int64 {
...@@ -249,9 +260,7 @@ func (client *commitMsgClient) checkAuthAccountIn() { ...@@ -249,9 +260,7 @@ func (client *commitMsgClient) checkAuthAccountIn() {
//如果授权节点重新加入,需要从当前共识高度重新发送 //如果授权节点重新加入,需要从当前共识高度重新发送
if !client.authAccountIn && authExist { if !client.authAccountIn && authExist {
client.mutex.Lock() client.resetSend()
client.resetSendEnv()
client.mutex.Unlock()
} }
client.authAccountIn = authExist client.authAccountIn = authExist
...@@ -318,7 +327,7 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type ...@@ -318,7 +327,7 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
return nil, 0 return nil, 0
} }
signTx, count, err := client.calcCommitMsgTxs(status) signTx, count, err := client.calcCommitMsgTxs(status, atomic.LoadInt64(&client.txFeeRate))
if err != nil || signTx == nil { if err != nil || signTx == nil {
return nil, 0 return nil, 0
} }
...@@ -334,10 +343,10 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type ...@@ -334,10 +343,10 @@ func (client *commitMsgClient) getSendingTx(startHeight, endHeight int64) (*type
return signTx, count return signTx, count
} }
func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossNodeStatus) (*types.Transaction, int64, error) { func (client *commitMsgClient) calcCommitMsgTxs(notifications []*pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, int64, error) {
txs, count, err := client.batchCalcTxGroup(notifications) txs, count, err := client.batchCalcTxGroup(notifications, feeRate)
if err != nil { if err != nil {
txs, err = client.singleCalcTx((notifications)[0]) txs, err = client.singleCalcTx((notifications)[0], feeRate)
if err != nil { if err != nil {
plog.Error("single calc tx", "height", notifications[0].Height) plog.Error("single calc tx", "height", notifications[0].Height)
...@@ -373,14 +382,14 @@ func (client *commitMsgClient) getTxsGroup(txsArr *types.Transactions) (*types.T ...@@ -373,14 +382,14 @@ func (client *commitMsgClient) getTxsGroup(txsArr *types.Transactions) (*types.T
return newtx, nil return newtx, nil
} }
func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNodeStatus) (*types.Transaction, int, error) { func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, int, error) {
var rawTxs types.Transactions var rawTxs types.Transactions
for _, status := range notifications { for _, status := range notifications {
execName := pt.ParaX execName := pt.ParaX
if isParaSelfConsensusForked(status.MainBlockHeight) { if isParaSelfConsensusForked(status.MainBlockHeight) {
execName = paracross.GetExecName() execName = paracross.GetExecName()
} }
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0) tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, feeRate)
if err != nil { if err != nil {
plog.Error("para get commit tx", "block height", status.Height) plog.Error("para get commit tx", "block height", status.Height)
return nil, 0, err return nil, 0, err
...@@ -395,12 +404,12 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod ...@@ -395,12 +404,12 @@ func (client *commitMsgClient) batchCalcTxGroup(notifications []*pt.ParacrossNod
return txs, len(notifications), nil return txs, len(notifications), nil
} }
func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus) (*types.Transaction, error) { func (client *commitMsgClient) singleCalcTx(status *pt.ParacrossNodeStatus, feeRate int64) (*types.Transaction, error) {
execName := pt.ParaX execName := pt.ParaX
if isParaSelfConsensusForked(status.MainBlockHeight) { if isParaSelfConsensusForked(status.MainBlockHeight) {
execName = paracross.GetExecName() execName = paracross.GetExecName()
} }
tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, 0) tx, err := paracross.CreateRawCommitTx4MainChain(status, execName, feeRate)
if err != nil { if err != nil {
plog.Error("para get commit tx", "block height", status.Height) plog.Error("para get commit tx", "block height", status.Height)
return nil, err return nil, err
...@@ -460,6 +469,14 @@ out: ...@@ -460,6 +469,14 @@ out:
select { select {
case tx = <-client.sendMsgCh: case tx = <-client.sendMsgCh:
err = client.sendCommitTxOut(tx) err = client.sendCommitTxOut(tx)
if err != nil && err == types.ErrTxFeeTooLow {
feeRate, err := client.GetProperFeeRate()
if err == nil {
atomic.StoreInt64(&client.txFeeRate, feeRate)
client.resetNotify()
}
continue
}
if err != nil && (err != types.ErrBalanceLessThanTenTimesFee && err != types.ErrNoBalance) { if err != nil && (err != types.ErrBalanceLessThanTenTimesFee && err != types.ErrNoBalance) {
resendTimer = time.After(time.Second * 2) resendTimer = time.After(time.Second * 2)
} }
...@@ -611,7 +628,7 @@ func (client *commitMsgClient) mainSync() error { ...@@ -611,7 +628,7 @@ func (client *commitMsgClient) mainSync() error {
} }
func (client *commitMsgClient) getMainConsensusHeight() { func (client *commitMsgClient) getMainConsensusInfo() {
ticker := time.NewTicker(time.Second * time.Duration(consensusInterval)) ticker := time.NewTicker(time.Second * time.Duration(consensusInterval))
isSync := false isSync := false
defer ticker.Stop() defer ticker.Stop()
...@@ -648,7 +665,16 @@ out: ...@@ -648,7 +665,16 @@ out:
if selfStatus != nil { if selfStatus != nil {
selfHeight = selfStatus.Height selfHeight = selfStatus.Height
} }
plog.Info("para consensusHeight", "mainHeight", status.Height, "selfHeight", selfHeight)
var feeRate int64
if client.paraClient.authAccount != "" {
feeRate, err = client.GetProperFeeRate()
if err == nil {
atomic.StoreInt64(&client.txFeeRate, feeRate)
}
}
plog.Info("para consensusHeight", "mainHeight", status.Height, "selfHeight", selfHeight, "feeRate", feeRate)
} }
} }
...@@ -656,6 +682,20 @@ out: ...@@ -656,6 +682,20 @@ out:
client.paraClient.wg.Done() client.paraClient.wg.Done()
} }
func (client *commitMsgClient) GetProperFeeRate() (int64, error) {
feeRate, err := client.paraClient.grpcClient.GetProperFee(context.Background(), &types.ReqProperFee{})
if err != nil {
plog.Error("para commit.GetProperFee", "err", err.Error())
return -1, err
}
if feeRate == nil {
plog.Error("para commit.GetProperFee return nil")
return -1, types.ErrInvalidParam
}
return feeRate.ProperFee, nil
}
func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, error) { func (client *commitMsgClient) getSelfConsensusStatus() (*pt.ParacrossStatus, error) {
block, err := client.paraClient.getLastBlockInfo() block, err := client.paraClient.getLastBlockInfo()
if err != nil { if err != nil {
...@@ -715,17 +755,17 @@ func (client *commitMsgClient) getMainConsensusStatus() (*pt.ParacrossStatus, er ...@@ -715,17 +755,17 @@ func (client *commitMsgClient) getMainConsensusStatus() (*pt.ParacrossStatus, er
Param: types.Encode(&pt.ReqParacrossTitleHash{Title: types.GetTitle(), BlockHash: block.MainHash}), Param: types.Encode(&pt.ReqParacrossTitleHash{Title: types.GetTitle(), BlockHash: block.MainHash}),
}) })
if err != nil { if err != nil {
plog.Error("getMainConsensusHeight", "err", err.Error()) plog.Error("getMainConsensusStatus", "err", err.Error())
return nil, err return nil, err
} }
if !reply.GetIsOk() { if !reply.GetIsOk() {
plog.Info("getMainConsensusHeight nok", "error", reply.GetMsg()) plog.Info("getMainConsensusStatus nok", "error", reply.GetMsg())
return nil, err return nil, err
} }
var result pt.ParacrossStatus var result pt.ParacrossStatus
err = types.Decode(reply.Msg, &result) err = types.Decode(reply.Msg, &result)
if err != nil { if err != nil {
plog.Error("getMainConsensusHeight decode", "err", err.Error()) plog.Error("getMainConsensusStatus decode", "err", err.Error())
return nil, err return nil, err
} }
return &result, nil return &result, nil
......
...@@ -53,18 +53,18 @@ func TestCalcCommitMsgTxs(t *testing.T) { ...@@ -53,18 +53,18 @@ func TestCalcCommitMsgTxs(t *testing.T) {
Title: "user.p.para", Title: "user.p.para",
} }
notify := []*pt.ParacrossNodeStatus{nt1} notify := []*pt.ParacrossNodeStatus{nt1}
tx, count, err := client.calcCommitMsgTxs(notify) tx, count, err := client.calcCommitMsgTxs(notify, 0)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, int64(1), count) assert.Equal(t, int64(1), count)
assert.NotNil(t, tx) assert.NotNil(t, tx)
notify = append(notify, nt2) notify = append(notify, nt2)
tx, count, err = client.calcCommitMsgTxs(notify) tx, count, err = client.calcCommitMsgTxs(notify, 0)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, int64(2), count) assert.Equal(t, int64(2), count)
assert.NotNil(t, tx) assert.NotNil(t, tx)
tx, err = client.singleCalcTx(nt2) tx, err = client.singleCalcTx(nt2, 0)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, tx) assert.NotNil(t, tx)
......
...@@ -172,7 +172,7 @@ func createRawParacrossCommitTx(parm *paracrossCommitTx) (*types.Transaction, er ...@@ -172,7 +172,7 @@ func createRawParacrossCommitTx(parm *paracrossCommitTx) (*types.Transaction, er
return createRawCommitTx(&parm.Status, types.ExecName(ParaX), parm.Fee) return createRawCommitTx(&parm.Status, types.ExecName(ParaX), parm.Fee)
} }
func createRawCommitTx(status *ParacrossNodeStatus, name string, fee int64) (*types.Transaction, error) { func createRawCommitTx(status *ParacrossNodeStatus, name string, feeRate int64) (*types.Transaction, error) {
v := &ParacrossCommitAction{ v := &ParacrossCommitAction{
Status: status, Status: status,
} }
...@@ -183,7 +183,6 @@ func createRawCommitTx(status *ParacrossNodeStatus, name string, fee int64) (*ty ...@@ -183,7 +183,6 @@ func createRawCommitTx(status *ParacrossNodeStatus, name string, fee int64) (*ty
tx := &types.Transaction{ tx := &types.Transaction{
Execer: []byte(name), Execer: []byte(name),
Payload: types.Encode(action), Payload: types.Encode(action),
Fee: fee,
To: address.ExecAddress(name), To: address.ExecAddress(name),
Expire: types.Now().Unix() + int64(120), //120s Expire: types.Now().Unix() + int64(120), //120s
} }
...@@ -191,6 +190,12 @@ func createRawCommitTx(status *ParacrossNodeStatus, name string, fee int64) (*ty ...@@ -191,6 +190,12 @@ func createRawCommitTx(status *ParacrossNodeStatus, name string, fee int64) (*ty
if err != nil { if err != nil {
return nil, err return nil, err
} }
if feeRate != 0 {
tx.Fee, err = tx.GetRealFee(feeRate)
if err != nil {
return nil, err
}
}
return tx, nil return tx, nil
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment