Commit a85e55b1 authored by vipwzw's avatar vipwzw Committed by vipwzw

fixbug #252

parent f3a87a2f
...@@ -9,7 +9,6 @@ import ( ...@@ -9,7 +9,6 @@ import (
"github.com/33cn/chain33/client" "github.com/33cn/chain33/client"
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
"github.com/33cn/chain33/rpc/grpcclient"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
...@@ -47,9 +46,9 @@ type mainChainAPI struct { ...@@ -47,9 +46,9 @@ type mainChainAPI struct {
} }
//New 新建接口 //New 新建接口
func New(api client.QueueProtocolAPI, grpcaddr string) ExecutorAPI { func New(api client.QueueProtocolAPI, grpcClient types.Chain33Client) ExecutorAPI {
if types.IsPara() { if types.IsPara() {
return newParaChainAPI(api, grpcaddr) return newParaChainAPI(api, grpcClient)
} }
return &mainChainAPI{api: api} return &mainChainAPI{api: api}
} }
...@@ -86,19 +85,7 @@ type paraChainAPI struct { ...@@ -86,19 +85,7 @@ type paraChainAPI struct {
errflag int32 errflag int32
} }
func newParaChainAPI(api client.QueueProtocolAPI, grpcaddr string) ExecutorAPI { func newParaChainAPI(api client.QueueProtocolAPI, grpcClient types.Chain33Client) ExecutorAPI {
paraRemoteGrpcClient := types.Conf("config.consensus.sub.para").GStr("ParaRemoteGrpcClient")
if grpcaddr != "" {
paraRemoteGrpcClient = grpcaddr
}
if paraRemoteGrpcClient == "" {
paraRemoteGrpcClient = "127.0.0.1:8002"
}
conn, err := grpc.Dial(grpcclient.NewMultipleURL(paraRemoteGrpcClient), grpc.WithInsecure())
if err != nil {
panic(err)
}
grpcClient := types.NewChain33Client(conn)
return &paraChainAPI{api: api, grpcClient: grpcClient} return &paraChainAPI{api: api, grpcClient: grpcClient}
} }
......
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"github.com/33cn/chain33/queue" "github.com/33cn/chain33/queue"
qmocks "github.com/33cn/chain33/queue/mocks" qmocks "github.com/33cn/chain33/queue/mocks"
"github.com/33cn/chain33/rpc" "github.com/33cn/chain33/rpc"
"github.com/33cn/chain33/rpc/grpcclient"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
...@@ -16,7 +17,9 @@ import ( ...@@ -16,7 +17,9 @@ import (
func TestAPI(t *testing.T) { func TestAPI(t *testing.T) {
api := new(mocks.QueueProtocolAPI) api := new(mocks.QueueProtocolAPI)
eapi := New(api, "") gapi, err := grpcclient.NewMainChainClient("")
assert.Nil(t, err)
eapi := New(api, gapi)
param := &types.ReqHashes{ param := &types.ReqHashes{
Hashes: [][]byte{[]byte("hello")}, Hashes: [][]byte{[]byte("hello")},
} }
...@@ -54,13 +57,16 @@ func TestAPI(t *testing.T) { ...@@ -54,13 +57,16 @@ func TestAPI(t *testing.T) {
go server.Listen() go server.Listen()
time.Sleep(time.Second) time.Sleep(time.Second)
eapi = New(api, "") eapi = New(api, gapi)
_, err = eapi.GetBlockByHashes(param) _, err = eapi.GetBlockByHashes(param)
assert.Equal(t, true, IsGrpcError(err)) assert.Equal(t, true, IsGrpcError(err))
assert.Equal(t, false, IsGrpcError(nil)) assert.Equal(t, false, IsGrpcError(nil))
assert.Equal(t, false, IsGrpcError(errors.New("xxxx"))) assert.Equal(t, false, IsGrpcError(errors.New("xxxx")))
assert.Equal(t, true, eapi.IsErr()) assert.Equal(t, true, eapi.IsErr())
eapi = New(api, "127.0.0.1:8003")
gapi2, err := grpcclient.NewMainChainClient("127.0.0.1:8003")
assert.Nil(t, err)
eapi = New(api, gapi2)
detail, err = eapi.GetBlockByHashes(param) detail, err = eapi.GetBlockByHashes(param)
assert.Equal(t, err, nil) assert.Equal(t, err, nil)
assert.Equal(t, detail, &types.BlockDetails{}) assert.Equal(t, detail, &types.BlockDetails{})
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package common package common
import ( import (
"fmt"
"testing" "testing"
"time" "time"
...@@ -25,14 +26,17 @@ var NtpHosts = []string{ ...@@ -25,14 +26,17 @@ var NtpHosts = []string{
func TestGetRealTime(t *testing.T) { func TestGetRealTime(t *testing.T) {
hosts := NtpHosts hosts := NtpHosts
nettime := GetRealTimeRetry(hosts, 10) nettime := GetRealTimeRetry(hosts, 10)
now := time.Now()
//get nettime error, ignore //get nettime error, ignore
if nettime.IsZero() { if nettime.IsZero() {
return return
} }
nettime2 := GetRealTimeRetry(hosts, 10) nettime2 := GetRealTimeRetry(hosts, 10)
//get nettime error, ignore //get nettime error, ignore
delt := time.Since(now)
if nettime2.IsZero() { if nettime2.IsZero() {
return return
} }
assert.Equal(t, int(nettime2.Sub(nettime)/time.Second), 0) fmt.Println(nettime, nettime2)
assert.Equal(t, nettime2.Sub(nettime)/time.Second, delt/time.Second)
} }
...@@ -29,6 +29,7 @@ type executor struct { ...@@ -29,6 +29,7 @@ type executor struct {
difficulty uint64 difficulty uint64
txs []*types.Transaction txs []*types.Transaction
api client.QueueProtocolAPI api client.QueueProtocolAPI
gcli types.Chain33Client
execapi api.ExecutorAPI execapi api.ExecutorAPI
receipts []*types.ReceiptData receipts []*types.ReceiptData
} }
...@@ -58,6 +59,8 @@ func newExecutor(ctx *executorCtx, exec *Executor, txs []*types.Transaction, rec ...@@ -58,6 +59,8 @@ func newExecutor(ctx *executorCtx, exec *Executor, txs []*types.Transaction, rec
ctx: ctx, ctx: ctx,
txs: txs, txs: txs,
receipts: receipts, receipts: receipts,
api: exec.qclient,
gcli: exec.grpccli,
} }
e.coinsAccount.SetDB(e.stateDB) e.coinsAccount.SetDB(e.stateDB)
return e return e
...@@ -150,6 +153,7 @@ func (e *executor) setEnv(exec drivers.Driver) { ...@@ -150,6 +153,7 @@ func (e *executor) setEnv(exec drivers.Driver) {
exec.SetEnv(e.height, e.blocktime, e.difficulty) exec.SetEnv(e.height, e.blocktime, e.difficulty)
exec.SetBlockInfo(e.ctx.parentHash, e.ctx.mainHash, e.ctx.mainHeight) exec.SetBlockInfo(e.ctx.parentHash, e.ctx.mainHash, e.ctx.mainHeight)
exec.SetAPI(e.api) exec.SetAPI(e.api)
exec.SetExecutorAPI(e.api, e.gcli)
e.execapi = exec.GetExecutorAPI() e.execapi = exec.GetExecutorAPI()
exec.SetTxs(e.txs) exec.SetTxs(e.txs)
exec.SetReceipt(e.receipts) exec.SetReceipt(e.receipts)
......
...@@ -15,6 +15,7 @@ import ( ...@@ -15,6 +15,7 @@ import (
clog "github.com/33cn/chain33/common/log" clog "github.com/33cn/chain33/common/log"
log "github.com/33cn/chain33/common/log/log15" log "github.com/33cn/chain33/common/log/log15"
"github.com/33cn/chain33/pluginmgr" "github.com/33cn/chain33/pluginmgr"
"github.com/33cn/chain33/rpc/grpcclient"
drivers "github.com/33cn/chain33/system/dapp" drivers "github.com/33cn/chain33/system/dapp"
// register drivers // register drivers
...@@ -40,6 +41,7 @@ func DisableLog() { ...@@ -40,6 +41,7 @@ func DisableLog() {
type Executor struct { type Executor struct {
client queue.Client client queue.Client
qclient client.QueueProtocolAPI qclient client.QueueProtocolAPI
grpccli types.Chain33Client
pluginEnable map[string]bool pluginEnable map[string]bool
alias map[string]string alias map[string]string
} }
...@@ -102,6 +104,10 @@ func (exec *Executor) SetQueueClient(qcli queue.Client) { ...@@ -102,6 +104,10 @@ func (exec *Executor) SetQueueClient(qcli queue.Client) {
if err != nil { if err != nil {
panic(err) panic(err)
} }
exec.grpccli, err = grpcclient.NewMainChainClient("")
if err != nil {
panic(err)
}
//recv 消息的处理 //recv 消息的处理
go func() { go func() {
for msg := range exec.client.Recv() { for msg := range exec.client.Recv() {
...@@ -144,7 +150,7 @@ func (exec *Executor) procExecQuery(msg queue.Message) { ...@@ -144,7 +150,7 @@ func (exec *Executor) procExecQuery(msg queue.Message) {
db.(*StateDB).enableMVCC() db.(*StateDB).enableMVCC()
driver.SetStateDB(db) driver.SetStateDB(db)
driver.SetAPI(exec.qclient) driver.SetAPI(exec.qclient)
driver.SetExecutorAPI(exec.qclient, exec.grpccli)
//查询的情况下下,执行器不做严格校验,allow,尽可能的加载执行器,并且做查询 //查询的情况下下,执行器不做严格校验,allow,尽可能的加载执行器,并且做查询
ret, err := driver.Query(data.FuncName, data.Param) ret, err := driver.Query(data.FuncName, data.Param)
...@@ -168,7 +174,6 @@ func (exec *Executor) procExecCheckTx(msg queue.Message) { ...@@ -168,7 +174,6 @@ func (exec *Executor) procExecCheckTx(msg queue.Message) {
} }
execute := newExecutor(ctx, exec, datas.Txs, nil) execute := newExecutor(ctx, exec, datas.Txs, nil)
execute.enableMVCC() execute.enableMVCC()
execute.api = exec.qclient
//返回一个列表表示成功还是失败 //返回一个列表表示成功还是失败
result := &types.ReceiptCheckTxList{} result := &types.ReceiptCheckTxList{}
for i := 0; i < len(datas.Txs); i++ { for i := 0; i < len(datas.Txs); i++ {
...@@ -200,7 +205,6 @@ func (exec *Executor) procExecTxList(msg queue.Message) { ...@@ -200,7 +205,6 @@ func (exec *Executor) procExecTxList(msg queue.Message) {
} }
execute := newExecutor(ctx, exec, datas.Txs, nil) execute := newExecutor(ctx, exec, datas.Txs, nil)
execute.enableMVCC() execute.enableMVCC()
execute.api = exec.qclient
var receipts []*types.Receipt var receipts []*types.Receipt
index := 0 index := 0
for i := 0; i < len(datas.Txs); i++ { for i := 0; i < len(datas.Txs); i++ {
...@@ -270,7 +274,6 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) { ...@@ -270,7 +274,6 @@ func (exec *Executor) procExecAddBlock(msg queue.Message) {
} }
execute := newExecutor(ctx, exec, b.Txs, datas.Receipts) execute := newExecutor(ctx, exec, b.Txs, datas.Receipts)
execute.enableMVCC() execute.enableMVCC()
execute.api = exec.qclient
var kvset types.LocalDBSet var kvset types.LocalDBSet
for _, kv := range datas.KV { for _, kv := range datas.KV {
execute.stateDB.Set(kv.Key, kv.Value) execute.stateDB.Set(kv.Key, kv.Value)
...@@ -331,7 +334,6 @@ func (exec *Executor) procExecDelBlock(msg queue.Message) { ...@@ -331,7 +334,6 @@ func (exec *Executor) procExecDelBlock(msg queue.Message) {
} }
execute := newExecutor(ctx, exec, b.Txs, nil) execute := newExecutor(ctx, exec, b.Txs, nil)
execute.enableMVCC() execute.enableMVCC()
execute.api = exec.qclient
var kvset types.LocalDBSet var kvset types.LocalDBSet
for _, kv := range datas.KV { for _, kv := range datas.KV {
execute.stateDB.Set(kv.Key, kv.Value) execute.stateDB.Set(kv.Key, kv.Value)
......
...@@ -45,3 +45,33 @@ func TestMultipleGRPC(t *testing.T) { ...@@ -45,3 +45,33 @@ func TestMultipleGRPC(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, reply.Hash, []byte("hello")) assert.Equal(t, reply.Hash, []byte("hello"))
} }
func TestNewParaClient(t *testing.T) {
qapi := new(mocks.QueueProtocolAPI)
qapi.On("Query", "ticket", "RandNumHash", mock.Anything).Return(&types.ReplyHash{Hash: []byte("hello")}, nil)
//testnode setup
rpcCfg := new(types.RPC)
rpcCfg.GrpcBindAddr = "127.0.0.1:8003"
rpcCfg.JrpcBindAddr = "127.0.0.1:8004"
rpcCfg.MainnetJrpcAddr = rpcCfg.JrpcBindAddr
rpcCfg.Whitelist = []string{"127.0.0.1", "0.0.0.0"}
rpcCfg.JrpcFuncWhitelist = []string{"*"}
rpcCfg.GrpcFuncWhitelist = []string{"*"}
rpc.InitCfg(rpcCfg)
server := rpc.NewGRpcServer(&qmocks.Client{}, qapi)
assert.NotNil(t, server)
go server.Listen()
time.Sleep(time.Second)
//一个IP 有效,一个IP 无效
paraRemoteGrpcClient := "127.0.0.1:8004,127.0.0.1:8003,127.0.0.1"
grpcClient, err := grpcclient.NewMainChainClient(paraRemoteGrpcClient)
assert.Nil(t, err)
param := &types.ReqRandHash{
ExecName: "ticket",
BlockNum: 5,
Hash: []byte("hello"),
}
reply, err := grpcClient.QueryRandNum(context.Background(), param)
assert.Nil(t, err)
assert.Equal(t, reply.Hash, []byte("hello"))
}
package grpcclient
import (
"github.com/33cn/chain33/types"
"google.golang.org/grpc"
)
//NewMainChainClient 创建一个平行链的 主链 grpc chain33 客户端
func NewMainChainClient(grpcaddr string) (types.Chain33Client, error) {
paraRemoteGrpcClient := types.Conf("config.consensus.sub.para").GStr("ParaRemoteGrpcClient")
if grpcaddr != "" {
paraRemoteGrpcClient = grpcaddr
}
if paraRemoteGrpcClient == "" {
paraRemoteGrpcClient = "127.0.0.1:8002"
}
conn, err := grpc.Dial(NewMultipleURL(paraRemoteGrpcClient), grpc.WithInsecure())
if err != nil {
return nil, err
}
grpcClient := types.NewChain33Client(conn)
return grpcClient, nil
}
...@@ -58,6 +58,7 @@ type Driver interface { ...@@ -58,6 +58,7 @@ type Driver interface {
Query(funcName string, params []byte) (types.Message, error) Query(funcName string, params []byte) (types.Message, error)
IsFree() bool IsFree() bool
SetAPI(client.QueueProtocolAPI) SetAPI(client.QueueProtocolAPI)
SetExecutorAPI(queueapi client.QueueProtocolAPI, chain33api types.Chain33Client)
SetTxs(txs []*types.Transaction) SetTxs(txs []*types.Transaction)
SetReceipt(receipts []*types.ReceiptData) SetReceipt(receipts []*types.ReceiptData)
...@@ -129,7 +130,11 @@ func (d *DriverBase) GetFuncMap() map[string]reflect.Method { ...@@ -129,7 +130,11 @@ func (d *DriverBase) GetFuncMap() map[string]reflect.Method {
// SetAPI set queue protocol api // SetAPI set queue protocol api
func (d *DriverBase) SetAPI(queueapi client.QueueProtocolAPI) { func (d *DriverBase) SetAPI(queueapi client.QueueProtocolAPI) {
d.api = queueapi d.api = queueapi
d.execapi = api.New(queueapi, "") }
// SetExecutorAPI set queue protocol api
func (d *DriverBase) SetExecutorAPI(queueapi client.QueueProtocolAPI, chain33api types.Chain33Client) {
d.execapi = api.New(queueapi, chain33api)
} }
// GetAPI return queue protocol api // GetAPI return queue protocol api
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/33cn/chain33/client/mocks" "github.com/33cn/chain33/client/mocks"
"github.com/33cn/chain33/rpc/grpcclient"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
"github.com/33cn/chain33/util" "github.com/33cn/chain33/util"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
...@@ -81,6 +82,9 @@ func TestDriverAPI(t *testing.T) { ...@@ -81,6 +82,9 @@ func TestDriverAPI(t *testing.T) {
demo.SetLocalDB(kvdb) demo.SetLocalDB(kvdb)
demo.SetStateDB(kvdb) demo.SetStateDB(kvdb)
demo.SetAPI(&mocks.QueueProtocolAPI{}) demo.SetAPI(&mocks.QueueProtocolAPI{})
gcli, err := grpcclient.NewMainChainClient("")
assert.Nil(t, err)
demo.SetExecutorAPI(&mocks.QueueProtocolAPI{}, gcli)
assert.NotNil(t, demo.GetAPI()) assert.NotNil(t, demo.GetAPI())
assert.NotNil(t, demo.GetExecutorAPI()) assert.NotNil(t, demo.GetExecutorAPI())
types.SetTitleOnlyForTest("chain33") types.SetTitleOnlyForTest("chain33")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment