Commit 36f18d1b authored by harrylee's avatar harrylee

fix a bug for initbroker

parent 0df097c6
Pipeline #8235 failed with stages
in 0 seconds
...@@ -13,7 +13,7 @@ import ( ...@@ -13,7 +13,7 @@ import (
//BrokerDB ... //BrokerDB ...
type BrokerDB struct { type BrokerDB struct {
api client.QueueProtocolAPI api client.QueueProtocolAPI
db dbm.KV statedb dbm.KV
localdb dbm.KV localdb dbm.KV
txhash []byte txhash []byte
fromaddr string fromaddr string
...@@ -24,7 +24,7 @@ type BrokerDB struct { ...@@ -24,7 +24,7 @@ type BrokerDB struct {
var ( var (
// DefaultManagerAddr 默认管理员地址 // DefaultManagerAddr 默认管理员地址
DefaultManagerAddr = "12qyocayNF7Lv6C9qW4avxs2E7U41fKSfv" DefaultManagerAddr = "14KEKbYtKKQm4wMthSK9J4La4nAiidGozt"
) )
func newBrokerDB(b *broker, tx *types.Transaction, index int) *BrokerDB { func newBrokerDB(b *broker, tx *types.Transaction, index int) *BrokerDB {
...@@ -55,24 +55,27 @@ func (b *BrokerDB) GetEventKVSet(event *brokertypes.InterchainEvent) (kvset []*t ...@@ -55,24 +55,27 @@ func (b *BrokerDB) GetEventKVSet(event *brokertypes.InterchainEvent) (kvset []*t
//初始化broker //初始化broker
func (b *BrokerDB) initBroker(payload *brokertypes.Init) (*types.Receipt, error) { func (b *BrokerDB) initBroker(payload *brokertypes.Init) (*types.Receipt, error) {
//权限检查且只能初始化一次 //权限检查且只能初始化一次
if b.fromaddr != getManagerAddr(b.api.GetConfig(), b.db, brokerAdmin, DefaultManagerAddr) { if b.fromaddr != getManagerAddr(b.api.GetConfig(), b.statedb, brokerAdmin, DefaultManagerAddr) {
return nil, fmt.Errorf("the addr %s is not admin", b.fromaddr) return nil, fmt.Errorf("the addr %s is not admin", b.fromaddr)
} }
m := make(map[string]uint64)
//初始化map,填充一些默认值,防止序列化后为nil
m["broker"] = 0
meta := &brokertypes.Meta{ meta := &brokertypes.Meta{
Meta: make(map[string]uint64), Meta: m,
XXX_NoUnkeyedLiteral: struct{}{},
XXX_unrecognized: nil,
XXX_sizecache: 0,
} }
var kvsets []*types.KeyValue var kvsets []*types.KeyValue
kvsets = append(kvsets,&types.KeyValue{Key: calOutterMetaKey(), Value: types.Encode(meta)}) kvsets = append(kvsets, &types.KeyValue{Key: calOutterMetaKey(), Value: types.Encode(meta)})
kvsets = append(kvsets,&types.KeyValue{Key: calInnerMetaKey(), Value: types.Encode(meta)}) kvsets = append(kvsets, &types.KeyValue{Key: calInnerMetaKey(), Value: types.Encode(meta)})
kvsets = append(kvsets,&types.KeyValue{Key: calCallBackMetaKey(), Value: types.Encode(meta)}) kvsets = append(kvsets, &types.KeyValue{Key: calCallBackMetaKey(), Value: types.Encode(meta)})
kvsets = append(kvsets,&types.KeyValue{Key: calDstRollBackMetaKey(), Value: types.Encode(meta)}) kvsets = append(kvsets, &types.KeyValue{Key: calDstRollBackMetaKey(), Value: types.Encode(meta)})
receipt := &types.Receipt{Ty: types.ExecOk} receipt := &types.Receipt{Ty: types.ExecOk}
receipt.KV = append(receipt.KV,kvsets...) receipt.KV = append(receipt.KV, kvsets...)
receipt.KV = append(receipt.KV, &types.KeyValue{Key: BxhIDKey(), Value: types.Encode(payload)}) receipt.KV = append(receipt.KV, &types.KeyValue{Key: BxhIDKey(), Value: types.Encode(payload)})
receipt.Ty = brokertypes.TyInitLog receipt.Ty = brokertypes.TyInitLog
for _, v := range receipt.KV {
elog.Error("KV", "key:", string(v.GetKey()), "value", string(v.GetValue()))
}
return receipt, nil return receipt, nil
} }
...@@ -92,9 +95,9 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt ...@@ -92,9 +95,9 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt
//FIXME 权限检查 //FIXME 权限检查
receipt := &types.Receipt{Ty: types.ExecOk} receipt := &types.Receipt{Ty: types.ExecOk}
if payload.ReqType == brokertypes.Req_Inner { if payload.ReqType == brokertypes.Req_Inner {
meta, err := getMeta(b.db, calInnerMetaKey()) meta, err := getMeta(b.statedb, calInnerMetaKey())
if err != nil { if err != nil {
return nil,fmt.Errorf("broker not init!") return nil, fmt.Errorf("broker not init!")
} }
idx := meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)] idx := meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)]
if payload.SequenceNum != idx+1 { if payload.SequenceNum != idx+1 {
...@@ -106,9 +109,9 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt ...@@ -106,9 +109,9 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt
receipt.KV = append(receipt.KV, kvset...) receipt.KV = append(receipt.KV, kvset...)
} else if payload.ReqType == brokertypes.Req_Callback { } else if payload.ReqType == brokertypes.Req_Callback {
meta, err := getMeta(b.db, calCallBackMetaKey()) meta, err := getMeta(b.statedb, calCallBackMetaKey())
if err != nil { if err != nil {
return nil,fmt.Errorf("broker not init!") return nil, fmt.Errorf("broker not init!")
} }
idx := meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)] idx := meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)]
if payload.SequenceNum != idx+1 { if payload.SequenceNum != idx+1 {
...@@ -116,13 +119,13 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt ...@@ -116,13 +119,13 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt
} }
meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)] = idx + 1 meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)] = idx + 1
kvset := b.GetMetaKVSet(payload.ReqType, meta) kvset := b.GetMetaKVSet(payload.ReqType, meta)
kvset = append(kvset, &types.KeyValue{Key: outMsgKey(genServicePair(payload.SrcServiceID, payload.DstServiceID), idx+1), Value: types.Encode(payload.Response)}) // 跨出交易不需要确认是否成功
receipt.KV = append(receipt.KV, kvset...) receipt.KV = append(receipt.KV, kvset...)
} else if payload.ReqType == brokertypes.Req_DstRollback { } else if payload.ReqType == brokertypes.Req_DstRollback {
//todo //todo
meta, err := getMeta(b.db, calDstRollBackMetaKey()) meta, err := getMeta(b.statedb, calDstRollBackMetaKey())
if err != nil { if err != nil {
return nil,fmt.Errorf("broker not init!") return nil, fmt.Errorf("broker not init!")
} }
idx := meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)] idx := meta.Meta[genServicePair(payload.SrcServiceID, payload.DstServiceID)]
if payload.SequenceNum < idx+1 { if payload.SequenceNum < idx+1 {
...@@ -137,6 +140,7 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt ...@@ -137,6 +140,7 @@ func (b *BrokerDB) updateIndex(payload *brokertypes.UpdateIndex) (*types.Receipt
receipt.Logs = append(receipt.Logs, receiptlog) receipt.Logs = append(receipt.Logs, receiptlog)
return receipt, nil return receipt, nil
} }
//TODO 重构跨链事件发布 //TODO 重构跨链事件发布
func (b *BrokerDB) emitInterchainEvent(payload *brokertypes.InterchainEvent) (*types.Receipt, error) { func (b *BrokerDB) emitInterchainEvent(payload *brokertypes.InterchainEvent) (*types.Receipt, error) {
//FIXME 权限检查,是否需要设定指定账户拥有跨链的权限 //FIXME 权限检查,是否需要设定指定账户拥有跨链的权限
...@@ -149,9 +153,9 @@ func (b *BrokerDB) emitInterchainEvent(payload *brokertypes.InterchainEvent) (*t ...@@ -149,9 +153,9 @@ func (b *BrokerDB) emitInterchainEvent(payload *brokertypes.InterchainEvent) (*t
if err != nil { if err != nil {
return nil, brokertypes.ErrBrokerStorageTx return nil, brokertypes.ErrBrokerStorageTx
} }
meta, err := getMeta(b.db, calOutterMetaKey()) meta, err := getMeta(b.statedb, calOutterMetaKey())
if err != nil { if err != nil {
return nil,fmt.Errorf("broker not init!") return nil, fmt.Errorf("broker not init!")
} }
currServiceID, err := b.getCurrServiceID() currServiceID, err := b.getCurrServiceID()
if err != nil { if err != nil {
...@@ -164,7 +168,7 @@ func (b *BrokerDB) emitInterchainEvent(payload *brokertypes.InterchainEvent) (*t ...@@ -164,7 +168,7 @@ func (b *BrokerDB) emitInterchainEvent(payload *brokertypes.InterchainEvent) (*t
payload.Index = meta.Meta[genServicePair(currServiceID, payload.DstServiceID)] payload.Index = meta.Meta[genServicePair(currServiceID, payload.DstServiceID)]
receipt.KV = append(receipt.KV, kvset) receipt.KV = append(receipt.KV, kvset)
receipt.KV = append(receipt.KV, b.GetEventKVSet(payload)...) receipt.KV = append(receipt.KV, b.GetEventKVSet(payload)...)
receipt.KV = append(receipt.KV, &types.KeyValue{Key: outMsgKey(genServicePair(payload.SrcServiceID, payload.DstServiceID), payload.Index), Value: types.Encode(payload)})
log := &brokertypes.ReceiptBrokerLog{Value: &brokertypes.ReceiptBrokerLog_EmitInterchainEvent{payload}, Ty: brokertypes.TyEmitInterchainEventAction} log := &brokertypes.ReceiptBrokerLog{Value: &brokertypes.ReceiptBrokerLog_EmitInterchainEvent{payload}, Ty: brokertypes.TyEmitInterchainEventAction}
receiptlog := &types.ReceiptLog{Ty: brokertypes.TyEmitInterchainEventLog, Log: types.Encode(log)} receiptlog := &types.ReceiptLog{Ty: brokertypes.TyEmitInterchainEventLog, Log: types.Encode(log)}
receipt.Logs = append(receipt.Logs, receiptlog) receipt.Logs = append(receipt.Logs, receiptlog)
...@@ -184,7 +188,7 @@ func (b *BrokerDB) getCurrServiceID() (string, error) { ...@@ -184,7 +188,7 @@ func (b *BrokerDB) getCurrServiceID() (string, error) {
func (b *BrokerDB) genFullServiceID(serviceId string) (string, error) { func (b *BrokerDB) genFullServiceID(serviceId string) (string, error) {
//TODO 需要init方法,用于初始化bixhID和appChainID //TODO 需要init方法,用于初始化bixhID和appChainID
data, err := b.db.Get(BxhIDKey()) data, err := b.statedb.Get(BxhIDKey())
if err != nil { if err != nil {
return "", err return "", err
} }
...@@ -238,19 +242,19 @@ func getMeta(statedb dbm.KV, key []byte) (*brokertypes.Meta, error) { ...@@ -238,19 +242,19 @@ func getMeta(statedb dbm.KV, key []byte) (*brokertypes.Meta, error) {
} }
//去状态数据库中查询message //去状态数据库中查询message
func getMessage(statedb dbm.KV, key []byte) (*brokertypes.Response, error) { func getMessage(statedb dbm.KV, key []byte) (*brokertypes.InterchainEvent, error) {
data, err := statedb.Get(key) data, err := statedb.Get(key)
if err != nil { if err != nil {
elog.Error("getMessage", "not found", "key:", key) elog.Error("getMessage", "not found", "key:", key)
return nil, err return nil, err
} }
var resp brokertypes.Response var event brokertypes.InterchainEvent
err = types.Decode(data, &resp) err = types.Decode(data, &event)
if err != nil { if err != nil {
elog.Error("getMessage", "can't decode", "err:", err.Error()) elog.Error("getMessage", "can't decode", "err:", err.Error())
return nil, err return nil, err
} }
return &resp, nil return &event, nil
} }
//监听获取跨出事件列表 //监听获取跨出事件列表
...@@ -287,18 +291,30 @@ func getOutterMeta(statedb dbm.KV) (*brokertypes.Meta, error) { ...@@ -287,18 +291,30 @@ func getOutterMeta(statedb dbm.KV) (*brokertypes.Meta, error) {
func getCallBackMeta(statedb dbm.KV) (*brokertypes.Meta, error) { func getCallBackMeta(statedb dbm.KV) (*brokertypes.Meta, error) {
return getMeta(statedb, calCallBackMetaKey()) return getMeta(statedb, calCallBackMetaKey())
} }
//获取innerMeta //获取innerMeta
func getInnerMeta(statedb dbm.KV) (*brokertypes.Meta, error) { func getInnerMeta(statedb dbm.KV) (*brokertypes.Meta, error) {
return getMeta(statedb, calInnerMetaKey()) return getMeta(statedb, calInnerMetaKey())
} }
//获取message //获取message,用来查询跨入交易执行状态
func getInMessage(statedb dbm.KV, query *brokertypes.QueryInMessage) (*brokertypes.Response, error) { func getInMessage(statedb dbm.KV, query *brokertypes.QueryInMessage) (*brokertypes.Response, error) {
return getMessage(statedb, inMsgKey(query.InServicePair, query.SequenceNum)) data, err := statedb.Get(inMsgKey(query.InServicePair, query.SequenceNum))
if err != nil {
elog.Error("getInMessage", "not found", "key:", inMsgKey(query.InServicePair, query.SequenceNum))
return nil, err
}
var response brokertypes.Response
err = types.Decode(data, &response)
if err != nil {
elog.Error("getInMessage", "can't decode", "err:", err.Error())
return nil, err
}
return &response, nil
} }
//获取message //获取message,实质用来获取跨出事件
func getOutMessage(statedb dbm.KV, query *brokertypes.QueryOutMessage) (*brokertypes.Response, error) { func getOutMessage(statedb dbm.KV, query *brokertypes.QueryOutMessage) (*brokertypes.InterchainEvent, error) {
return getMessage(statedb, outMsgKey(query.InServicePair, query.SequenceNum)) return getMessage(statedb, outMsgKey(query.InServicePair, query.SequenceNum))
} }
......
...@@ -25,7 +25,7 @@ func (b *broker) Query_QueryInMessage(query *brokertypes.QueryInMessage) (*broke ...@@ -25,7 +25,7 @@ func (b *broker) Query_QueryInMessage(query *brokertypes.QueryInMessage) (*broke
} }
//获取outMessage //获取outMessage
func (b *broker) Query_QueryOutMessage(query *brokertypes.QueryOutMessage) (*brokertypes.Response, error) { func (b *broker) Query_QueryOutMessage(query *brokertypes.QueryOutMessage) (*brokertypes.InterchainEvent, error) {
return getOutMessage(b.GetStateDB(), query) return getOutMessage(b.GetStateDB(), query)
} }
......
...@@ -49,8 +49,8 @@ message UpdateIndex { ...@@ -49,8 +49,8 @@ message UpdateIndex {
string srcServiceID = 3; string srcServiceID = 3;
//请求类型 0表示信息流入 1表示信息流出 2表示回滚 //请求类型 0表示信息流入 1表示信息流出 2表示回滚
uint64 reqType =4; uint64 reqType =4;
//响应信息 //跨入交易执行结果更新
Response response = 5; Response response =5;
} }
// 跨链事件 // 跨链事件
message InterchainEvent { message InterchainEvent {
...@@ -133,10 +133,6 @@ message Meta { ...@@ -133,10 +133,6 @@ message Meta {
message Response{ message Response{
// A status code that should follow the HTTP status codes. //状态 0表示开始处理 1表示跨链成功 2表示跨链失败
int32 status = 1; int32 status = 1;
// A message associated with the response code.
string message = 2;
// A payload that can be used to include metadata with this response.
bytes payload = 3;
} }
\ No newline at end of file
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment