Commit ffb576dd authored by suyanlong's avatar suyanlong

Add blockchain lite client package

parent c438a3f2
Pipeline #8226 failed with stages
package direct_lite
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestMockLite(t *testing.T) {
lite := MockLite{}
require.Nil(t, lite.Start())
require.Nil(t, lite.Stop())
_, err := lite.QueryHeader(uint64(1))
require.Nil(t, err)
}
package direct_lite
import "gitlab.33.cn/link33/sidecar/model/pb"
type MockLite struct {
}
func (lite *MockLite) Start() error {
return nil
}
func (lite *MockLite) Stop() error {
return nil
}
func (lite *MockLite) QueryHeader(height uint64) (*pb.BlockHeader, error) {
return nil, nil
}
package hub_lite
import (
"fmt"
"strconv"
"gitlab.33.cn/link33/sidecar/model/pb"
)
func (lite *HubLite) persist(h *pb.BlockHeader) error {
batch := lite.storage.NewBatch()
data, err := h.Marshal()
if err != nil {
return fmt.Errorf("marshal header: %w", err)
}
batch.Put(headerKey(h.Number), data)
batch.Put(headerHeightKey(), []byte(strconv.FormatUint(lite.height, 10)))
batch.Commit()
return nil
}
// getLastHeight gets the current working height of lite
func (lite *HubLite) getLastHeight() (uint64, error) {
v := lite.storage.Get(headerHeightKey())
if v == nil {
// if header height is not set, return default 0
return 0, nil
}
return strconv.ParseUint(string(v), 10, 64)
}
package hub_lite
import (
"time"
"github.com/Rican7/retry"
"github.com/Rican7/retry/strategy"
"github.com/sirupsen/logrus"
"gitlab.33.cn/link33/sidecar/model/pb"
)
func (lite *HubLite) handleBlockHeader(header *pb.BlockHeader) {
if header == nil {
lite.logger.WithField("height", lite.height).Error("empty block header")
return
}
if header.Number < lite.getDemandHeight() {
return
}
if ok, err := lite.verifyHeader(header); !ok {
lite.logger.WithFields(logrus.Fields{
"height": header.Number,
"error": err,
}).Warn("Invalid header")
return
}
if err := lite.persist(header); err != nil {
lite.logger.WithFields(logrus.Fields{
"height": header.Number,
"error": err,
}).Error("Persist block header")
}
lite.updateHeight()
lite.logger.WithFields(logrus.Fields{
"height": header.Number,
}).Info("Persist block header")
}
func (lite *HubLite) syncBlock() {
loop := func(ch chan *pb.BlockHeader) {
for {
select {
case header, ok := <-ch:
if !ok {
lite.logger.Warn("Unexpected closed channel while syncing block header")
return
}
lite.handleBlockHeader(header)
case <-lite.ctx.Done():
return
}
}
}
for {
headerCh := lite.getHeaderChannel()
err := retry.Retry(func(attempt uint) error {
chainMeta, err := lite.client.GetChainMeta()
if err != nil {
lite.logger.WithField("error", err).Error("Get chain meta")
return err
}
if chainMeta.Height > lite.height {
lite.recover(lite.getDemandHeight(), chainMeta.Height)
}
return nil
}, strategy.Wait(1*time.Second))
if err != nil {
lite.logger.Panic(err)
}
loop(headerCh)
}
}
func (lite *HubLite) getHeaderChannel() chan *pb.BlockHeader {
ch := make(chan *pb.BlockHeader, maxChSize)
if err := retry.Retry(func(attempt uint) error {
if err := lite.syncBlockHeader(ch); err != nil {
return err
}
return nil
}, strategy.Wait(2*time.Second)); err != nil {
panic(err)
}
return ch
}
func (lite *HubLite) syncBlockHeader(headerCh chan<- *pb.BlockHeader) error {
ch, err := lite.client.Subscribe(lite.ctx, pb.SubscriptionRequest_BLOCK_HEADER, nil)
if err != nil {
return err
}
go func() {
for {
select {
case <-lite.ctx.Done():
return
case h, ok := <-ch:
if !ok {
close(headerCh)
return
}
headerCh <- h.(*pb.BlockHeader)
}
}
}()
return nil
}
package hub_lite
import (
"context"
"fmt"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/sirupsen/logrus"
rpcx "gitlab.33.cn/link33/sidecar/hub/client"
"gitlab.33.cn/link33/sidecar/model/pb"
)
const maxChSize = 1024
type HubLite struct {
client rpcx.Client
storage storage.Storage
logger logrus.FieldLogger
height uint64
ctx context.Context
cancel context.CancelFunc
}
func New(client rpcx.Client, storage storage.Storage, logger logrus.FieldLogger) (*HubLite, error) {
return &HubLite{
client: client,
storage: storage,
logger: logger,
}, nil
}
func (lite *HubLite) Start() error {
ctx, cancel := context.WithCancel(context.Background())
lite.ctx = ctx
lite.cancel = cancel
meta, err := lite.client.GetChainMeta()
if err != nil {
return fmt.Errorf("get chain meta from bitxhub: %w", err)
}
// recover the block height which has latest unfinished interchain tx
height, err := lite.getLastHeight()
if err != nil {
return fmt.Errorf("get last height: %w", err)
}
lite.height = height
if meta.Height > height {
lite.recover(lite.getDemandHeight(), meta.Height)
}
go lite.syncBlock()
lite.logger.WithFields(logrus.Fields{
"current_height": lite.height,
"bitxhub_height": meta.Height,
}).Info("BitXHub lite started")
return nil
}
func (lite *HubLite) Stop() error {
lite.cancel()
lite.logger.Info("BitXHub lite stopped")
return nil
}
func (lite *HubLite) QueryHeader(height uint64) (*pb.BlockHeader, error) {
v := lite.storage.Get(headerKey(height))
if v == nil {
return nil, fmt.Errorf("header at %d not found", height)
}
header := &pb.BlockHeader{}
if err := header.Unmarshal(v); err != nil {
return nil, err
}
return header, nil
}
// recover will recover those missing merkle wrapper when pier is down
func (lite *HubLite) recover(begin, end uint64) {
lite.logger.WithFields(logrus.Fields{
"begin": begin,
"end": end,
}).Info("BitXHub lite recover")
headerCh := make(chan *pb.BlockHeader, maxChSize)
if err := lite.client.GetBlockHeader(lite.ctx, begin, end, headerCh); err != nil {
lite.logger.WithFields(logrus.Fields{
"begin": begin,
"end": end,
"error": err,
}).Warn("Get block header")
}
for h := range headerCh {
lite.handleBlockHeader(h)
}
}
package hub_lite
import (
"context"
"fmt"
"io/ioutil"
"testing"
"time"
"github.com/cbergoon/merkletree"
"github.com/golang/mock/gomock"
"github.com/meshplus/bitxhub-kit/crypto"
"github.com/meshplus/bitxhub-kit/crypto/asym"
"github.com/meshplus/bitxhub-kit/storage/leveldb"
"github.com/meshplus/bitxhub-kit/types"
"github.com/stretchr/testify/require"
"gitlab.33.cn/link33/sidecar/hub/client/mock_client"
"gitlab.33.cn/link33/sidecar/model/pb"
"gitlab.33.cn/link33/sidecar/pkg/log"
)
const (
from = "0x3f9d18f7c3a6e5e4c0b877fe3e688ab08840b997"
)
func TestSyncHeader(t *testing.T) {
lite, client, _ := prepare(t)
defer lite.storage.Close()
// expect mock module returns
txs := make([]*pb.BxhTransaction, 0, 2)
txs = append(txs, getTx(t), getTx(t))
h1 := getBlockHeader(t, txs, 1)
h2 := getBlockHeader(t, txs, 2)
// mock invalid block header
h3 := getBlockHeader(t, txs, 3)
h3.TxRoot = types.NewHashByStr(from)
syncHeaderCh := make(chan interface{}, 2)
meta := &pb.ChainMeta{
Height: 1,
BlockHash: types.NewHashByStr(from),
}
client.EXPECT().Subscribe(gomock.Any(), pb.SubscriptionRequest_BLOCK_HEADER, gomock.Any()).Return(syncHeaderCh, nil).AnyTimes()
client.EXPECT().GetBlockHeader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(ctx context.Context, begin, end uint64, ch chan<- *pb.BlockHeader) {
ch <- h1
close(ch)
}).AnyTimes()
client.EXPECT().GetChainMeta().Return(meta, nil).AnyTimes()
go func() {
syncHeaderCh <- h2
}()
done := make(chan bool, 1)
go func() {
err := lite.Start()
require.Nil(t, err)
<-done
}()
time.Sleep(1 * time.Second)
// recover should have persist height 1 wrapper
receivedHeader, err := lite.QueryHeader(2)
require.Nil(t, err)
require.Equal(t, h2, receivedHeader)
done <- true
require.Equal(t, uint64(2), lite.height)
require.Nil(t, lite.Stop())
// query nonexistent header
_, err = lite.QueryHeader(10)
require.NotNil(t, err)
// query unmarshal error header
lite.storage.Put(headerKey(20), []byte("a"))
_, err = lite.QueryHeader(20)
require.NotNil(t, err)
// start with get last height error
lite.storage.Put(headerHeightKey(), []byte("a"))
err = lite.Start()
require.NotNil(t, err)
}
func TestSyncHeader_Start_GetChainMateError(t *testing.T) {
lite, client, _ := prepare(t)
defer lite.storage.Close()
client.EXPECT().GetChainMeta().Return(nil, fmt.Errorf("get chain meta error")).AnyTimes()
err := lite.Start()
require.NotNil(t, err)
}
func TestSyncHeader_Recover_GetBlockHeaderError(t *testing.T) {
lite, client, _ := prepare(t)
defer lite.storage.Close()
// expect mock module returns
txs := make([]*pb.BxhTransaction, 0, 2)
txs = append(txs, getTx(t), getTx(t))
h1 := getBlockHeader(t, txs, 1)
syncHeaderCh := make(chan interface{}, 2)
meta := &pb.ChainMeta{
Height: 1,
BlockHash: types.NewHashByStr(from),
}
client.EXPECT().Subscribe(gomock.Any(), pb.SubscriptionRequest_BLOCK_HEADER, gomock.Any()).Return(syncHeaderCh, nil).AnyTimes()
client.EXPECT().GetChainMeta().Return(meta, nil).AnyTimes()
client.EXPECT().GetBlockHeader(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
func(ctx context.Context, begin, end uint64, ch chan<- *pb.BlockHeader) {
ch <- h1
close(ch)
}).Return(fmt.Errorf("get block header error")).AnyTimes()
err := lite.Start()
require.Nil(t, err)
}
func TestHubLite_HandleBlockHeaderError(t *testing.T) {
lite, _, _ := prepare(t)
defer lite.storage.Close()
txs := make([]*pb.BxhTransaction, 0, 2)
txs = append(txs, getTx(t), getTx(t))
h1 := getBlockHeader(t, txs, 1)
lite.height = 2
// handle nil header
lite.handleBlockHeader(nil)
require.Equal(t, uint64(2), lite.height)
// handle header with number less than demand height
lite.handleBlockHeader(h1)
require.Equal(t, uint64(2), lite.height)
}
func TestHubLite_GetHeaderChannelError(t *testing.T) {
lite, client, _ := prepare(t)
defer lite.storage.Close()
syncHeaderCh := make(chan interface{}, 0)
client.EXPECT().Subscribe(gomock.Any(), pb.SubscriptionRequest_BLOCK_HEADER, gomock.Any()).Return(nil, fmt.Errorf("subscribe error")).Times(1)
client.EXPECT().Subscribe(gomock.Any(), pb.SubscriptionRequest_BLOCK_HEADER, gomock.Any()).Return(syncHeaderCh, nil).AnyTimes()
go func() {
lite.getHeaderChannel()
}()
}
func TestHubLite_SyncBlockHeaderError(t *testing.T) {
lite, client, _ := prepare(t)
defer lite.storage.Close()
client.EXPECT().Subscribe(gomock.Any(), pb.SubscriptionRequest_BLOCK_HEADER, gomock.Any()).Return(nil, fmt.Errorf("subscribe error")).AnyTimes()
ch := make(chan *pb.BlockHeader, maxChSize)
require.NotNil(t, lite.syncBlockHeader(ch))
}
func prepare(t *testing.T) (*HubLite, *mock_client.MockClient, []crypto.PrivateKey) {
mockCtl := gomock.NewController(t)
mockCtl.Finish()
client := mock_client.NewMockClient(mockCtl)
tmpDir, err := ioutil.TempDir("", "storage")
require.Nil(t, err)
storage, err := leveldb.New(tmpDir)
require.Nil(t, err)
keys := getVlts(t)
lite, err := New(client, storage, log.NewWithModule("lite"))
require.Nil(t, err)
return lite, client, keys
}
func getBlockHeader(t *testing.T, txs []*pb.BxhTransaction, number uint64) *pb.BlockHeader {
hashes := make([]merkletree.Content, 0, len(txs))
for i := 0; i < len(txs); i++ {
hash := txs[i].Hash()
hashes = append(hashes, hash)
}
tree, err := merkletree.NewTree(hashes)
require.Nil(t, err)
root := tree.MerkleRoot()
wrapper := &pb.BlockHeader{
Number: number,
Timestamp: time.Now().UnixNano(),
ParentHash: types.NewHashByStr(from),
TxRoot: types.NewHash(root),
}
return wrapper
}
func getVlts(t *testing.T) []crypto.PrivateKey {
var keys []crypto.PrivateKey
for i := 0; i < 4; i++ {
priv, err := asym.GenerateKeyPair(crypto.Secp256k1)
require.Nil(t, err)
keys = append(keys, priv)
}
return keys
}
func getTx(t *testing.T) *pb.BxhTransaction {
ibtp := getIBTP(t, 1, pb.IBTP_INTERCHAIN)
body, err := ibtp.Marshal()
require.Nil(t, err)
tmpIP := &pb.InvokePayload{
Method: "set",
Args: []*pb.Arg{{Value: body}},
}
pd, err := tmpIP.Marshal()
require.Nil(t, err)
td := &pb.TransactionData{
Type: pb.TransactionData_INVOKE,
Payload: pd,
}
data, err := td.Marshal()
require.Nil(t, err)
faddr := &types.Address{}
faddr.SetBytes([]byte(from))
tx := &pb.BxhTransaction{
From: faddr,
To: faddr,
Payload: data,
IBTP: ibtp,
}
return tx
}
func getIBTP(t *testing.T, index uint64, typ pb.IBTP_Type) *pb.IBTP {
ct := &pb.Content{
Func: "set",
Args: [][]byte{[]byte("Alice")},
}
c, err := ct.Marshal()
require.Nil(t, err)
pd := pb.Payload{
Encrypted: false,
Content: c,
}
ibtppd, err := pd.Marshal()
require.Nil(t, err)
return &pb.IBTP{
From: from,
To: from,
Payload: ibtppd,
Nonce: index,
Type: typ,
}
}
package hub_lite
import (
"fmt"
"sync/atomic"
)
func (lite *HubLite) getDemandHeight() uint64 {
return atomic.LoadUint64(&lite.height) + 1
}
func (lite *HubLite) updateHeight() {
atomic.AddUint64(&lite.height, 1)
}
func headerHeightKey() []byte {
return []byte("lite-height")
}
func headerKey(height uint64) []byte {
return []byte(fmt.Sprintf("header-%d", height))
}
package hub_lite
import (
"gitlab.33.cn/link33/sidecar/model/pb"
)
func (lite *HubLite) verifyHeader(h *pb.BlockHeader) (bool, error) {
// TODO: blocked by signature mechanism implementation of Hub
return true, nil
}
package lite
import (
"gitlab.33.cn/link33/sidecar/internal"
"gitlab.33.cn/link33/sidecar/model/pb"
)
//go:generate mockgen -destination mock_lite/mock_lite.go -package mock_lite -source interface.go
type Lite interface {
internal.Launcher
// QueryHeader gets block header given block height
QueryHeader(height uint64) (*pb.BlockHeader, error)
}
// Code generated by MockGen. DO NOT EDIT.
// Source: interface.go
// Package mock_lite is a generated GoMock package.
package mock_lite
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
pb "gitlab.33.cn/link33/sidecar/model/pb"
)
// MockLite is a mock of Lite interface.
type MockLite struct {
ctrl *gomock.Controller
recorder *MockLiteMockRecorder
}
// MockLiteMockRecorder is the mock recorder for MockLite.
type MockLiteMockRecorder struct {
mock *MockLite
}
// NewMockLite creates a new mock instance.
func NewMockLite(ctrl *gomock.Controller) *MockLite {
mock := &MockLite{ctrl: ctrl}
mock.recorder = &MockLiteMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockLite) EXPECT() *MockLiteMockRecorder {
return m.recorder
}
// QueryHeader mocks base method.
func (m *MockLite) QueryHeader(height uint64) (*pb.BlockHeader, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "QueryHeader", height)
ret0, _ := ret[0].(*pb.BlockHeader)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// QueryHeader indicates an expected call of QueryHeader.
func (mr *MockLiteMockRecorder) QueryHeader(height interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryHeader", reflect.TypeOf((*MockLite)(nil).QueryHeader), height)
}
// Start mocks base method.
func (m *MockLite) Start() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Start")
ret0, _ := ret[0].(error)
return ret0
}
// Start indicates an expected call of Start.
func (mr *MockLiteMockRecorder) Start() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockLite)(nil).Start))
}
// Stop mocks base method.
func (m *MockLite) Stop() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Stop")
ret0, _ := ret[0].(error)
return ret0
}
// Stop indicates an expected call of Stop.
func (mr *MockLiteMockRecorder) Stop() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockLite)(nil).Stop))
}
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
rpcx "gitlab.33.cn/link33/sidecar/hub/client" rpcx "gitlab.33.cn/link33/sidecar/hub/client"
"gitlab.33.cn/link33/sidecar/internal/lite"
"gitlab.33.cn/link33/sidecar/model/pb" "gitlab.33.cn/link33/sidecar/model/pb"
) )
...@@ -41,6 +42,8 @@ type WrapperSyncer struct { ...@@ -41,6 +42,8 @@ type WrapperSyncer struct {
appchainDID string appchainDID string
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
lite lite.Lite
} }
type SubscriptionKey struct { type SubscriptionKey struct {
......
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
rpcx "gitlab.33.cn/link33/sidecar/hub/client" rpcx "gitlab.33.cn/link33/sidecar/hub/client"
"gitlab.33.cn/link33/sidecar/hub/client/mock_client" "gitlab.33.cn/link33/sidecar/hub/client/mock_client"
"gitlab.33.cn/link33/sidecar/internal/lite/mock_lite"
"gitlab.33.cn/link33/sidecar/internal/repo" "gitlab.33.cn/link33/sidecar/internal/repo"
"gitlab.33.cn/link33/sidecar/model/constant" "gitlab.33.cn/link33/sidecar/model/constant"
"gitlab.33.cn/link33/sidecar/model/pb" "gitlab.33.cn/link33/sidecar/model/pb"
...@@ -221,7 +222,7 @@ func TestQueryIBTP(t *testing.T) { ...@@ -221,7 +222,7 @@ func TestQueryIBTP(t *testing.T) {
origin := &pb.IBTP{ origin := &pb.IBTP{
From: from, From: from,
Index: 1, Nonce: 1,
Timestamp: time.Now().UnixNano(), Timestamp: time.Now().UnixNano(),
} }
receiptData, err := origin.Marshal() receiptData, err := origin.Marshal()
......
...@@ -14,6 +14,7 @@ type Launcher interface { ...@@ -14,6 +14,7 @@ type Launcher interface {
Start() error Start() error
Stop() error Stop() error
} }
// Client defines the interface that interacts with appchain // Client defines the interface that interacts with appchain
//go:generate mockgen -destination mock_client/mock_client.go -package mock_client -source interface.go //go:generate mockgen -destination mock_client/mock_client.go -package mock_client -source interface.go
type Client interface { // 业务实现委托接口。 type Client interface { // 业务实现委托接口。
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment