Commit 25fd813a authored by suyanlong's avatar suyanlong

Add manager function and fixed bug

parent d3dddfdb
Pipeline #8324 failed with stages
...@@ -104,11 +104,7 @@ func httpPProf(port int64) { ...@@ -104,11 +104,7 @@ func httpPProf(port int64) {
go func() { go func() {
addr := fmt.Sprintf("localhost:%d", port) addr := fmt.Sprintf("localhost:%d", port)
fmt.Printf("Pprof on localhost:%d\n\n", port) fmt.Printf("Pprof on localhost:%d\n\n", port)
err := http.ListenAndServe(addr, nil) tool.Asset(http.ListenAndServe(addr, nil))
if err != nil {
fmt.Println(err)
panic(err)
}
}() }()
} }
......
...@@ -64,10 +64,9 @@ func NewSidecar(repoRoot string, config *repo.Config) (internal.Launcher, error) ...@@ -64,10 +64,9 @@ func NewSidecar(repoRoot string, config *repo.Config) (internal.Launcher, error)
// TODO hub client // TODO hub client
// privateKey DH // privateKey DH
cryptor := txcrypto.NewCryptor(nil, appchainMgr, privateKey) cryptor := txcrypto.NewCryptor(nil, appchainMgr, privateKey)
clientPort := appchain.NewPorts(clients, cryptor, logger) managerPort := appchain.NewManager(store, clients, cryptor, logger)
r.Adds(clientPort) r.Adds(managerPort.Ports())
rule, err := rulemgr.New(store, loggers.Logger(loggers.RuleMgr)) rule := rulemgr.New(store, loggers.Logger(loggers.RuleMgr))
tool.Asset(err)
mg, err := manager.NewManager(addr.String(), pm, appchainMgr, rule, loggers.Logger(loggers.Manager)) mg, err := manager.NewManager(addr.String(), pm, appchainMgr, rule, loggers.Logger(loggers.Manager))
tool.Asset(err) tool.Asset(err)
apiServer := api.NewServer(config, r, loggers.Logger(loggers.ApiServer)) apiServer := api.NewServer(config, r, loggers.Logger(loggers.ApiServer))
......
...@@ -16,7 +16,6 @@ import ( ...@@ -16,7 +16,6 @@ import (
"gitlab.33.cn/link33/sidecar/internal/txcrypto" "gitlab.33.cn/link33/sidecar/internal/txcrypto"
"gitlab.33.cn/link33/sidecar/model/pb" "gitlab.33.cn/link33/sidecar/model/pb"
"gitlab.33.cn/link33/sidecar/pkg/plugins" "gitlab.33.cn/link33/sidecar/pkg/plugins"
"gitlab.33.cn/link33/sidecar/tool"
) )
type AppChain interface { type AppChain interface {
...@@ -35,7 +34,7 @@ type appChain struct { ...@@ -35,7 +34,7 @@ type appChain struct {
msgCh chan *pb.Message msgCh chan *pb.Message
} }
func NewPort(client plugins.Client, cryptor txcrypto.Cryptor, logger logrus.FieldLogger) (port.Port, error) { func NewPort(client plugins.Client, cryptor txcrypto.Cryptor, logger logrus.FieldLogger) port.Port {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &appChain{ return &appChain{
client: client, client: client,
...@@ -45,14 +44,13 @@ func NewPort(client plugins.Client, cryptor txcrypto.Cryptor, logger logrus.Fiel ...@@ -45,14 +44,13 @@ func NewPort(client plugins.Client, cryptor txcrypto.Cryptor, logger logrus.Fiel
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
msgCh: make(chan *pb.Message, port.Capacity), msgCh: make(chan *pb.Message, port.Capacity),
}, nil }
} }
func NewPorts(clients []plugins.Client, cryptor txcrypto.Cryptor, logger logrus.FieldLogger) []port.Port { func NewPorts(clients []plugins.Client, cryptor txcrypto.Cryptor, logger logrus.FieldLogger) []port.Port {
var ps []port.Port var ps []port.Port
for _, c := range clients { for _, c := range clients {
p, err := NewPort(c, cryptor, logger) p := NewPort(c, cryptor, logger)
tool.Asset(err)
ps = append(ps, p) ps = append(ps, p)
} }
......
package appchain
import (
"context"
"github.com/meshplus/bitxhub-kit/storage"
"github.com/sirupsen/logrus"
"gitlab.33.cn/link33/sidecar/internal/port"
"gitlab.33.cn/link33/sidecar/internal/txcrypto"
"gitlab.33.cn/link33/sidecar/pkg/plugins"
"gitlab.33.cn/link33/sidecar/tool"
)
type Manager struct {
ctx context.Context
g *tool.Group
clients []plugins.Client
store storage.Storage
cryptor txcrypto.Cryptor
logger logrus.FieldLogger
}
func NewManager(store storage.Storage, clients []plugins.Client, cryptor txcrypto.Cryptor, logger logrus.FieldLogger) *Manager {
g, ctx := tool.WithContext(context.Background())
return &Manager{
ctx: ctx,
g: g,
clients: clients,
store: store,
cryptor: cryptor,
logger: logger,
}
}
func (m *Manager) Start() error {
for _, client := range m.clients {
m.g.Go(func() error {
return client.Start()
})
}
return m.g.Wait()
}
func (m *Manager) Stop() error {
for _, client := range m.clients {
m.g.Go(func() error {
return client.Stop()
})
}
return m.g.Wait()
}
func (m *Manager) Ports() []port.Port {
return NewPorts(m.clients, m.cryptor, m.logger)
}
...@@ -94,10 +94,7 @@ func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKe ...@@ -94,10 +94,7 @@ func New(config *repo.Config, router router.Router, nodePrivKey crypto.PrivateKe
} }
func (swarm *Swarm) add(p port.Port) { func (swarm *Swarm) add(p port.Port) {
err := swarm.router.Add(p) swarm.router.Add(p)
if err != nil {
swarm.logger.Error(err)
}
} }
func (swarm *Swarm) Start() error { func (swarm *Swarm) Start() error {
......
...@@ -10,9 +10,9 @@ import ( ...@@ -10,9 +10,9 @@ import (
type Router interface { type Router interface {
internal.Launcher internal.Launcher
Add(p port.Port) error Add(p port.Port)
Adds(p []port.Port) error Adds(p []port.Port)
Route(ibtp *pb.Message) error Route(ibtp *pb.Message) error
......
...@@ -56,7 +56,7 @@ func (r *router) Stop() error { ...@@ -56,7 +56,7 @@ func (r *router) Stop() error {
} }
// TODO // TODO
func (r *router) Add(p port.Port) error { func (r *router) Add(p port.Port) {
r.portMap.Add(p) r.portMap.Add(p)
go func() { go func() {
c := p.ListenIBTPX() c := p.ListenIBTPX()
...@@ -83,14 +83,12 @@ func (r *router) Add(p port.Port) error { ...@@ -83,14 +83,12 @@ func (r *router) Add(p port.Port) error {
} }
} }
}() }()
return nil
} }
func (r *router) Adds(p []port.Port) error { func (r *router) Adds(p []port.Port) {
for _, pt := range p { for _, pt := range p {
r.Add(pt) r.Add(pt)
} }
return nil
} }
func (r *router) Load(key string) (value port.Port, ok bool) { func (r *router) Load(key string) (value port.Port, ok bool) {
......
...@@ -25,7 +25,7 @@ type RuleMgr struct { ...@@ -25,7 +25,7 @@ type RuleMgr struct {
logger logrus.FieldLogger logger logrus.FieldLogger
} }
func New(storage storage.Storage, logger logrus.FieldLogger) (*RuleMgr, error) { func New(storage storage.Storage, logger logrus.FieldLogger) *RuleMgr {
ledger := &CodeLedger{ ledger := &CodeLedger{
storage: storage, storage: storage,
} }
...@@ -36,10 +36,9 @@ func New(storage storage.Storage, logger logrus.FieldLogger) (*RuleMgr, error) { ...@@ -36,10 +36,9 @@ func New(storage storage.Storage, logger logrus.FieldLogger) (*RuleMgr, error) {
Ve: ve, Ve: ve,
logger: logger, logger: logger,
} }
return rm, nil return rm
} }
func (rm *RuleMgr) Validate(address, from string, proof, payload []byte, validators string) (bool, error) { func (rm *RuleMgr) Validate(address, from string, proof, payload []byte, validators string) (bool, error) {
ok, err := rm.Ve.Validate(address, from, proof, payload, validators) return rm.Ve.Validate(address, from, proof, payload, validators)
return ok, err
} }
...@@ -165,6 +165,9 @@ func (g *GRPCClient) Stop() error { ...@@ -165,6 +165,9 @@ func (g *GRPCClient) Stop() error {
if err != nil { if err != nil {
return err return err
} }
if !g.kernel.Exited() {
g.kernel.Kill()
}
g.doneContext.Done() g.doneContext.Done()
return nil return nil
} }
...@@ -332,14 +335,6 @@ func (g *GRPCClient) Type() string { ...@@ -332,14 +335,6 @@ func (g *GRPCClient) Type() string {
return response.Type return response.Type
} }
func (g *GRPCClient) Kill() {
g.kernel.Kill()
}
func (g *GRPCClient) Exited() bool {
return g.kernel.Exited()
}
func (g *GRPCClient) Bind(kern Kernel) { func (g *GRPCClient) Bind(kern Kernel) {
g.kernel = kern g.kernel = kern
} }
......
...@@ -19,7 +19,6 @@ type Launcher interface { ...@@ -19,7 +19,6 @@ type Launcher interface {
//go:generate mockgen -destination mock_client/mock_client.go -package mock_client -source interface.go //go:generate mockgen -destination mock_client/mock_client.go -package mock_client -source interface.go
type Client interface { // 业务实现委托接口。 type Client interface { // 业务实现委托接口。
Launcher Launcher
Kernel
Bind(kern Kernel) Bind(kern Kernel)
// Initialize initialize plugin client // Initialize initialize plugin client
......
...@@ -99,20 +99,19 @@ func CreateClients(appchainConfigs []repo.Appchain, extra []byte) []Client { ...@@ -99,20 +99,19 @@ func CreateClients(appchainConfigs []repo.Appchain, extra []byte) []Client {
// Request the plugin // Request the plugin
raw, err := rpcClient.Dispense(appchainConfig.Plugin) raw, err := rpcClient.Dispense(appchainConfig.Plugin)
tool.Asset(err) tool.Asset(err)
var appchain Client var client Client
switch raw.(type) { switch raw.(type) {
case *GRPCClient: case *GRPCClient:
appchain = raw.(*GRPCClient) client = raw.(*GRPCClient)
default: default:
tool.Asset(fmt.Errorf("unsupported kernel type")) tool.Asset(fmt.Errorf("unsupported kernel type"))
} }
// initialize our kernel plugin // initialize our kernel plugin
err = appchain.Initialize(pluginConfigPath, appchainConfig.DID, extra) err = client.Initialize(pluginConfigPath, appchainConfig.DID, extra)
tool.Asset(err) tool.Asset(err)
appchain.Bind(kernel) client.Bind(kernel)
tool.Asset(appchain.Start()) clients = append(clients, client)
clients = append(clients, appchain)
} }
return clients return clients
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment