Commit 8d517298 authored by harrylee's avatar harrylee

add broker dapp

parent 614c36d2
all:
bash build.sh $(OUT) $(FLAG)
#!/bin/bash
# 官方ci集成脚本
strpwd=$(pwd)
strcmd=${strpwd##*dapp/}
strapp=${strcmd%/cmd*}
OUT_DIR="${1}/$strapp"
#FLAG=$2
mkdir -p "${OUT_DIR}"
cp ./build/* "${OUT_DIR}"
/*Package commands implement dapp client commands*/
package commands
import (
"github.com/spf13/cobra"
)
/*
* 实现合约对应客户端
*/
// Cmd broker client command
func Cmd() *cobra.Command {
cmd := &cobra.Command{
Use: "broker",
Short: "broker command",
Args: cobra.MinimumNArgs(1),
}
cmd.AddCommand(
//add sub command
)
return cmd
}
package executor
import (
log "github.com/33cn/chain33/common/log/log15"
drivers "github.com/33cn/chain33/system/dapp"
"github.com/33cn/chain33/types"
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
)
/*
* 执行器相关定义
* 重载基类相关接口
*/
var (
//日志
elog = log.New("module", "broker.executor")
)
var driverName = brokertypes.BrokerX
// Init register dapp
func Init(name string, cfg *types.Chain33Config, sub []byte) {
drivers.Register(cfg, GetName(), newBroker, cfg.GetDappFork(driverName, "Enable"))
InitExecType()
}
// InitExecType Init Exec Type
func InitExecType() {
ety := types.LoadExecutorType(driverName)
ety.InitFuncList(types.ListMethod(&broker{}))
}
type broker struct {
drivers.DriverBase
}
func newBroker() drivers.Driver {
t := &broker{}
t.SetChild(t)
t.SetExecutorType(types.LoadExecutorType(driverName))
return t
}
// GetName get driver name
func GetName() string {
return newBroker().GetName()
}
func (b *broker) GetDriverName() string {
return driverName
}
// CheckTx 实现自定义检验交易接口,供框架调用
func (b *broker) CheckTx(tx *types.Transaction, index int) error {
// implement code
return nil
}
package executor
import (
"fmt"
"github.com/33cn/chain33/client"
dbm "github.com/33cn/chain33/common/db"
tab "github.com/33cn/chain33/common/db/table"
"github.com/33cn/chain33/types"
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
storagetypes "github.com/33cn/plugin/plugin/dapp/storage/types"
)
//BrokerDB ...
type BrokerDB struct {
api client.QueueProtocolAPI
db dbm.KV
localdb dbm.KV
txhash []byte
fromaddr string
blocktime int64
height int64
index int
}
func newBrokerDB(b *broker, tx *types.Transaction, index int) *BrokerDB {
hash := tx.Hash()
fromaddr := tx.From()
return &BrokerDB{b.GetAPI(), b.GetStateDB(), b.GetLocalDB(), hash, fromaddr,
b.GetBlockTime(), b.GetHeight(), index}
}
////GetKVSet ...
//func (b *BrokerDB) GetKVSet() (kvset []*types.KeyValue) {
// kvset = append(kvset, &types.KeyValue{Key: Key(common.ToHex(s.txhash)), Value: types.Encode(payload)})
// return kvset
//}
func(b *BrokerDB)register(payload *brokertypes.Register )(*types.Receipt, error){
return nil,nil
}
func(b *BrokerDB)audit(payload *brokertypes.Audit )(*types.Receipt, error){
return nil,nil
}
func(b *BrokerDB)updateEventStatus(payload *brokertypes.UpdateEventStatus)(*types.Receipt, error){
receipt := &types.Receipt{Ty: types.ExecOk}
log :=&brokertypes.ReceiptBrokerLog{Value:&brokertypes.ReceiptBrokerLog_UpdateEventStatus{payload},Ty:brokertypes.TyEmitInterchainEventAction}
receiptlog := &types.ReceiptLog{Ty:brokertypes.TyUpdateEventStatusLog,Log:types.Encode(log)}
receipt.Logs=append(receipt.Logs,receiptlog)
return receipt,nil
}
func(b *BrokerDB)emitInterchainEvent(payload *brokertypes.InterchainEvent )(*types.Receipt, error){
receipt := &types.Receipt{Ty: types.ExecOk}
//发布跨链事件 分为1.信息跨链 2.代币跨链,代币跨链需要在这步将所需金额冻结在broker下面
if payload.ReqType == brokertypes.Req_Type_Storage {
//校验存在交易是否存在
params :=&storagetypes.QueryStorage{TxHash:payload.Args[0]}
_,err:=b.api.Query(storagetypes.StorageX,payload.GetFunc(),params)
if err != nil {
return nil,brokertypes.ErrBrokerStorageTx
}
log :=&brokertypes.ReceiptBrokerLog{Value:&brokertypes.ReceiptBrokerLog_EmitInterchainEvent{payload},Ty:brokertypes.TyEmitInterchainEventAction}
receiptlog := &types.ReceiptLog{Ty:brokertypes.TyEmitInterchainEventLog,Log:types.Encode(log)}
receipt.Logs=append(receipt.Logs,receiptlog)
}
if payload.ReqType == brokertypes.Req_Type_Coins {
//TODO 跨链转账
}
return receipt,nil
}
//根据状态遍历时间列表(可通过定时轮循这个接口,获取跨链时间)
func findInterChainEventListByStatus(localdb dbm.KV,status, direction int32, primaryKey string) (*brokertypes.InterchainEventList, error) {
table := NewInterchainEventTable(localdb)
prefix := []byte(fmt.Sprintf("%d",status))
var rows []*tab.Row
var err error
if primaryKey == "" { //第一次查询,默认展示最新得成交记录
rows, err = table.ListIndex("status", prefix, nil, brokertypes.PageSize, direction)
} else {
rows, err = table.ListIndex("status", prefix, []byte(primaryKey), brokertypes.PageSize, direction)
}
if err != nil {
elog.Error("findInterChainEventListByStatus.", "err", err.Error())
return nil, err
}
var eventList brokertypes.InterchainEventList
for _, row := range rows {
event := row.Data.(*brokertypes.InterchainEvent)
eventList.List = append(eventList.List, event)
}
//设置主键索引
if len(rows) == int(brokertypes.PageSize) {
eventList.PrimaryKey = string(rows[len(rows)-1].Primary)
}
return &eventList, nil
}
//根据index查询event
func findInterChainEventByIndex(localdb dbm.KV,index uint64)(*brokertypes.InterchainEvent,error){
table :=NewInterchainEventTable(localdb)
primaryKey := []byte(fmt.Sprintf("%018d", index))
row,err:=table.GetData(primaryKey)
if err !=nil {
return nil,err
}
return row.Data.(*brokertypes.InterchainEvent),nil
}
\ No newline at end of file
package executor
import (
"github.com/33cn/chain33/types"
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
)
/*
* 实现交易的链上执行接口
* 关键数据上链(statedb)并生成交易回执(log)
*/
func (b *broker) Exec_Register(payload *brokertypes.Register, tx *types.Transaction, index int) (*types.Receipt, error) {
db := newBrokerDB(b,tx,index)
return db.register(payload)
}
func (b *broker) Exec_Audit(payload *brokertypes.Audit, tx *types.Transaction, index int) (*types.Receipt, error) {
db := newBrokerDB(b,tx,index)
//implement code 1.授权执行器 2.跨链账户是否需要授权
return db.audit(payload)
}
func (b *broker) Exec_UpdateEventStatus(payload *brokertypes.UpdateEventStatus, tx *types.Transaction, index int) (*types.Receipt, error) {
db := newBrokerDB(b,tx,index)
return db.updateEventStatus(payload)
}
func (b *broker) Exec_EmitInterchainEvent(payload *brokertypes.InterchainEvent, tx *types.Transaction, index int) (*types.Receipt, error) {
db := newBrokerDB(b,tx,index)
return db.emitInterchainEvent(payload)
}
package executor
import (
"github.com/33cn/chain33/types"
)
/*
* 实现区块回退时本地执行的数据清除
*/
// ExecDelLocal localdb kv数据自动回滚接口
func (b *broker) ExecDelLocal(tx *types.Transaction, receipt *types.ReceiptData, index int) (*types.LocalDBSet, error) {
kvs, err := b.DelRollbackKV(tx, tx.Execer)
if err != nil {
return nil, err
}
dbSet := &types.LocalDBSet{}
dbSet.KV = append(dbSet.KV, kvs...)
return dbSet, nil
}
package executor
import (
"fmt"
"github.com/33cn/chain33/types"
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
)
/*
* 实现交易相关数据本地执行,数据不上链
* 非关键数据,本地存储(localDB), 用于辅助查询,效率高
*/
func (b *broker) ExecLocal_Register(payload *brokertypes.Register, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
dbSet := &types.LocalDBSet{}
//implement code, add customize kv to dbSet...
//auto gen for localdb auto rollback
return b.addAutoRollBack(tx, dbSet.KV), nil
}
func (b *broker) ExecLocal_Audit(payload *brokertypes.Audit, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
dbSet := &types.LocalDBSet{}
//implement code, add customize kv to dbSet...
//auto gen for localdb auto rollback
return b.addAutoRollBack(tx, dbSet.KV), nil
}
func (b *broker) ExecLocal_UpdateEventStatus(payload *brokertypes.UpdateEventStatus, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
dbSet := &types.LocalDBSet{}
table :=NewInterchainEventTable(b.GetLocalDB())
primaryKey := []byte(fmt.Sprintf("%018d", payload.Index))
row,err:=table.GetData(primaryKey)
if err !=nil {
return nil,err
}
event:=row.Data.(*brokertypes.InterchainEvent)
//更新状态
event.Status=payload.Status
table.Replace(event)
kvs,err:=table.Save()
if err !=nil {
return nil,err
}
dbSet.KV=append(dbSet.KV,kvs...)
return b.addAutoRollBack(tx, dbSet.KV), nil
}
func (b *broker) ExecLocal_EmitInterchainEvent(payload *brokertypes.InterchainEvent, tx *types.Transaction, receiptData *types.ReceiptData, index int) (*types.LocalDBSet, error) {
dbSet := &types.LocalDBSet{}
table :=NewInterchainEventTable(b.GetLocalDB())
payload.Index=b.GetIndex(index)
table.Add(payload)
kvs,err:=table.Save()
if err !=nil {
return nil,err
}
dbSet.KV=append(dbSet.KV,kvs...)
return b.addAutoRollBack(tx, dbSet.KV), nil
}
//GetIndex get index
func (b *broker) GetIndex(index int) uint64 {
return uint64(int(b.GetHeight())*types.MaxTxsPerBlock + index)
}
//当区块回滚时,框架支持自动回滚localdb kv,需要对exec-local返回的kv进行封装
func (b *broker) addAutoRollBack(tx *types.Transaction, kv []*types.KeyValue) *types.LocalDBSet {
dbSet := &types.LocalDBSet{}
dbSet.KV = b.AddRollbackKV(tx, tx.Execer, kv)
return dbSet
}
package executor
/*
* 用户合约存取kv数据时,key值前缀需要满足一定规范
* 即key = keyPrefix + userKey
* 需要字段前缀查询时,使用’-‘作为分割符号
*/
var (
//KeyPrefixStateDB state db key必须前缀
KeyPrefixStateDB = "mavl-broker-"
//KeyPrefixLocalDB local db的key必须前缀
KeyPrefixLocalDB = "LODB-broker-"
)
package executor
import (
"fmt"
"github.com/33cn/chain33/common/db"
"github.com/33cn/chain33/common/db/table"
"github.com/33cn/chain33/types"
ety "github.com/33cn/plugin/plugin/dapp/broker/types"
)
//重新设计表,list查询全部在跨链事件localdb查询中
var opt_broker_event = &table.Option{
Prefix: KeyPrefixLocalDB,
Name: "broker",
Primary: "index",
Index: []string{"dstChainID", "status","reqType"},
}
//InterchainEventRow table meta 结构
type InterchainEventRow struct {
*ety.InterchainEvent
}
//NewInterchainEventRow ...
func NewInterchainEventRow() *InterchainEventRow {
return &InterchainEventRow{&ety.InterchainEvent{}}
}
//CreateRow ...
func (m *InterchainEventRow) CreateRow() *table.Row {
return &table.Row{Data: &ety.InterchainEvent{}}
}
//SetPayload 设置数据
func (m *InterchainEventRow) SetPayload(data types.Message) error {
if txdata, ok := data.(*ety.InterchainEvent); ok {
m.InterchainEvent = txdata
return nil
}
return ety.ErrData
}
//Get 按照indexName 查询 indexValue
func (m *InterchainEventRow) Get(key string) ([]byte, error) {
if key == "index" {
return []byte(fmt.Sprintf("%018d", m.Index)), nil
} else if key == "dstChainID" {
return []byte(fmt.Sprintf("%s", m.GetDstChainID())), nil
} else if key == "status" {
return []byte(fmt.Sprintf("%d", m.GetStatus())), nil
}else if key == "reqType" {
return []byte(fmt.Sprintf("%d", m.GetReqType())), nil
}
return nil, types.ErrNotFound
}
//NewInterchainEventTable
func NewInterchainEventTable(kvdb db.KV) *table.Table {
rowmeta := NewInterchainEventRow()
table, err := table.NewTable(rowmeta, kvdb, opt_broker_event)
if err != nil {
panic(err)
}
return table
}
\ No newline at end of file
package types
import (
"github.com/33cn/chain33/pluginmgr"
"github.com/33cn/plugin/plugin/dapp/broker/commands"
"github.com/33cn/plugin/plugin/dapp/broker/executor"
"github.com/33cn/plugin/plugin/dapp/broker/rpc"
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
)
/*
* 初始化dapp相关的组件
*/
func init() {
pluginmgr.Register(&pluginmgr.PluginBase{
Name: brokertypes.BrokerX,
ExecName: executor.GetName(),
Exec: executor.Init,
Cmd: commands.Cmd,
RPC: rpc.Init,
})
}
all:
bash ./create_protobuf.sh
syntax = "proto3";
package types;
option go_package = "../types";
message BrokerAction {
oneof value {
Register register = 1;
Audit audit = 2;
InterchainEvent emitInterchainEvent = 3;
UpdateEventStatus updateEventStatus =4;
}
int32 ty = 5;
}
// 业务合约注册模型: 0表示正在审核,1表示审核通过,2表示审核失败
message Register {
//业务合约名称
string exectorName = 1;
}
//审核
message Audit {
//业务合约名称
string exectorName = 1;
//状态码 0表示正在审核,1表示审核通过,2表示审核失败
string status = 2;
}
// 轮循事件
message PollingEvent {
string outMeta = 1;
}
// 更新状态
message UpdateEventStatus {
//源链ID
string index = 1;
//状态 0表示开始处理 1表示跨链成功 2表示跨链失败
int32 status = 2;
}
////发布跨链事件
//message EmitInterchainEvent{
// //目的链ID,固定格式0x000001
// string dstChainID = 1;
// //源链ID,固定格式0x000002
// string srcChainID = 2;
// //跨链交易类型 0表示storage,1表示coins,2表示token......
// uint64 reqType =3;
// //调用方法
// string func = 4;
// //参数列表
// repeated string args = 6;
//}
// 跨链事件
message InterchainEvent {
//索引值,这个有系统自动生成
uint64 index = 1;
//目的链ID,固定格式0x000001
string dstChainID = 2;
//源链ID,固定格式0x000002
string srcChainID = 3;
//跨链交易类型 0表示storage,1表示coins,2表示token......
uint64 reqType =4;
//调用方法
string func = 5;
//参数列表
repeated string args = 6;
//状态 0表示开始处理 1表示跨链成功 2表示跨链失败
int32 status = 7;
}
//ReceiptBrokerLog
message ReceiptBrokerLog{
oneof value {
Register register = 1;
Audit audit = 2;
InterchainEvent emitInterchainEvent = 3;
UpdateEventStatus updateEventStatus =4;
}
int32 ty = 5;
}
service broker {}
//根据状态查看跨链事件
message QueryInterchainEventList {
//事件状态必填(默认是0,只查询待处理状态的事件)
int32 status = 1;
// 主键索引
string primaryKey = 2;
//单页返回多少条记录,默认返回10条,为了系统安全最多单次只能返回20条
int32 count = 3;
// 0降序,1升序,默认降序
int32 direction = 4;
}
//跨链事件列表
message InterchainEventList {
repeated InterchainEvent list = 1;
string primaryKey = 2;
}
#!/bin/bash
# proto生成命令,将pb.go文件生成到types/目录下, chain33_path支持引用chain33框架的proto文件
chain33_path=$(go list -f '{{.Dir}}' "github.com/33cn/chain33")
protoc --go_out=plugins=grpc:../types ./*.proto --proto_path=. --proto_path="${chain33_path}/types/proto/"
package rpc
/*
* 实现json rpc和grpc service接口
* json rpc用Jrpc结构作为接收实例
* grpc使用channelClient结构作为接收实例
*/
package rpc
import (
rpctypes "github.com/33cn/chain33/rpc/types"
brokertypes "github.com/33cn/plugin/plugin/dapp/broker/types"
)
/*
* rpc相关结构定义和初始化
*/
// 实现grpc的service接口
type channelClient struct {
rpctypes.ChannelClient
}
// Jrpc 实现json rpc调用实例
type Jrpc struct {
cli *channelClient
}
// Grpc grpc
type Grpc struct {
*channelClient
}
// Init init rpc
func Init(name string, s rpctypes.RPCServer) {
cli := &channelClient{}
grpc := &Grpc{channelClient: cli}
cli.Init(name, s, &Jrpc{cli: cli}, grpc)
//存在grpc service时注册grpc server,需要生成对应的pb.go文件
brokertypes.RegisterBrokerServer(s.GRPC(), grpc)
}
package types
import (
log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/types"
)
/*
* 交易相关类型定义
* 交易action通常有对应的log结构,用于交易回执日志记录
* 每一种action和log需要用id数值和name名称加以区分
*/
// action类型id和name,这些常量可以自定义修改
const (
TyUnknowAction = iota + 100
TyRegisterAction
TyAuditAction
TyUpdateEventStatusAction
TyEmitInterchainEventAction
NameRegisterAction = "Register"
NameAuditAction = "Audit"
NameUpdateEventStatusAction = "UpdateEventStatus"
NameEmitInterchainEventAction = "EmitInterchainEvent"
)
// 请求类型
const (
Req_Type_Storage = iota
Req_Type_Coins
Req_Type_Token
)
// 跨链事件状态
const (
Pending = iota
Success
Fail
)
//查询方向
const (
ListDESC = int32(0)
ListASC = int32(1)
ListSeek = int32(2)
)
const (
//Count 单次list还回条数
PageSize = int32(10)
MaxPageSize = 100
)
// log类型id值
const (
TyUnknownLog = iota + 100
TyRegisterLog
TyAuditLog
TyUpdateEventStatusLog
TyEmitInterchainEventLog
)
var (
//BrokerX 执行器名称定义
BrokerX = "broker"
//定义actionMap
actionMap = map[string]int32{
NameRegisterAction: TyRegisterAction,
NameAuditAction: TyAuditAction,
NameUpdateEventStatusAction: TyUpdateEventStatusAction,
NameEmitInterchainEventAction: TyEmitInterchainEventAction,
}
//定义log的id和具体log类型及名称,填入具体自定义log类型
logMap = map[int64]*types.LogInfo{
//LogID: {Ty: reflect.TypeOf(LogStruct), Name: LogName},
}
tlog = log.New("module", "broker.types")
)
// init defines a register function
func init() {
types.AllowUserExec = append(types.AllowUserExec, []byte(BrokerX))
//注册合约启用高度
types.RegFork(BrokerX, InitFork)
types.RegExec(BrokerX, InitExecutor)
}
// InitFork defines register fork
func InitFork(cfg *types.Chain33Config) {
cfg.RegisterDappFork(BrokerX, "Enable", 0)
}
// InitExecutor defines register executor
func InitExecutor(cfg *types.Chain33Config) {
types.RegistorExecutor(BrokerX, NewType(cfg))
}
type brokerType struct {
types.ExecTypeBase
}
func NewType(cfg *types.Chain33Config) *brokerType {
c := &brokerType{}
c.SetChild(c)
c.SetConfig(cfg)
return c
}
// GetPayload 获取合约action结构
func (b *brokerType) GetPayload() types.Message {
return &BrokerAction{}
}
// GeTypeMap 获取合约action的id和name信息
func (b *brokerType) GetTypeMap() map[string]int32 {
return actionMap
}
// GetLogMap 获取合约log相关信息
func (b *brokerType) GetLogMap() map[int64]*types.LogInfo {
return logMap
}
This diff is collapsed.
package types
import "fmt"
// some errors definition
var (
ErrBrokerStorageTx = fmt.Errorf("%s", "The key doesn't exist!")
ErrInterChainEvent= fmt.Errorf("%s", "wrong interchain tx!")
ErrData = fmt.Errorf("%s","err interchain event data.")
)
\ No newline at end of file
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
_ "github.com/33cn/plugin/plugin/dapp/accountmanager" //auto gen _ "github.com/33cn/plugin/plugin/dapp/accountmanager" //auto gen
_ "github.com/33cn/plugin/plugin/dapp/autonomy" //auto gen _ "github.com/33cn/plugin/plugin/dapp/autonomy" //auto gen
_ "github.com/33cn/plugin/plugin/dapp/blackwhite" //auto gen _ "github.com/33cn/plugin/plugin/dapp/blackwhite" //auto gen
_ "github.com/33cn/plugin/plugin/dapp/broker" //auto gen
_ "github.com/33cn/plugin/plugin/dapp/cert" //auto gen _ "github.com/33cn/plugin/plugin/dapp/cert" //auto gen
_ "github.com/33cn/plugin/plugin/dapp/coinsx" //auto gen _ "github.com/33cn/plugin/plugin/dapp/coinsx" //auto gen
_ "github.com/33cn/plugin/plugin/dapp/collateralize" //auto gen _ "github.com/33cn/plugin/plugin/dapp/collateralize" //auto gen
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment