Commit f2fd7f36 authored by suyanlong's avatar suyanlong

split port package file

parent b84f1a6e
Pipeline #7995 canceled with stages
package port
// port 类型:主要是sider peer、plugin、blockchain peer。
type Type int
const (
Hub = "hub" //Hub: 同步数据,同步元数据等。
Sidecar = "sidecar" //SideCar节点
Appchain = "appchain" //区块链客户端
Local = "local"
)
package port
import (
"math/rand"
"sync"
)
//获取唯一hub
//根据类型获取port
//根据ID获取
type PortMap struct {
rw sync.RWMutex
peerPort map[string]Port
appchainPort map[string]Port
hubPort Port
local Port
}
func NewPortMap() *PortMap {
return &PortMap{
rw: sync.RWMutex{},
peerPort: map[string]Port{},
appchainPort: map[string]Port{},
hubPort: nil,
}
}
func (p *PortMap) Adds(pp []Port) {
p.rw.Lock()
p.rw.Unlock()
for _, pt := range pp {
p.add(pt)
}
}
func (p *PortMap) Add(pt Port) {
p.rw.Lock()
p.rw.Unlock()
p.add(pt)
}
func (p *PortMap) add(pt Port) {
switch pt.Type() {
case Hub:
if p.hubPort == nil {
p.hubPort = pt
}
case Local:
if p.local == nil {
p.local = pt
}
case Appchain:
p.appchainPort[pt.ID()] = pt
case Sidecar:
p.peerPort[pt.ID()] = pt
}
}
func (p *PortMap) GetHub() (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.hubPort == nil {
return nil, false
}
return p.hubPort, true
}
func (p *PortMap) GetLocal() (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.local == nil {
return nil, false
}
return p.local, true
}
func (p *PortMap) Port(id string) (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.hubPort.ID() == id {
return p.hubPort, true
}
if p.local.ID() == id {
return p.local, true
}
if pt, is := p.peerPort[id]; is {
return pt, is
}
if pt, is := p.appchainPort[id]; is {
return pt, is
}
return nil, false
}
func (p *PortMap) RouterPortByID(ids []string) []Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var ports []Port
for _, id := range ids {
if pt, is := p.peerPort[id]; is {
ports = append(ports, pt)
}
}
return ports
}
func (p *PortMap) RouterPortByTag(tag string) []Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var ports []Port
for _, pt := range p.peerPort {
if tag == pt.Tag() {
ports = append(ports, pt)
}
}
return ports
}
func (p *PortMap) AllRouterPort() []Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var ports []Port
for _, pt := range p.peerPort {
ports = append(ports, pt)
}
return ports
}
func (p *PortMap) RandRouterPort() Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var randPort Port
l := len(p.peerPort)
i := rand.Intn(l + 1)
j := 0
for _, pt := range p.peerPort {
j++
if i == j {
randPort = pt
}
}
return randPort
}
func (p *PortMap) Remove(pt Port) {
p.rw.Lock()
p.rw.Unlock()
p.remove(pt)
}
func (p *PortMap) remove(pt Port) {
switch pt.Type() {
case Hub:
if p.hubPort.ID() == pt.ID() {
p.hubPort = nil
}
case Local:
if p.local.ID() == pt.ID() {
p.local = nil
}
case Appchain:
delete(p.appchainPort, pt.ID())
case Sidecar:
delete(p.peerPort, pt.ID())
}
}
func (p *PortMap) Removes(ppt []Port) {
p.rw.Lock()
p.rw.Unlock()
for _, pt := range ppt {
p.remove(pt)
}
}
func (p *PortMap) Store(id string, port Port) {
p.Add(port)
}
func (p *PortMap) Load(key string) (value Port, ok bool) {
return p.Port(key)
}
func (p *PortMap) IsExist(id string) bool {
_, is := p.Load(id)
return is
}
package port
type Message interface {
Marshal() ([]byte, error)
Unmarshal([]byte) error
}
...@@ -2,18 +2,6 @@ package port ...@@ -2,18 +2,6 @@ package port
import ( import (
"github.com/link33/sidecar/model/pb" "github.com/link33/sidecar/model/pb"
"math/rand"
"sync"
)
// port 类型:主要是sider peer、plugin、blockchain peer。
type Type int
const (
Hub = "hub" //Hub: 同步数据,同步元数据等。
Sidecar = "sidecar" //SideCar节点
Appchain = "appchain" //区块链客户端
Local = "local"
) )
// 设计一套port管理机制:包括各种的管理模块。 // 设计一套port管理机制:包括各种的管理模块。
...@@ -40,7 +28,7 @@ type Port interface { ...@@ -40,7 +28,7 @@ type Port interface {
Tag() string Tag() string
// Send 同步发送给绑定的对应的port dev // Send 同步发送给绑定的对应的port dev
Send(msg *pb.Message) (*pb.Message, error) //TODO 如何区别IBTPX与Message Send(msg *pb.Message) (*pb.Message, error)
// AsyncSend 异步发送给绑定的对应的port dev // AsyncSend 异步发送给绑定的对应的port dev
AsyncSend(msg *pb.Message) error AsyncSend(msg *pb.Message) error
...@@ -48,191 +36,3 @@ type Port interface { ...@@ -48,191 +36,3 @@ type Port interface {
// ListenIBTPX 从绑定的对应的port dev接收数据 // ListenIBTPX 从绑定的对应的port dev接收数据
ListenIBTPX() <-chan *pb.Message ListenIBTPX() <-chan *pb.Message
} }
type Message interface {
Marshal() ([]byte, error)
Unmarshal([]byte) error
}
//获取唯一hub
//根据类型获取port
//根据ID获取
type PortMap struct {
rw sync.RWMutex
peerPort map[string]Port
appchainPort map[string]Port
hubPort Port
local Port
}
func NewPortMap() *PortMap {
return &PortMap{
rw: sync.RWMutex{},
peerPort: map[string]Port{},
appchainPort: map[string]Port{},
hubPort: nil,
}
}
func (p *PortMap) Adds(pp []Port) {
p.rw.Lock()
p.rw.Unlock()
for _, pt := range pp {
p.add(pt)
}
}
func (p *PortMap) Add(pt Port) {
p.rw.Lock()
p.rw.Unlock()
p.add(pt)
}
func (p *PortMap) add(pt Port) {
switch pt.Type() {
case Hub:
if p.hubPort == nil {
p.hubPort = pt
}
case Local:
if p.local == nil {
p.local = pt
}
case Appchain:
p.appchainPort[pt.ID()] = pt
case Sidecar:
p.peerPort[pt.ID()] = pt
}
}
func (p *PortMap) GetHub() (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.hubPort == nil {
return nil, false
}
return p.hubPort, true
}
func (p *PortMap) GetLocal() (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.local == nil {
return nil, false
}
return p.local, true
}
func (p *PortMap) Port(id string) (Port, bool) {
p.rw.RLocker()
defer p.rw.RUnlock()
if p.hubPort.ID() == id {
return p.hubPort, true
}
if p.local.ID() == id {
return p.local, true
}
if pt, is := p.peerPort[id]; is {
return pt, is
}
if pt, is := p.appchainPort[id]; is {
return pt, is
}
return nil, false
}
func (p *PortMap) RouterPortByID(ids []string) []Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var ports []Port
for _, id := range ids {
if pt, is := p.peerPort[id]; is {
ports = append(ports, pt)
}
}
return ports
}
func (p *PortMap) RouterPortByTag(tag string) []Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var ports []Port
for _, pt := range p.peerPort {
if tag == pt.Tag() {
ports = append(ports, pt)
}
}
return ports
}
func (p *PortMap) AllRouterPort() []Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var ports []Port
for _, pt := range p.peerPort {
ports = append(ports, pt)
}
return ports
}
func (p *PortMap) RandRouterPort() Port {
p.rw.RLocker()
defer p.rw.RUnlock()
var randPort Port
l := len(p.peerPort)
i := rand.Intn(l + 1)
j := 0
for _, pt := range p.peerPort {
j++
if i == j {
randPort = pt
}
}
return randPort
}
func (p *PortMap) Remove(pt Port) {
p.rw.Lock()
p.rw.Unlock()
p.remove(pt)
}
func (p *PortMap) remove(pt Port) {
switch pt.Type() {
case Hub:
if p.hubPort.ID() == pt.ID() {
p.hubPort = nil
}
case Local:
if p.local.ID() == pt.ID() {
p.local = nil
}
case Appchain:
delete(p.appchainPort, pt.ID())
case Sidecar:
delete(p.peerPort, pt.ID())
}
}
func (p *PortMap) Removes(ppt []Port) {
p.rw.Lock()
p.rw.Unlock()
for _, pt := range ppt {
p.remove(pt)
}
}
func (p *PortMap) Store(id string, port Port) {
p.Add(port)
}
func (p *PortMap) Load(key string) (value Port, ok bool) {
return p.Port(key)
}
func (p *PortMap) IsExist(id string) bool {
_, is := p.Load(id)
return is
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment