Commit 5d98ed5a authored by shajiaiming's avatar shajiaiming

import amqp

parent d04eb576
...@@ -3,17 +3,56 @@ module slg ...@@ -3,17 +3,56 @@ module slg
go 1.16 go 1.16
require ( require (
github.com/astaxie/beego v1.12.3 github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Knetic/govaluate v3.0.0+incompatible // indirect
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6 // indirect
github.com/alicebob/miniredis v2.5.0+incompatible // indirect
github.com/astaxie/beego v1.10.1
github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd // indirect
github.com/beego/x2j v0.0.0-20131220205130-a0352aadc542 // indirect
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737 // indirect
github.com/casbin/casbin v1.7.0 // indirect
github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect
github.com/couchbase/go-couchbase v0.0.0-20200519150804-63f3cdb75e0d // indirect
github.com/couchbase/gomemcached v0.0.0-20200526233749-ec430f949808 // indirect
github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a // indirect
github.com/cupcake/rdb v0.0.0-20161107195141-43ba34106c76 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712 // indirect
github.com/elastic/go-elasticsearch/v6 v6.8.5 // indirect
github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect
github.com/gin-gonic/gin v1.7.7 github.com/gin-gonic/gin v1.7.7
github.com/glendc/gopher-json v0.0.0-20170414221815-dc4743023d0c // indirect
github.com/go-ini/ini v1.66.4 github.com/go-ini/ini v1.66.4
github.com/go-redis/redis v6.14.2+incompatible // indirect
github.com/go-sql-driver/mysql v1.6.0 github.com/go-sql-driver/mysql v1.6.0
github.com/gomodule/redigo v2.0.0+incompatible github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/gomodule/redigo v1.8.8
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/jinzhu/gorm v1.9.16 github.com/jinzhu/gorm v1.9.16
github.com/koding/logging v0.0.0-20160720134017-8b5a689ed69b // indirect
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/peterh/liner v1.0.1-0.20171122030339-3681c2a91233 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.7.0 // indirect
github.com/robfig/cron v1.2.0 github.com/robfig/cron v1.2.0
github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect
github.com/siddontang/go v0.0.0-20170517070808-cb568a3e5cc0 // indirect
github.com/siddontang/goredis v0.0.0-20150324035039-760763f78400 // indirect
github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d // indirect
github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec // indirect
github.com/streadway/amqp v1.0.0 // indirect
github.com/stretchr/testify v1.7.0 // indirect github.com/stretchr/testify v1.7.0 // indirect
github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c // indirect
github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b // indirect
github.com/xinliangnote/go-util v0.0.0-20210703052933-7f9f6d961276 github.com/xinliangnote/go-util v0.0.0-20210703052933-7f9f6d961276
github.com/yuin/gopher-lua v0.0.0-20171031051903-609c9cd26973 // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0 gopkg.in/go-playground/validator.v9 v9.31.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 // indirect
) )
...@@ -8,6 +8,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF ...@@ -8,6 +8,7 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= github.com/alicebob/gopher-json v0.0.0-20180125190556-5a6b3ba71ee6/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk= github.com/alicebob/miniredis v2.5.0+incompatible/go.mod h1:8HZjEj4yU0dwhYHky+DxYx+6BMjkBbe5ONFIF1MXffk=
github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y= github.com/andybalholm/cascadia v1.1.0/go.mod h1:GsXiBklL0woXo1j/WYWtSYYC4ouU9PqHO0sqidkEA4Y=
github.com/astaxie/beego v1.10.1/go.mod h1:0R4++1tUqERR0WYFWdfkcrsyoVBCG4DgpDGokT3yb+U=
github.com/astaxie/beego v1.12.3 h1:SAQkdD2ePye+v8Gn1r4X6IKZM1wd28EyUOVQ3PDSOOQ= github.com/astaxie/beego v1.12.3 h1:SAQkdD2ePye+v8Gn1r4X6IKZM1wd28EyUOVQ3PDSOOQ=
github.com/astaxie/beego v1.12.3/go.mod h1:p3qIm0Ryx7zeBHLljmd7omloyca1s4yu1a8kM1FkpIA= github.com/astaxie/beego v1.12.3/go.mod h1:p3qIm0Ryx7zeBHLljmd7omloyca1s4yu1a8kM1FkpIA=
github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ= github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ=
...@@ -76,6 +77,8 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 ...@@ -76,6 +77,8 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E=
github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
...@@ -96,6 +99,8 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u ...@@ -96,6 +99,8 @@ github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/koding/logging v0.0.0-20160720134017-8b5a689ed69b h1:Ix1hwcOtW6e0KG1+Fn1blMih1O4td/fa9Q2Br0/zPBo=
github.com/koding/logging v0.0.0-20160720134017-8b5a689ed69b/go.mod h1:km9Clt+22fAbEvoPJSRufXDN110ZA6xLNU7oe4dwRHk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
...@@ -155,6 +160,8 @@ github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d/go.mod h1:AMEsy7v5z ...@@ -155,6 +160,8 @@ github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d/go.mod h1:AMEsy7v5z
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE= github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
......
...@@ -4,6 +4,7 @@ import ( ...@@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"net/http" "net/http"
"slg/pkg/rabbitmq"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"slg/models" "slg/models"
...@@ -20,6 +21,7 @@ func init() { ...@@ -20,6 +21,7 @@ func init() {
gredis.Setup() gredis.Setup()
util.Setup() util.Setup()
cron.Setup() cron.Setup()
rabbitmq.Setup()
} }
func main() { func main() {
......
package models
import (
"slg/pkg/setting"
)
type Bank struct {
id uint32 `gorm:"type:int(11);primary key"`
BankName string `gorm:"type:varchar(64);not null"`
}
func (b Bank) TableName() string {
return setting.DatabaseSetting.Name + ".bank"
}
func AddBank(data map[string]interface{}) (error) {
bank := Bank{
BankName: data["bankname"].(string),
}
if err := db.Create(&bank).Error; err != nil {
return err
}
return nil
}
\ No newline at end of file
package rabbitmq
import "github.com/streadway/amqp"
type Consumer struct {
// Base struct for Producer
*RabbitMQ
// The communication channel over connection
channel *amqp.Channel
// All deliveries from server will send to this channel
deliveries <-chan amqp.Delivery
// This handler will be called when a
handler func(amqp.Delivery)
// A notifiyng channel for publishings
// will be used for sync. between close channel and consume handler
done chan error
// Current producer connection settings
session Session
}
func (c *Consumer) Deliveries() <-chan amqp.Delivery {
return c.deliveries
}
// NewConsumer is a constructor for consumer creation
// Accepts Exchange, Queue, BindingOptions and ConsumerOptions
func (r *RabbitMQ) NewConsumer(e Exchange, q Queue, bo BindingOptions, co ConsumerOptions) (*Consumer, error) {
rmq, err := r.Connect()
if err != nil {
return nil, err
}
// getting a channel
channel, err := r.conn.Channel()
if err != nil {
return nil, err
}
c := &Consumer{
RabbitMQ: rmq,
channel: channel,
done: make(chan error),
session: Session{
Exchange: e,
Queue: q,
ConsumerOptions: co,
BindingOptions: bo,
},
}
err = c.connect()
if err != nil {
return nil, err
}
return c, nil
}
// connect internally declares the exchanges and queues
func (c *Consumer) connect() error {
e := c.session.Exchange
q := c.session.Queue
bo := c.session.BindingOptions
var err error
// declaring Exchange
if err = c.channel.ExchangeDeclare(
e.Name, // name of the exchange
e.Type, // type
e.Durable, // durable
e.AutoDelete, // delete when complete
e.Internal, // internal
e.NoWait, // noWait
e.Args, // arguments
); err != nil {
return err
}
// declaring Queue
_, err = c.channel.QueueDeclare(
q.Name, // name of the queue
q.Durable, // durable
q.AutoDelete, // delete when usused
q.Exclusive, // exclusive
q.NoWait, // noWait
q.Args, // arguments
)
if err != nil {
return err
}
// binding Exchange to Queue
if err = c.channel.QueueBind(
// bind to real queue
q.Name, // name of the queue
bo.RoutingKey, // bindingKey
e.Name, // sourceExchange
bo.NoWait, // noWait
bo.Args, // arguments
); err != nil {
return err
}
return nil
}
// Consume accepts a handler function for every message streamed from RabbitMq
// will be called within this handler func
func (c *Consumer) Consume(handler func(delivery amqp.Delivery)) error {
co := c.session.ConsumerOptions
q := c.session.Queue
// Exchange bound to Queue, starting Consume
deliveries, err := c.channel.Consume(
// consume from real queue
q.Name, // name
co.Tag, // consumerTag,
co.AutoAck, // autoAck
co.Exclusive, // exclusive
co.NoLocal, // noLocal
co.NoWait, // noWait
co.Args, // arguments
)
if err != nil {
return err
}
// should we stop streaming, in order not to consume from server?
c.deliveries = deliveries
c.handler = handler
c.RabbitMQ.log.Info("handle: deliveries channel starting")
// handle all consumer errors, if required re-connect
// there are problems with reconnection logic for now
for delivery := range c.deliveries {
handler(delivery)
}
c.RabbitMQ.log.Info("handle: deliveries channel closed")
c.done <- nil
return nil
}
// QOS controls how many messages the server will try to keep on the network for
// consumers before receiving delivery acks. The intent of Qos is to make sure
// the network buffers stay full between the server and client.
func (c *Consumer) QOS(messageCount int) error {
return c.channel.Qos(messageCount, 0, false)
}
// ConsumeMessage accepts a handler function and only consumes one message
// stream from RabbitMq
func (c *Consumer) Get(handler func(delivery amqp.Delivery)) error {
co := c.session.ConsumerOptions
q := c.session.Queue
message, ok, err := c.channel.Get(q.Name, co.AutoAck)
if err != nil {
return err
}
c.handler = handler
if ok {
c.RabbitMQ.log.Debug("Message received")
handler(message)
} else {
c.RabbitMQ.log.Debug("No message received")
}
// TODO maybe we should return ok too?
return nil
}
// Shutdown gracefully closes all connections and waits
// for handler to finish its messages
func (c *Consumer) Shutdown() error {
co := c.session.ConsumerOptions
if err := shutdownChannel(c.channel, co.Tag); err != nil {
return err
}
defer c.RabbitMQ.log.Info("Consumer shutdown OK")
c.RabbitMQ.log.Info("Waiting for Consumer handler to exit")
// if we have not called the Consume yet, we can return here
if c.deliveries == nil {
close(c.done)
}
// this channel is here for finishing the consumer's ranges of
// delivery chans. We need every delivery to be processed, here make
// sure to wait for all consumers goroutines to finish before exiting our
// process.
return <-c.done
}
package rabbitmq
import "github.com/streadway/amqp"
type Producer struct {
// Base struct for Producer
*RabbitMQ
// The communication channel over connection
channel *amqp.Channel
// A notifiyng channel for publishings
done chan error
// Current producer connection settings
session Session
}
type PublishingOptions struct {
// The key that when publishing a message to a exchange/queue will be only delivered to
// given routing key listeners
RoutingKey string
// Publishing tag
Tag string
// Queue should be on the server/broker
Mandatory bool
// Consumer should be bound to server
Immediate bool
}
// NewProducer is a constructor function for producer creation Accepts Exchange,
// Queue, PublishingOptions. On the other hand we are not declaring our topology
// on both the publisher and consumer to be able to change the settings only in
// one place. We can declare those settings on both place to ensure they are
// same. But this package will not support it.
func (r *RabbitMQ) NewProducer(e Exchange, q Queue, po PublishingOptions) (*Producer, error) {
rmq, err := r.Connect()
if err != nil {
return nil, err
}
// getting a channel
channel, err := r.conn.Channel()
if err != nil {
return nil, err
}
return &Producer{
RabbitMQ: rmq,
channel: channel,
session: Session{
Exchange: e,
Queue: q,
PublishingOptions: po,
},
}, nil
}
// Publish sends a Publishing from the client to an exchange on the server.
func (p *Producer) Publish(publishing amqp.Publishing) error {
e := p.session.Exchange
q := p.session.Queue
po := p.session.PublishingOptions
routingKey := po.RoutingKey
// if exchange name is empty, this means we are gonna publish
// this mesage to a queue, every queue has a binding to default exchange
if e.Name == "" {
routingKey = q.Name
}
err := p.channel.Publish(
e.Name, // publish to an exchange(it can be default exchange)
routingKey, // routing to 0 or more queues
po.Mandatory, // mandatory, if no queue than err
po.Immediate, // immediate, if no consumer than err
publishing,
// amqp.Publishing {
// // Application or exchange specific fields,
// // the headers exchange will inspect this field.
// Headers Table
// // Properties
// ContentType string // MIME content type
// ContentEncoding string // MIME content encoding
// DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
// Priority uint8 // 0 to 9
// CorrelationId string // correlation identifier
// ReplyTo string // address to to reply to (ex: RPC)
// Expiration string // message expiration spec
// MessageId string // message identifier
// Timestamp time.Time // message timestamp
// Type string // message type name
// UserId string // creating user id - ex: "guest"
// AppId string // creating application id
// // The application specific payload of the message
// Body []byte
// }
)
return err
}
// NotifyReturn captures a message when a Publishing is unable to be
// delivered either due to the `mandatory` flag set
// and no route found, or `immediate` flag set and no free consumer.
func (p *Producer) NotifyReturn(notifier func(message amqp.Return)) {
go func() {
for res := range p.channel.NotifyReturn(make(chan amqp.Return)) {
notifier(res)
}
}()
}
// Shutdown gracefully closes all connections
func (p *Producer) Shutdown() error {
co := p.session.ConsumerOptions
if err := shutdownChannel(p.channel, co.Tag); err != nil {
return err
}
// Since publishing is asynchronous this can happen
// instantly without waiting for a done message.
defer p.RabbitMQ.log.Info("Producer shutdown OK")
return nil
}
package rabbitmq
import (
"errors"
"fmt"
"github.com/koding/logging"
"github.com/streadway/amqp"
"os"
"os/signal"
"strings"
"syscall"
)
type Config struct {
Host string
Port int
Username string
Password string
Vhost string
}
var AMQP *RabbitMQ
func Setup() error {
AMQP = &RabbitMQ{
config:&Config{
Host: "139.9.150.174",
Port: 5672,
Username: "guest",
Password: "guest",
Vhost: "/",
},
log: logging.NewLogger("producer"),
}
return nil
}
type RabbitMQ struct {
// The connection between client and the server
conn *amqp.Connection
// config stores the current koding configuration based on the given profile
config *Config
// logger interface
log logging.Logger
}
type Exchange struct {
// Exchange name
Name string
// Exchange type
Type string
// Durable exchanges will survive server restarts
Durable bool
// Will remain declared when there are no remaining bindings.
AutoDelete bool
// Exchanges declared as `internal` do not accept accept publishings.Internal
// exchanges are useful for when you wish to implement inter-exchange topologies
// that should not be exposed to users of the broker.
Internal bool
// When noWait is true, declare without waiting for a confirmation from the server.
NoWait bool
// amqp.Table of arguments that are specific to the server's implementation of
// the exchange can be sent for exchange types that require extra parameters.
Args amqp.Table
}
type Queue struct {
// The queue name may be empty, in which the server will generate a unique name
// which will be returned in the Name field of Queue struct.
Name string
// Check Exchange comments for durable
Durable bool
// Check Exchange comments for autodelete
AutoDelete bool
// Exclusive queues are only accessible by the connection that declares them and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting declare, bind, consume, purge or delete a
// queue with the same name.
Exclusive bool
// When noWait is true, the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
NoWait bool
// Check Exchange comments for Args
Args amqp.Table
}
type ConsumerOptions struct {
// The consumer is identified by a string that is unique and scoped for all
// consumers on this channel.
Tag string
// When autoAck (also known as noAck) is true, the server will acknowledge
// deliveries to this consumer prior to writing the delivery to the network. When
// autoAck is true, the consumer should not call Delivery.Ack
AutoAck bool // autoAck
// Check Queue struct documentation
Exclusive bool // exclusive
// When noLocal is true, the server will not deliver publishing sent from the same
// connection to this consumer. (Do not use Publish and Consume from same channel)
NoLocal bool // noLocal
// Check Queue struct documentation
NoWait bool // noWait
// Check Exchange comments for Args
Args amqp.Table // arguments
}
type BindingOptions struct {
// Publishings messages to given Queue with matching -RoutingKey-
// Every Queue has a default binding to Default Exchange with their Qeueu name
// So you can send messages to a queue over default exchange
RoutingKey string
// Do not wait for a consumer
NoWait bool
// App specific data
Args amqp.Table
}
// Returns RMQ connection
func (r *RabbitMQ) Conn() *amqp.Connection {
return r.conn
}
// Dial dials the RMQ server
func (r *RabbitMQ) Dial() error {
// if config is nil do not continue
if r.config == nil {
return errors.New("config is nil")
}
conf := amqp.URI{
Scheme: "amqp",
Host: r.config.Host,
Port: r.config.Port,
Username: r.config.Username,
Password: r.config.Password,
Vhost: r.config.Vhost,
}.String()
var err error
// Connects opens an AMQP connection from the credentials in the URL.
r.conn, err = amqp.Dial(conf)
if err != nil {
return err
}
r.handleErrors(r.conn)
return nil
}
// Connect opens a connection to RabbitMq. This function is idempotent
//
// TODO this should not return RabbitMQ struct - cihangir,arslan config changes
func (r *RabbitMQ) Connect() (*RabbitMQ, error) {
// if we alredy connected do not re-connect
if r.conn != nil {
return r, nil
}
// r.Dial sets the conn variable
if err := r.Dial(); err != nil {
return nil, err
}
return r, nil
}
// Session is holding the current Exchange, Queue,
// Binding Consuming and Publishing settings for enclosed
// rabbitmq connection
type Session struct {
// Exchange declaration settings
Exchange Exchange
// Queue declaration settings
Queue Queue
// Binding options for current exchange to queue binding
BindingOptions BindingOptions
// Consumer options for a queue or exchange
ConsumerOptions ConsumerOptions
// Publishing options for a queue or exchange
PublishingOptions PublishingOptions
}
// NotifyClose registers a listener for close events either initiated by an error
// accompaning a connection.close method or by a normal shutdown.
// On normal shutdowns, the chan will be closed.
// To reconnect after a transport or protocol error, we should register a listener here and
// re-connect to server
// Reconnection is -not- working by now
func (r *RabbitMQ) handleErrors(conn *amqp.Connection) {
go func() {
for amqpErr := range conn.NotifyClose(make(chan *amqp.Error)) {
// if the computer sleeps then wakes longer than a heartbeat interval,
// the connection will be closed by the client.
// https://github.com/streadway/amqp/issues/82
r.log.Fatal(amqpErr.Error())
if strings.Contains(amqpErr.Error(), "NOT_FOUND") {
// do not continue
}
// CRITICAL Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
// CRITICAL Exception (501) Reason: "read tcp 127.0.0.1:5672: i/o timeout"
// CRITICAL Exception (503) Reason: "COMMAND_INVALID - unimplemented method"
if amqpErr.Code == 501 {
// reconnect
}
if amqpErr.Code == 320 {
// fmt.Println("tryin to reconnect")
// c.reconnect()
}
}
}()
go func() {
for b := range conn.NotifyBlocked(make(chan amqp.Blocking)) {
if b.Active {
r.log.Info("TCP blocked: %q", b.Reason)
} else {
r.log.Info("TCP unblocked")
}
}
}()
}
// reconnect re-connects to rabbitmq after a disconnection
func (c *Consumer) reconnect() {
err := c.Shutdown()
if err != nil {
panic(err)
}
err = c.connect()
if err != nil {
panic(err)
}
c.Consume(c.handler)
}
// Shutdown closes the RabbitMQ connection
func (r *RabbitMQ) Shutdown() error {
return shutdown(r.conn)
}
// RegisterSignalHandler watchs for interrupt signals
// and gracefully closes connection
func (r *RabbitMQ) RegisterSignalHandler() {
registerSignalHandler(r)
}
// Closer interface is for handling reconnection logic in a sane way
// Every reconnection supported struct should implement those methods
// in order to work properly
type Closer interface {
RegisterSignalHandler()
Shutdown() error
}
// shutdown is a general closer function for handling close gracefully
// Mostly here for both consumers and producers
// After a reconnection scenerio we are gonna call shutdown before connection
func shutdown(conn *amqp.Connection) error {
if err := conn.Close(); err != nil {
if amqpError, isAmqpError := err.(*amqp.Error); isAmqpError && amqpError.Code != 504 {
return fmt.Errorf("AMQP connection close error: %s", err)
}
}
return nil
}
// shutdownChannel is a general closer function for channels
func shutdownChannel(channel *amqp.Channel, tag string) error {
// This waits for a server acknowledgment which means the sockets will have
// flushed all outbound publishings prior to returning. It's important to
// block on Close to not lose any publishings.
if err := channel.Cancel(tag, true); err != nil {
if amqpError, isAmqpError := err.(*amqp.Error); isAmqpError && amqpError.Code != 504 {
return fmt.Errorf("AMQP connection close error: %s", err)
}
}
if err := channel.Close(); err != nil {
return err
}
return nil
}
// registerSignalHandler helper function for stopping consumer or producer from
// operating further
// Watchs for SIGINT, SIGTERM, SIGQUIT, SIGSTOP and closes connection
func registerSignalHandler(c Closer) {
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals)
for {
signal := <-signals
switch signal {
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
err := c.Shutdown()
if err != nil {
panic(err)
}
os.Exit(1)
}
}
}()
}
package h5
import (
"github.com/gin-gonic/gin"
"slg/pkg/errno"
"slg/pkg/handler"
"slg/service/bank_service"
"slg/validate_service"
"strings"
)
func GetBank(c *gin.Context) {
handler.SendResponse(c, nil, nil)
}
func AddBank(c *gin.Context) {
bank := validate_service.Bank{}
c.ShouldBindJSON(&bank)
if ok, errors := validate_service.ValidateInputs(bank); !ok {
for _, err := range errors {
handler.SendResponse(c, errno.ErrBind, strings.Join(err, " "))
return
}
}
bank_service := bank_service.Bank{
BankName: bank.BankName,
}
if err := bank_service.Add(); err != nil {
handler.SendResponse(c, errno.ErrAddBank, nil)
return
}
handler.SendResponse(c, nil, nil)
}
func UpdateBank(c *gin.Context) {
handler.SendResponse(c, nil, nil)
}
func DeleteBank(c *gin.Context) {
handler.SendResponse(c, nil, nil)
}
\ No newline at end of file
package bank_service
import "slg/models"
type Bank struct {
BankName string
}
func (b *Bank) Add() error {
bank := map[string]interface{}{
"bankname": b.BankName,
}
if err := models.AddBank(bank); err != nil {
return err
}
return nil
}
\ No newline at end of file
package rabbitmq_service
import (
"fmt"
"github.com/streadway/amqp"
"slg/pkg/rabbitmq"
)
type AmqpConsumer struct {
ExchangeName string
QueueName string
RoutingKey string
Tag string
}
func (a AmqpConsumer) Consumer() {
exchange := rabbitmq.Exchange{
Name: a.ExchangeName,
Type: "fanout",
Durable: true,
}
queue := rabbitmq.Queue{
Name: a.QueueName,
Durable: true,
}
binding := rabbitmq.BindingOptions{
RoutingKey: a.RoutingKey,
}
consumerOptions := rabbitmq.ConsumerOptions{
Tag: a.Tag,
}
consumer, err := rabbitmq.AMQP.NewConsumer(exchange, queue, binding, consumerOptions)
if err != nil {
return
}
defer consumer.Shutdown()
err = consumer.QOS(3)
if err != nil {
panic(err)
}
consumer.RegisterSignalHandler()
consumer.Consume(handler)
}
var handler = func(delivery amqp.Delivery) {
message := string(delivery.Body)
fmt.Println("消费------------------------:", message)
delivery.Ack(true)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment