Commit 23b237bf authored by 张振华's avatar 张振华

Merge branch 'master' into guess

parents 13813d32 f4d8ec54
...@@ -60,7 +60,7 @@ type client struct { ...@@ -60,7 +60,7 @@ type client struct {
conn *grpc.ClientConn conn *grpc.ClientConn
grpcClient types.Chain33Client grpcClient types.Chain33Client
paraClient paracross.ParacrossClient paraClient paracross.ParacrossClient
isCatchingUp bool isCaughtUp bool
commitMsgClient *commitMsgClient commitMsgClient *commitMsgClient
authAccount string authAccount string
privateKey crypto.PrivKey privateKey crypto.PrivKey
...@@ -119,6 +119,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module { ...@@ -119,6 +119,7 @@ func New(cfg *types.Consensus, sub []byte) queue.Module {
paraClient: paraCli, paraClient: paraCli,
authAccount: cfg.AuthAccount, authAccount: cfg.AuthAccount,
privateKey: priKey, privateKey: priKey,
isCaughtUp: false,
} }
if cfg.WaitBlocks4CommitMsg < 2 { if cfg.WaitBlocks4CommitMsg < 2 {
...@@ -444,9 +445,9 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type ...@@ -444,9 +445,9 @@ func (client *client) RequestTx(currSeq int64, preMainBlockHash []byte) ([]*type
plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", seqTy) plog.Info("GetCurrentSeq", "Len of txs", len(txs), "seqTy", seqTy)
if lastSeq-currSeq > emptyBlockInterval { if lastSeq-currSeq > emptyBlockInterval {
client.isCatchingUp = true client.isCaughtUp = false
} else { } else {
client.isCatchingUp = false client.isCaughtUp = true
} }
if client.authAccount != "" { if client.authAccount != "" {
...@@ -654,7 +655,7 @@ func (client *client) CreateBlock() { ...@@ -654,7 +655,7 @@ func (client *client) CreateBlock() {
plog.Error("Incorrect sequence type") plog.Error("Incorrect sequence type")
incSeqFlag = false incSeqFlag = false
} }
if !client.isCatchingUp { if client.isCaughtUp {
time.Sleep(time.Second * time.Duration(blockSec)) time.Sleep(time.Second * time.Duration(blockSec))
} }
} }
...@@ -766,6 +767,14 @@ func (client *client) DelBlock(block *types.Block, seq int64) error { ...@@ -766,6 +767,14 @@ func (client *client) DelBlock(block *types.Block, seq int64) error {
return nil return nil
} }
//IsCaughtUp 是否追上最新高度,
func (client *client) Query_IsCaughtUp(req *types.ReqNil) (types.Message, error) {
if client == nil {
return nil, fmt.Errorf("%s", "client not bind message queue.")
}
return &types.IsCaughtUp{Iscaughtup: client.isCaughtUp}, nil
}
func checkMinerTx(current *types.BlockDetail) error { func checkMinerTx(current *types.BlockDetail) error {
//检查第一个笔交易的execs, 以及执行状态 //检查第一个笔交易的execs, 以及执行状态
if len(current.Block.Txs) == 0 { if len(current.Block.Txs) == 0 {
......
...@@ -83,7 +83,7 @@ out: ...@@ -83,7 +83,7 @@ out:
} }
case block := <-client.mainBlockAdd: case block := <-client.mainBlockAdd:
if client.currentTx != nil && !client.paraClient.isCatchingUp { if client.currentTx != nil && client.paraClient.isCaughtUp {
exist := checkTxInMainBlock(client.currentTx, block) exist := checkTxInMainBlock(client.currentTx, block)
if exist { if exist {
finishHeight = sendingHeight finishHeight = sendingHeight
......
...@@ -11,6 +11,7 @@ import ( ...@@ -11,6 +11,7 @@ import (
"os" "os"
"strings" "strings"
"github.com/33cn/chain33/rpc/jsonclient"
"github.com/33cn/chain33/types" "github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types" pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/spf13/cobra" "github.com/spf13/cobra"
...@@ -29,6 +30,7 @@ func ParcCmd() *cobra.Command { ...@@ -29,6 +30,7 @@ func ParcCmd() *cobra.Command {
CreateRawTransferCmd(), CreateRawTransferCmd(),
CreateRawWithdrawCmd(), CreateRawWithdrawCmd(),
CreateRawTransferToExecCmd(), CreateRawTransferToExecCmd(),
IsSyncCmd(),
) )
return cmd return cmd
} }
...@@ -281,3 +283,20 @@ func createTransferTx(cmd *cobra.Command, isWithdraw bool) (string, error) { ...@@ -281,3 +283,20 @@ func createTransferTx(cmd *cobra.Command, isWithdraw bool) (string, error) {
txHex := types.Encode(tx) txHex := types.Encode(tx)
return hex.EncodeToString(txHex), nil return hex.EncodeToString(txHex), nil
} }
// IsSyncCmd query parachain is sync
func IsSyncCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "is_sync",
Short: "query parachain is sync",
Run: isSync,
}
return cmd
}
func isSync(cmd *cobra.Command, args []string) {
rpcLaddr, _ := cmd.Flags().GetString("rpc_laddr")
var res bool
ctx := jsonclient.NewRPCCtx(rpcLaddr, "paracross.IsSync", nil, &res)
ctx.Run()
}
...@@ -2,6 +2,7 @@ syntax = "proto3"; ...@@ -2,6 +2,7 @@ syntax = "proto3";
import "transaction.proto"; import "transaction.proto";
import "common.proto"; import "common.proto";
import "blockchain.proto";
package types; package types;
...@@ -130,4 +131,5 @@ service paracross { ...@@ -130,4 +131,5 @@ service paracross {
rpc ListTitles(ReqNil) returns (RespParacrossTitles) {} rpc ListTitles(ReqNil) returns (RespParacrossTitles) {}
rpc GetTitleHeight(ReqParacrossTitleHeight) returns (ReceiptParacrossDone) {} rpc GetTitleHeight(ReqParacrossTitleHeight) returns (ReceiptParacrossDone) {}
rpc GetAssetTxResult(ReqHash) returns (ParacrossAsset) {} rpc GetAssetTxResult(ReqHash) returns (ParacrossAsset) {}
rpc IsSync(ReqNil) returns (IsCaughtUp) {}
} }
\ No newline at end of file
...@@ -91,3 +91,26 @@ func (c *Jrpc) GetAssetTxResult(req *types.ReqHash, result *interface{}) error { ...@@ -91,3 +91,26 @@ func (c *Jrpc) GetAssetTxResult(req *types.ReqHash, result *interface{}) error {
*result = data *result = data
return err return err
} }
// IsSync query is sync
func (c *channelClient) IsSync(ctx context.Context, in *types.ReqNil) (*types.IsCaughtUp, error) {
data, err := c.QueryConsensusFunc("para", "IsCaughtUp", &types.ReqNil{})
if err != nil {
return nil, err
}
return data.(*types.IsCaughtUp), nil
}
// IsSync query is sync
func (c *Jrpc) IsSync(in *types.ReqNil, result *interface{}) error {
//TODO consensus and paracross are not the same registered names ?
data, err := c.cli.QueryConsensusFunc("para", "IsCaughtUp", &types.ReqNil{})
*result = false
if err != nil {
return err
}
if reply, ok := data.(*types.IsCaughtUp); ok {
*result = reply.Iscaughtup
}
return nil
}
/*
* Copyright Fuzamei Corp. 2018 All Rights Reserved.
* Use of this source code is governed by a BSD-style
* license that can be found in the LICENSE file.
*/
package rpc
//only load all plugin and system
import (
"testing"
"github.com/33cn/chain33/client/mocks"
rpctypes "github.com/33cn/chain33/rpc/types"
"github.com/33cn/chain33/types"
pt "github.com/33cn/plugin/plugin/dapp/paracross/types"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
)
func newGrpc(api *mocks.QueueProtocolAPI) *channelClient {
return &channelClient{
ChannelClient: rpctypes.ChannelClient{QueueProtocolAPI: api},
}
}
func newJrpc(api *mocks.QueueProtocolAPI) *Jrpc {
return &Jrpc{cli: newGrpc(api)}
}
func TestChannelClient_GetTitle(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("paracross", nil, nil, nil)
req := &types.ReqString{Data: "xxxxxxxxxxx"}
api.On("Query", pt.GetExecName(), "GetTitle", req).Return(&pt.ParacrossStatus{}, nil)
_, err := client.GetTitle(context.Background(), req)
assert.Nil(t, err)
}
func TestJrpc_GetTitle(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
j := newJrpc(api)
req := &types.ReqString{Data: "xxxxxxxxxxx"}
var result interface{}
api.On("Query", pt.GetExecName(), "GetTitle", req).Return(&pt.ParacrossStatus{}, nil)
err := j.GetHeight(req, &result)
assert.Nil(t, err)
}
func TestChannelClient_ListTitles(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("paracross", nil, nil, nil)
req := &types.ReqNil{}
api.On("Query", pt.GetExecName(), "ListTitles", req).Return(&pt.RespParacrossTitles{}, nil)
_, err := client.ListTitles(context.Background(), req)
assert.Nil(t, err)
}
func TestJrpc_ListTitles(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
j := newJrpc(api)
req := &types.ReqNil{}
var result interface{}
api.On("Query", pt.GetExecName(), "ListTitles", req).Return(&pt.RespParacrossTitles{}, nil)
err := j.ListTitles(req, &result)
assert.Nil(t, err)
}
func TestChannelClient_GetTitleHeight(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("paracross", nil, nil, nil)
req := &pt.ReqParacrossTitleHeight{}
api.On("Query", pt.GetExecName(), "GetTitleHeight", req).Return(&pt.ReceiptParacrossDone{}, nil)
_, err := client.GetTitleHeight(context.Background(), req)
assert.Nil(t, err)
}
func TestJrpc_GetTitleHeight(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
j := newJrpc(api)
req := &pt.ReqParacrossTitleHeight{}
var result interface{}
api.On("Query", pt.GetExecName(), "GetTitleHeight", req).Return(&pt.ReceiptParacrossDone{}, nil)
err := j.GetTitleHeight(req, &result)
assert.Nil(t, err)
}
func TestChannelClient_GetAssetTxResult(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("paracross", nil, nil, nil)
req := &types.ReqHash{}
api.On("Query", pt.GetExecName(), "GetAssetTxResult", req).Return(&pt.ParacrossAsset{}, nil)
_, err := client.GetAssetTxResult(context.Background(), req)
assert.Nil(t, err)
}
func TestJrpc_GetAssetTxResult(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
j := newJrpc(api)
req := &types.ReqHash{}
var result interface{}
api.On("Query", pt.GetExecName(), "GetAssetTxResult", req).Return(&pt.ParacrossAsset{}, nil)
err := j.GetAssetTxResult(req, &result)
assert.Nil(t, err)
}
func TestChannelClient_IsSync(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
client := newGrpc(api)
client.Init("paracross", nil, nil, nil)
req := &types.ReqNil{}
api.On("QueryConsensusFunc", "para", "IsCaughtUp", req).Return(&types.IsCaughtUp{}, nil)
_, err := client.IsSync(context.Background(), req)
assert.Nil(t, err)
}
func TestJrpc_IsSync(t *testing.T) {
api := new(mocks.QueueProtocolAPI)
J := newJrpc(api)
req := &types.ReqNil{}
var result interface{}
api.On("QueryConsensusFunc", "para", "IsCaughtUp", req).Return(&types.IsCaughtUp{}, nil)
err := J.IsSync(req, &result)
assert.Nil(t, err)
}
//TODO wait finish
//func TestRPC_CallTestNode(t *testing.T) {
// api := new(mocks.QueueProtocolAPI)
// cfg, sub := testnode.GetDefaultConfig()
// // para consensus
// cfg.Consensus.Name = "para"
// cfg.Title="user.p.test."
// cfg.BlockChain.IsParaChain=true
// cfg.Store.Name="mavl"
// mock33 := testnode.NewWithConfig(cfg, sub, api)
// defer func() {
// mock33.Close()
// mock.AssertExpectationsForObjects(t, api)
// }()
// g := newGrpc(api)
// g.Init(pt.GetExecName(), mock33.GetRPC(), newJrpc(api), g)
// time.Sleep(time.Millisecond)
// mock33.Listen()
// time.Sleep(time.Millisecond)
// api.On("Query", pt.GetExecName(), "GetTitle", &types.ReqString{}).Return(&pt.ParacrossStatus{Title:"test"}, nil)
// api.On("Query", pt.GetExecName(), "ListTitles", &types.ReqNil{}).Return(&pt.RespParacrossTitles{Titles:[]*pt.ReceiptParacrossDone{&pt.ReceiptParacrossDone{Title:"test1"},&pt.ReceiptParacrossDone{Title:"test2"}}}, nil)
// api.On("Query", pt.GetExecName(), "GetTitleHeight", &pt.ReqParacrossTitleHeight{}).Return(&pt.ReceiptParacrossDone{Height:10}, nil)
// api.On("Query", pt.GetExecName(), "GetAssetTxResult", &types.ReqHash{}).Return(&pt.ParacrossAsset{Symbol:"test"}, nil)
// api.On("QueryConsensusFunc", "para", "IsCaughtUp",&types.ReqNil{}).Return(&types.IsCaughtUp{Iscaughtup:true}, nil)
// //test jrpc
// rpcCfg := mock33.GetCfg().RPC
// jsonClient, err := jsonclient.NewJSONClient("http://" + rpcCfg.JrpcBindAddr + "/")
// assert.Nil(t, err)
// assert.NotNil(t, jsonClient)
// var result pt.ParacrossStatus
// err = jsonClient.Call("paracross.GetHeight", nil, &result)
// fmt.Println(err)
// assert.Nil(t, err)
// assert.Equal(t, "test", result.Title)
//
// var reply types.IsCaughtUp
// err = jsonClient.Call("paracross.IsSync", &types.ReqNil{}, &reply)
// assert.Nil(t, err)
// assert.Equal(t, true, reply.Iscaughtup)
//
// var res pt.RespParacrossTitles
// err = jsonClient.Call("paracross.ListTitles", &types.ReqNil{}, &res)
// assert.Nil(t, err)
// assert.Equal(t, 2, len(res.Titles))
//
// //test grpc
//
// ctx := context.Background()
// c, err := grpc.DialContext(ctx, rpcCfg.GrpcBindAddr, grpc.WithInsecure())
// assert.Nil(t, err)
// assert.NotNil(t, c)
//
// client := pt.NewParacrossClient(c)
// issync, err := client.IsSync(ctx, &types.ReqNil{})
// assert.Nil(t, err)
// assert.Equal(t, true, issync.Iscaughtup)
//}
This diff is collapsed.
...@@ -91,7 +91,7 @@ func NewJoinTable(left *Table, right *Table, indexes []string) (*JoinTable, erro ...@@ -91,7 +91,7 @@ func NewJoinTable(left *Table, right *Table, indexes []string) (*JoinTable, erro
join.leftIndex = append(join.leftIndex, joinindex[0]) join.leftIndex = append(join.leftIndex, joinindex[0])
} }
if joinindex[1] == "" || !right.canGet(joinindex[1]) { if joinindex[1] == "" || !right.canGet(joinindex[1]) {
return nil, errors.New("jointable: left table can not get: " + joinindex[1]) return nil, errors.New("jointable: right table can not get: " + joinindex[1])
} }
if joinindex[1] != "" { if joinindex[1] != "" {
join.rightIndex = append(join.rightIndex, joinindex[1]) join.rightIndex = append(join.rightIndex, joinindex[1])
...@@ -160,6 +160,7 @@ func (join *JoinTable) GetData(primaryKey []byte) (*Row, error) { ...@@ -160,6 +160,7 @@ func (join *JoinTable) GetData(primaryKey []byte) (*Row, error) {
} }
rowjoin := join.meta.CreateRow() rowjoin := join.meta.CreateRow()
rowjoin.Ty = None rowjoin.Ty = None
rowjoin.Primary = leftrow.Primary
rowjoin.Data.(*JoinData).Left = leftrow.Data rowjoin.Data.(*JoinData).Left = leftrow.Data
rowjoin.Data.(*JoinData).Right = rightrow.Data rowjoin.Data.(*JoinData).Right = rightrow.Data
return rowjoin, nil return rowjoin, nil
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment