Commit 2e41497b authored by kingwang's avatar kingwang Committed by 33cn

update chain33

parent 5677c872
...@@ -71,6 +71,10 @@ func (mock *mockClient) Wait(msg *queue.Message) (*queue.Message, error) { ...@@ -71,6 +71,10 @@ func (mock *mockClient) Wait(msg *queue.Message) (*queue.Message, error) {
return mock.c.Wait(msg) return mock.c.Wait(msg)
} }
func (mock *mockClient) Reply(msg *queue.Message) {
mock.c.Reply(msg)
}
func (mock *mockClient) WaitTimeout(msg *queue.Message, timeout time.Duration) (*queue.Message, error) { func (mock *mockClient) WaitTimeout(msg *queue.Message, timeout time.Duration) (*queue.Message, error) {
return mock.c.WaitTimeout(msg, timeout) return mock.c.WaitTimeout(msg, timeout)
} }
......
...@@ -34,6 +34,7 @@ type Client interface { ...@@ -34,6 +34,7 @@ type Client interface {
Wait(msg *Message) (*Message, error) //等待消息处理完成 Wait(msg *Message) (*Message, error) //等待消息处理完成
WaitTimeout(msg *Message, timeout time.Duration) (*Message, error) //等待消息处理完成 WaitTimeout(msg *Message, timeout time.Duration) (*Message, error) //等待消息处理完成
Recv() chan *Message Recv() chan *Message
Reply(msg *Message)
Sub(topic string) //订阅消息 Sub(topic string) //订阅消息
Close() Close()
CloseQueue() (*types.Reply, error) CloseQueue() (*types.Reply, error)
...@@ -101,6 +102,16 @@ func (client *client) NewMessage(topic string, ty int64, data interface{}) (msg ...@@ -101,6 +102,16 @@ func (client *client) NewMessage(topic string, ty int64, data interface{}) (msg
return NewMessage(id, topic, ty, data) return NewMessage(id, topic, ty, data)
} }
func (client *client) Reply(msg *Message) {
if msg.chReply != nil {
msg.Reply(msg)
return
}
if msg.callback != nil {
client.q.callback <- msg
}
}
// WaitTimeout 等待时间 msg 消息 timeout 超时时间 // WaitTimeout 等待时间 msg 消息 timeout 超时时间
func (client *client) WaitTimeout(msg *Message, timeout time.Duration) (*Message, error) { func (client *client) WaitTimeout(msg *Message, timeout time.Duration) (*Message, error) {
if msg.chReply == nil { if msg.chReply == nil {
...@@ -183,10 +194,10 @@ func (client *client) isEnd(data *Message, ok bool) bool { ...@@ -183,10 +194,10 @@ func (client *client) isEnd(data *Message, ok bool) bool {
if !ok { if !ok {
return true return true
} }
if atomic.LoadInt32(&client.isClosed) == 1 { if data.Data == nil && data.ID == 0 && data.Ty == 0 {
return true return true
} }
if data.Data == nil && data.ID == 0 && data.Ty == 0 { if atomic.LoadInt32(&client.isClosed) == 1 {
return true return true
} }
return false return false
......
...@@ -72,6 +72,11 @@ func (_m *Client) Recv() chan *queue.Message { ...@@ -72,6 +72,11 @@ func (_m *Client) Recv() chan *queue.Message {
return r0 return r0
} }
// Reply provides a mock function with given fields: msg
func (_m *Client) Reply(msg *queue.Message) {
_m.Called(msg)
}
// Send provides a mock function with given fields: msg, waitReply // Send provides a mock function with given fields: msg, waitReply
func (_m *Client) Send(msg *queue.Message, waitReply bool) error { func (_m *Client) Send(msg *queue.Message, waitReply bool) error {
ret := _m.Called(msg, waitReply) ret := _m.Called(msg, waitReply)
......
...@@ -66,13 +66,33 @@ type queue struct { ...@@ -66,13 +66,33 @@ type queue struct {
mu sync.Mutex mu sync.Mutex
done chan struct{} done chan struct{}
interrupt chan struct{} interrupt chan struct{}
callback chan *Message
isClose int32 isClose int32
name string name string
} }
// New new queue struct // New new queue struct
func New(name string) Queue { func New(name string) Queue {
q := &queue{chanSubs: make(map[string]*chanSub), name: name, done: make(chan struct{}, 1), interrupt: make(chan struct{}, 1)} q := &queue{
chanSubs: make(map[string]*chanSub),
name: name,
done: make(chan struct{}, 1),
interrupt: make(chan struct{}, 1),
callback: make(chan *Message, 1024),
}
go func() {
for {
select {
case <-q.done:
fmt.Println("closing chain33 callback")
return
case msg := <-q.callback:
if msg.callback != nil {
msg.callback(msg)
}
}
}
}()
return q return q
} }
...@@ -88,7 +108,8 @@ func (q *queue) Start() { ...@@ -88,7 +108,8 @@ func (q *queue) Start() {
// Block until a signal is received. // Block until a signal is received.
select { select {
case <-q.done: case <-q.done:
atomic.StoreInt32(&q.isClose, 1) fmt.Println("closing chain33 done")
//atomic.StoreInt32(&q.isClose, 1)
break break
case <-q.interrupt: case <-q.interrupt:
fmt.Println("closing chain33") fmt.Println("closing chain33")
...@@ -130,7 +151,11 @@ func (q *queue) chanSub(topic string) *chanSub { ...@@ -130,7 +151,11 @@ func (q *queue) chanSub(topic string) *chanSub {
defer q.mu.Unlock() defer q.mu.Unlock()
_, ok := q.chanSubs[topic] _, ok := q.chanSubs[topic]
if !ok { if !ok {
q.chanSubs[topic] = &chanSub{make(chan *Message, defaultChanBuffer), make(chan *Message, defaultLowChanBuffer), 0} q.chanSubs[topic] = &chanSub{
high: make(chan *Message, defaultChanBuffer),
low: make(chan *Message, defaultLowChanBuffer),
isClose: 0,
}
} }
return q.chanSubs[topic] return q.chanSubs[topic]
} }
...@@ -157,6 +182,10 @@ func (q *queue) send(msg *Message, timeout time.Duration) (err error) { ...@@ -157,6 +182,10 @@ func (q *queue) send(msg *Message, timeout time.Duration) (err error) {
if sub.isClose == 1 { if sub.isClose == 1 {
return types.ErrChannelClosed return types.ErrChannelClosed
} }
if timeout == -1 {
sub.high <- msg
return nil
}
defer func() { defer func() {
res := recover() res := recover()
if res != nil { if res != nil {
...@@ -172,10 +201,6 @@ func (q *queue) send(msg *Message, timeout time.Duration) (err error) { ...@@ -172,10 +201,6 @@ func (q *queue) send(msg *Message, timeout time.Duration) (err error) {
return ErrQueueChannelFull return ErrQueueChannelFull
} }
} }
if timeout == -1 {
sub.high <- msg
return nil
}
t := time.NewTimer(timeout) t := time.NewTimer(timeout)
defer t.Stop() defer t.Stop()
select { select {
...@@ -212,13 +237,13 @@ func (q *queue) sendLowTimeout(msg *Message, timeout time.Duration) error { ...@@ -212,13 +237,13 @@ func (q *queue) sendLowTimeout(msg *Message, timeout time.Duration) error {
if sub.isClose == 1 { if sub.isClose == 1 {
return types.ErrChannelClosed return types.ErrChannelClosed
} }
if timeout == 0 {
return q.sendAsyn(msg)
}
if timeout == -1 { if timeout == -1 {
sub.low <- msg sub.low <- msg
return nil return nil
} }
if timeout == 0 {
return q.sendAsyn(msg)
}
t := time.NewTimer(timeout) t := time.NewTimer(timeout)
defer t.Stop() defer t.Stop()
select { select {
...@@ -242,6 +267,7 @@ type Message struct { ...@@ -242,6 +267,7 @@ type Message struct {
ID int64 ID int64
Data interface{} Data interface{}
chReply chan *Message chReply chan *Message
callback func(msg *Message)
} }
// NewMessage new message // NewMessage new message
...@@ -255,6 +281,17 @@ func NewMessage(id int64, topic string, ty int64, data interface{}) (msg *Messag ...@@ -255,6 +281,17 @@ func NewMessage(id int64, topic string, ty int64, data interface{}) (msg *Messag
return msg return msg
} }
// NewMessageCallback reply block
func NewMessageCallback(id int64, topic string, ty int64, data interface{}, callback func(msg *Message)) (msg *Message) {
msg = &Message{}
msg.ID = id
msg.Ty = ty
msg.Data = data
msg.Topic = topic
msg.callback = callback
return msg
}
// GetData get message data // GetData get message data
func (msg *Message) GetData() interface{} { func (msg *Message) GetData() interface{} {
if _, ok := msg.Data.(error); ok { if _, ok := msg.Data.(error); ok {
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
package queue package queue
import ( import (
"fmt"
"testing" "testing"
"time" "time"
...@@ -132,6 +133,7 @@ func TestHighLow(t *testing.T) { ...@@ -132,6 +133,7 @@ func TestHighLow(t *testing.T) {
msg := client.NewMessage("mempool", types.EventTx, "hello") msg := client.NewMessage("mempool", types.EventTx, "hello")
err := client.SendTimeout(msg, false, 0) err := client.SendTimeout(msg, false, 0)
if err != nil { if err != nil {
fmt.Println(err)
break break
} }
} }
...@@ -211,24 +213,50 @@ func TestPrintMessage(t *testing.T) { ...@@ -211,24 +213,50 @@ func TestPrintMessage(t *testing.T) {
t.Log(msg) t.Log(msg)
} }
func TestChanSubCallback(t *testing.T) {
q := New("channel")
client := q.Client()
client.Sub("hello")
done := make(chan struct{}, 1025)
go func() {
for i := 0; i < 1025; i++ {
sub := q.(*queue).chanSub("hello")
msg := NewMessageCallback(1, "", 0, nil, func(msg *Message) {
done <- struct{}{}
})
sub.high <- msg
}
}()
for i := 0; i < 1025; i++ {
msg := <-client.Recv()
client.Reply(msg)
}
for i := 0; i < 1025; i++ {
<-done
}
}
func BenchmarkSendMessage(b *testing.B) { func BenchmarkSendMessage(b *testing.B) {
q := New("channel") q := New("channel")
//mempool //mempool
b.ReportAllocs()
go func() { go func() {
client := q.Client() client := q.Client()
client.Sub("mempool") client.Sub("mempool")
defer client.Close() defer client.Close()
for msg := range client.Recv() { for msg := range client.Recv() {
go func(msg *Message) {
if msg.Ty == types.EventTx { if msg.Ty == types.EventTx {
msg.Reply(client.NewMessage("mempool", types.EventReply, types.Reply{IsOk: true, Msg: []byte("word")})) msg.Reply(client.NewMessage("mempool", types.EventReply, types.Reply{IsOk: true, Msg: []byte("word")}))
} }
}(msg)
} }
}() }()
go q.Start() go q.Start()
client := q.Client() client := q.Client()
//high 优先级 //high 优先级
for i := 0; i < b.N; i++ {
msg := client.NewMessage("mempool", types.EventTx, "hello") msg := client.NewMessage("mempool", types.EventTx, "hello")
for i := 0; i < b.N; i++ {
err := client.Send(msg, true) err := client.Send(msg, true)
if err != nil { if err != nil {
b.Error(err) b.Error(err)
...@@ -277,3 +305,121 @@ func BenchmarkIntChan(b *testing.B) { ...@@ -277,3 +305,121 @@ func BenchmarkIntChan(b *testing.B) {
ch <- 1 ch <- 1
} }
} }
func BenchmarkChanSub(b *testing.B) {
q := New("channel")
done := make(chan struct{})
go func() {
for i := 0; i < b.N; i++ {
q.(*queue).chanSub("hello")
}
done <- struct{}{}
}()
for i := 0; i < b.N; i++ {
q.(*queue).chanSub("hello")
}
<-done
}
func BenchmarkChanSub2(b *testing.B) {
q := New("channel")
client := q.Client()
client.Sub("hello")
go func() {
for i := 0; i < b.N; i++ {
sub := q.(*queue).chanSub("hello")
msg := NewMessage(1, "", 0, nil)
sub.high <- msg
_, err := client.Wait(msg)
if err != nil {
b.Fatal(err)
}
}
}()
for i := 0; i < b.N; i++ {
msg := <-client.Recv()
msg.Reply(msg)
}
}
func BenchmarkChanSub3(b *testing.B) {
q := New("channel")
client := q.Client()
client.Sub("hello")
go func() {
for i := 0; i < b.N; i++ {
sub := q.(*queue).chanSub("hello")
msg := NewMessage(1, "", 0, nil)
sub.high <- msg
}
}()
for i := 0; i < b.N; i++ {
msg := <-client.Recv()
msg.Reply(msg)
}
}
func BenchmarkChanSub4(b *testing.B) {
q := New("channel")
client := q.Client()
client.Sub("hello")
go func() {
for i := 0; i < b.N; i++ {
sub := q.(*queue).chanSub("hello")
msg := &Message{ID: 1}
sub.high <- msg
}
}()
for i := 0; i < b.N; i++ {
<-client.Recv()
}
}
func BenchmarkChanSubCallback(b *testing.B) {
q := New("channel")
client := q.Client()
client.Sub("hello")
done := make(chan struct{}, 1024)
go func() {
for i := 0; i < b.N; i++ {
sub := q.(*queue).chanSub("hello")
msg := NewMessageCallback(1, "", 0, nil, func(msg *Message) {
done <- struct{}{}
})
sub.high <- msg
}
}()
go func() {
for i := 0; i < b.N; i++ {
msg := <-client.Recv()
client.Reply(msg)
}
}()
for i := 0; i < b.N; i++ {
<-done
}
}
func BenchmarkChanSubCallback2(b *testing.B) {
q := New("channel")
client := q.Client()
client.Sub("hello")
go func() {
for i := 0; i < b.N; i++ {
sub := q.(*queue).chanSub("hello")
done := make(chan struct{}, 1)
msg := NewMessageCallback(1, "", 0, nil, func(msg *Message) {
done <- struct{}{}
})
sub.high <- msg
<-done
}
}()
for i := 0; i < b.N; i++ {
msg := <-client.Recv()
client.Reply(msg)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment