Commit 9f60df2f authored by harrylee's avatar harrylee

update broker

parent 65527d44
Pipeline #8119 failed with stages
in 0 seconds
...@@ -22,6 +22,7 @@ var driverName = brokertypes.BrokerX ...@@ -22,6 +22,7 @@ var driverName = brokertypes.BrokerX
// Init register dapp // Init register dapp
func Init(name string, cfg *types.Chain33Config, sub []byte) { func Init(name string, cfg *types.Chain33Config, sub []byte) {
drivers.Register(cfg, GetName(), newBroker, cfg.GetDappFork(driverName, "Enable")) drivers.Register(cfg, GetName(), newBroker, cfg.GetDappFork(driverName, "Enable"))
//TODO 初始化时,需要把应用链的ID加载进去
InitExecType() InitExecType()
} }
......
This diff is collapsed.
...@@ -10,6 +10,11 @@ import ( ...@@ -10,6 +10,11 @@ import (
* 关键数据上链(statedb)并生成交易回执(log) * 关键数据上链(statedb)并生成交易回执(log)
*/ */
func (b *broker) Exec_Init()(*types.Receipt, error){
return nil,nil
}
func (b *broker) Exec_Register(payload *brokertypes.Register, tx *types.Transaction, index int) (*types.Receipt, error) { func (b *broker) Exec_Register(payload *brokertypes.Register, tx *types.Transaction, index int) (*types.Receipt, error) {
db := newBrokerDB(b,tx,index) db := newBrokerDB(b,tx,index)
return db.register(payload) return db.register(payload)
...@@ -21,9 +26,9 @@ func (b *broker) Exec_Audit(payload *brokertypes.Audit, tx *types.Transaction, i ...@@ -21,9 +26,9 @@ func (b *broker) Exec_Audit(payload *brokertypes.Audit, tx *types.Transaction, i
return db.audit(payload) return db.audit(payload)
} }
func (b *broker) Exec_UpdateEventStatus(payload *brokertypes.UpdateEventStatus, tx *types.Transaction, index int) (*types.Receipt, error) { func (b *broker) Exec_UpdateIndex(payload *brokertypes.UpdateIndex, tx *types.Transaction, index int) (*types.Receipt, error) {
db := newBrokerDB(b,tx,index) db := newBrokerDB(b,tx,index)
return db.updateEventStatus(payload) return db.updateIndex(payload)
} }
func (b *broker) Exec_EmitInterchainEvent(payload *brokertypes.InterchainEvent, tx *types.Transaction, index int) (*types.Receipt, error) { func (b *broker) Exec_EmitInterchainEvent(payload *brokertypes.InterchainEvent, tx *types.Transaction, index int) (*types.Receipt, error) {
db := newBrokerDB(b,tx,index) db := newBrokerDB(b,tx,index)
......
package executor package executor
import ( import (
"fmt"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types" brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
) )
...@@ -27,36 +26,17 @@ func (b *broker) ExecLocal_Audit(payload *brokertypes.Audit, tx *types.Transacti ...@@ -27,36 +26,17 @@ func (b *broker) ExecLocal_Audit(payload *brokertypes.Audit, tx *types.Transacti
return b.addAutoRollBack(tx, dbSet.KV), nil return b.addAutoRollBack(tx, dbSet.KV), nil
} }
func (b *broker) ExecLocal_UpdateEventStatus(payload *brokertypes.UpdateEventStatus, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
func (b *broker) ExecLocal_UpdateIndex(payload *brokertypes.UpdateIndex, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
dbSet := &types.LocalDBSet{} dbSet := &types.LocalDBSet{}
table :=NewInterchainEventTable(b.GetLocalDB())
primaryKey := []byte(fmt.Sprintf("%018d", payload.Index))
row,err:=table.GetData(primaryKey)
if err !=nil {
return nil,err
}
event:=row.Data.(*brokertypes.InterchainEvent)
//更新状态
event.Status=payload.Status
table.Replace(event)
kvs,err:=table.Save()
if err !=nil {
return nil,err
}
dbSet.KV=append(dbSet.KV,kvs...)
return b.addAutoRollBack(tx, dbSet.KV), nil return b.addAutoRollBack(tx, dbSet.KV), nil
} }
func (b *broker) ExecLocal_EmitInterchainEvent(payload *brokertypes.InterchainEvent, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) { func (b *broker) ExecLocal_EmitInterchainEvent(payload *brokertypes.InterchainEvent, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
dbSet := &types.LocalDBSet{} dbSet := &types.LocalDBSet{}
table :=NewInterchainEventTable(b.GetLocalDB())
payload.Index=b.GetIndex(index)
table.Add(payload)
kvs,err:=table.Save()
if err !=nil {
return nil,err
}
dbSet.KV=append(dbSet.KV,kvs...)
return b.addAutoRollBack(tx, dbSet.KV), nil return b.addAutoRollBack(tx, dbSet.KV), nil
} }
......
package executor package executor
import "fmt"
/* /*
* 用户合约存取kv数据时,key值前缀需要满足一定规范 * 用户合约存取kv数据时,key值前缀需要满足一定规范
* 即key = keyPrefix + userKey * 即key = keyPrefix + userKey
* 需要字段前缀查询时,使用’-‘作为分割符号 * 需要字段前缀查询时,使用’-‘作为分割符号
*/ */
var ( const (
//KeyPrefixStateDB state db key必须前缀 //KeyPrefixStateDB state db key必须前缀
KeyPrefixStateDB = "mavl-broker-" KeyPrefixStateDB = "mavl-broker-"
//KeyPrefixLocalDB local db的key必须前缀 //KeyPrefixLocalDB local db的key必须前缀
KeyPrefixLocalDB = "LODB-broker-" KeyPrefixLocalDB = "LODB-broker-"
innerMeta = "inner-meta"
outterMeta = "outter-meta"
callbackMeta = "callback-meta"
dstRollbackMeta = "dst-rollback-meta"
whiteList = "white-list"
adminList = "admin-list"
passed = "1"
rejected = "2"
delimiter = "&"
bxhID = "bxh-id"
appchainID = "appchain-id"
) )
//状态数据库中存储innermeta信息
func calInnerMetaKey() []byte {
key := fmt.Sprintf("%s"+":%s", KeyPrefixStateDB, innerMeta)
return []byte(key)
}
//状态数据库中存储outtermeta信息
func calOutterMetaKey() []byte {
key := fmt.Sprintf("%s"+":%s", KeyPrefixStateDB, outterMeta)
return []byte(key)
}
func calCallBackMetaKey() []byte {
key := fmt.Sprintf("%s"+":%s", KeyPrefixStateDB, callbackMeta)
return []byte(key)
}
func calDstRollBackMetaKey() []byte {
key := fmt.Sprintf("%s"+":%s", KeyPrefixStateDB, dstRollbackMeta)
return []byte(key)
}
//生成service key
func genServicePair(from, to string) string {
return fmt.Sprintf("%s-%s",from, to)
}
func calServicePair(from,to string,index uint64)[]byte{
key :=fmt.Sprintf("%s%s-%s-%016d",KeyPrefixStateDB,from, to,index)
return []byte(key)
}
func calEventKey(servicePair string,index uint64)[]byte{
key :=fmt.Sprintf("%s%s-%016d",KeyPrefixStateDB,servicePair,index)
return []byte(key)
}
func outMsgKey(to string, index uint64) []byte {
return []byte(fmt.Sprintf("%sout-msg-%s-%016d",KeyPrefixStateDB, to, index))
}
func inMsgKey(from string, index uint64) []byte {
return []byte(fmt.Sprintf("%sin-msg-%s-%016d",KeyPrefixStateDB,from, index))
}
\ No newline at end of file
package executor package executor
import (
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
)
//获取innterMeta
func(b *broker) Query_QueryInnerMeta(query *brokertypes.QueryInnerMeta)(*brokertypes.Meta,error){
return getInnerMeta(b.GetStateDB())
}
//获取outterMeta
func(b *broker) Query_QueryOutterMeta(query *brokertypes.QueryOutterMeta)(*brokertypes.Meta,error){
return getOutterMeta(b.GetStateDB())
}
//获取inMessage
func(b *broker) Query_QueryInMessage(query *brokertypes.QueryInMessage)(*brokertypes.Response,error){
return getInMessage(b.GetStateDB(),query)
}
//获取outMessage
func(b *broker) Query_QueryOutMessage(query *brokertypes.QueryOutMessage)(*brokertypes.Response,error){
return getOutMessage(b.GetStateDB(),query)
}
//获取监听列表
func(b *broker) Query_PollingEvent(query *brokertypes.PollingEvent)(*brokertypes.InterChainEventList,error){
return pollInterEvent(b.GetStateDB(),query.Meta)
}
\ No newline at end of file
...@@ -13,7 +13,7 @@ var opt_broker_event = &table.Option{ ...@@ -13,7 +13,7 @@ var opt_broker_event = &table.Option{
Prefix: KeyPrefixLocalDB, Prefix: KeyPrefixLocalDB,
Name: "broker", Name: "broker",
Primary: "index", Primary: "index",
Index: []string{"dstChainID", "status","reqType"}, Index: []string{"dstServiceID", "status","reqType"},
} }
...@@ -45,12 +45,12 @@ func (m *InterchainEventRow) SetPayload(data types.Message) error { ...@@ -45,12 +45,12 @@ func (m *InterchainEventRow) SetPayload(data types.Message) error {
func (m *InterchainEventRow) Get(key string) ([]byte, error) { func (m *InterchainEventRow) Get(key string) ([]byte, error) {
if key == "index" { if key == "index" {
return []byte(fmt.Sprintf("%018d", m.Index)), nil return []byte(fmt.Sprintf("%018d", m.Index)), nil
} else if key == "dstChainID" { } else if key == "dstServiceID" {
return []byte(fmt.Sprintf("%s", m.GetDstChainID())), nil return []byte(fmt.Sprintf("%s", m.GetDstServiceID())), nil
} else if key == "status" { } else if key == "status" {
return []byte(fmt.Sprintf("%d", m.GetStatus())), nil return []byte(fmt.Sprintf("%d", m.GetStatus())), nil
}else if key == "reqType" { }else if key == "type" {
return []byte(fmt.Sprintf("%d", m.GetReqType())), nil return []byte(fmt.Sprintf("%d", m.GetType())), nil
} }
return nil, types.ErrNotFound return nil, types.ErrNotFound
} }
......
...@@ -6,7 +6,7 @@ message BrokerAction { ...@@ -6,7 +6,7 @@ message BrokerAction {
Register register = 1; Register register = 1;
Audit audit = 2; Audit audit = 2;
InterchainEvent emitInterchainEvent = 3; InterchainEvent emitInterchainEvent = 3;
UpdateEventStatus updateEventStatus =4; UpdateIndex updateIndex =4;
} }
int32 ty = 5; int32 ty = 5;
} }
...@@ -26,43 +26,33 @@ message Audit { ...@@ -26,43 +26,33 @@ message Audit {
// 轮循事件 // 轮循事件
message PollingEvent { message PollingEvent {
string outMeta = 1; Meta meta = 1;
} }
// 更新状态 // 更新跨链事件索引
message UpdateEventStatus { message UpdateIndex {
//源链ID //当前链已经处理到的位置
string index = 1; uint64 sequenceNum = 1;
//状态 0表示开始处理 1表示跨链成功 2表示跨链失败 //目的链服务ID,固定格式0x000001
int32 status = 2; string dstServiceID = 2;
//源链ID,固定格式0x000002
string srcServiceID = 3;
//请求类型 0表示信息流入 1表示信息流出 2表示回滚
uint64 reqType =4;
//响应信息
Response response = 5;
} }
////发布跨链事件
//message EmitInterchainEvent{
// //目的链ID,固定格式0x000001
// string dstChainID = 1;
// //源链ID,固定格式0x000002
// string srcChainID = 2;
// //跨链交易类型 0表示storage,1表示coins,2表示token......
// uint64 reqType =3;
// //调用方法
// string func = 4;
// //参数列表
// repeated string args = 6;
//}
// 跨链事件 // 跨链事件
message InterchainEvent { message InterchainEvent {
//索引值,这个有系统自动生成 //索引值,这个有系统自动生成
uint64 index = 1; uint64 index = 1;
//目的链ID,固定格式0x000001 //目的链ID,固定格式0x000001
string dstChainID = 2; string dstServiceID = 2;
//源链ID,固定格式0x000002 //源链ID,固定格式0x000002
string srcChainID = 3; string srcServiceID = 3;
//跨链交易类型 0表示storage,1表示coins,2表示token...... //跨链交易类型 0表示storage,1表示coins,2表示token......
uint64 reqType =4; uint64 type =4;
//调用方法 //调用方法
string func = 5; string func = 5;
//参数列表 //参数列表
...@@ -76,25 +66,69 @@ message ReceiptBrokerLog{ ...@@ -76,25 +66,69 @@ message ReceiptBrokerLog{
Register register = 1; Register register = 1;
Audit audit = 2; Audit audit = 2;
InterchainEvent emitInterchainEvent = 3; InterchainEvent emitInterchainEvent = 3;
UpdateEventStatus updateEventStatus =4; UpdateIndex updateIndex =4;
} }
int32 ty = 5; int32 ty = 5;
} }
service broker {} service broker {}
//根据状态查看跨链事件
message QueryInterchainEventList { //查询跨出事件
//事件状态必填(默认是0,只查询待处理状态的事件) message QueryInterchainEvent{
int32 status = 1; uint64 index =1;
// 主键索引 //目的链ID,固定格式0x000001
string primaryKey = 2; string dstServiceID = 2;
//单页返回多少条记录,默认返回10条,为了系统安全最多单次只能返回20条 //源链ID,固定格式0x000002
int32 count = 3; string srcServiceID = 3;
// 0降序,1升序,默认降序
int32 direction = 4;
} }
//跨链事件列表 //跨链事件列表
message InterchainEventList { message InterChainEventList{
repeated InterchainEvent list = 1; repeated InterchainEvent list =1;
string primaryKey = 2;
} }
////根据状态查看跨链事件
//message QueryInterchainEventList {
// //事件状态必填(默认是0,只查询待处理状态的事件)
// int32 status = 1;
// // 主键索引
// string primaryKey = 2;
// //单页返回多少条记录,默认返回10条,为了系统安全最多单次只能返回20条
// int32 count = 3;
// // 0降序,1升序,默认降序
// int32 direction = 4;
//}
////跨链事件列表
//message InterchainEventList {
// repeated InterchainEvent list = 1;
// string primaryKey = 2;
//}
message QueryOutterMeta{
}
message QueryInnerMeta{
}
message QueryInMessage{
string inServicePair = 1;
uint64 sequenceNum = 2;
}
message QueryOutMessage{
string inServicePair = 1;
uint64 sequenceNum = 2;
}
message Meta {
map<string,uint64> meta = 1;
}
message Response{
// A status code that should follow the HTTP status codes.
int32 status = 1;
// A message associated with the response code.
string message = 2;
// A payload that can be used to include metadata with this response.
bytes payload = 3;
}
\ No newline at end of file
...@@ -16,12 +16,12 @@ const ( ...@@ -16,12 +16,12 @@ const (
TyUnknowAction = iota + 100 TyUnknowAction = iota + 100
TyRegisterAction TyRegisterAction
TyAuditAction TyAuditAction
TyUpdateEventStatusAction TyUpdateIndexAction
TyEmitInterchainEventAction TyEmitInterchainEventAction
NameRegisterAction = "Register" NameRegisterAction = "Register"
NameAuditAction = "Audit" NameAuditAction = "Audit"
NameUpdateEventStatusAction = "UpdateEventStatus" NameUpdateIndexAction = "UpdateIndex"
NameEmitInterchainEventAction = "EmitInterchainEvent" NameEmitInterchainEventAction = "EmitInterchainEvent"
) )
...@@ -32,12 +32,27 @@ const ( ...@@ -32,12 +32,27 @@ const (
Req_Type_Token Req_Type_Token
) )
//元数据类型
const (
Meta_Inner = iota
Meta_Outter
Meta_RollBack
)
//元数据类型
const (
Req_Inner = iota
Req_Callback
Req_DstRollback
)
// 跨链事件状态 // 跨链事件状态
const ( const (
Pending = iota Pending = iota
Success Success
Fail Fail
) )
//查询方向 //查询方向
const ( const (
ListDESC = int32(0) ListDESC = int32(0)
...@@ -47,7 +62,7 @@ const ( ...@@ -47,7 +62,7 @@ const (
const ( const (
//Count 单次list还回条数 //Count 单次list还回条数
PageSize = int32(10) PageSize = int32(10)
MaxPageSize = 100 MaxPageSize = 100
) )
...@@ -56,7 +71,7 @@ const ( ...@@ -56,7 +71,7 @@ const (
TyUnknownLog = iota + 100 TyUnknownLog = iota + 100
TyRegisterLog TyRegisterLog
TyAuditLog TyAuditLog
TyUpdateEventStatusLog TyUpdateIndexLog
TyEmitInterchainEventLog TyEmitInterchainEventLog
) )
...@@ -67,7 +82,7 @@ var ( ...@@ -67,7 +82,7 @@ var (
actionMap = map[string]int32{ actionMap = map[string]int32{
NameRegisterAction: TyRegisterAction, NameRegisterAction: TyRegisterAction,
NameAuditAction: TyAuditAction, NameAuditAction: TyAuditAction,
NameUpdateEventStatusAction: TyUpdateEventStatusAction, NameUpdateIndexAction: TyUpdateIndexAction,
NameEmitInterchainEventAction: TyEmitInterchainEventAction, NameEmitInterchainEventAction: TyEmitInterchainEventAction,
} }
//定义log的id和具体log类型及名称,填入具体自定义log类型 //定义log的id和具体log类型及名称,填入具体自定义log类型
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment