以下是Golang实现RabbitMQ中死信队列几种情况的完整攻略,包含两个示例说明。
示例1:消息过期
步骤1:创建死信队列
package main
import (
"fmt"
"github.com/streadway/amqp"
)
// main prepares the queue this tutorial calls the dead-letter queue.
//
// It connects to the local broker, opens a channel, declares the durable
// queue "dlx_queue", declares the durable direct exchange "dlx_exchange",
// binds the two with the key "dlx_routing_key", and prints a confirmation.
// Any failure aborts the program with a panic.
func main() {
	connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}
	defer channel.Close()

	const (
		exchangeName = "dlx_exchange"
		queueName    = "dlx_queue"
		bindingKey   = "dlx_routing_key"
	)

	// Messages stay in this queue for at most 5000 ms. When one dies it is
	// re-published through the default exchange ("") using the routing key
	// "normal_queue".
	queueArgs := amqp.Table{
		"x-message-ttl":             5000,
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": "normal_queue",
	}

	// Arguments after the name: durable=true, autoDelete, exclusive and
	// noWait all false.
	if _, err := channel.QueueDeclare(queueName, true, false, false, false, queueArgs); err != nil {
		panic(err)
	}

	// Durable direct exchange; not auto-deleted, not internal, no noWait.
	if err := channel.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil); err != nil {
		panic(err)
	}

	if err := channel.QueueBind(queueName, bindingKey, exchangeName, false, nil); err != nil {
		panic(err)
	}

	fmt.Println("Dead letter queue created")
}
步骤2:发送消息
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
// main publishes one "Hello World!" message that expires after 1000 ms and
// is then dead-lettered into "normal_queue".
//
// It first declares the durable direct exchange "normal_exchange", the
// durable queue "normal_queue" and their binding, so the dead-letter target
// exists before anything is published. The message itself is published to
// "dlx_exchange" with the key "dlx_routing_key"; step 1 of this example must
// have been run so that exchange and its TTL queue exist. Any failure aborts
// the program with a panic.
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	exchange := "normal_exchange"
	queue := "normal_queue"
	routingKey := "normal_routing_key"

	// Step 1 declares "dlx_queue", bound to "dlx_exchange" with
	// "dlx_routing_key", whose dead messages are re-routed to "normal_queue".
	// The message has to expire in that queue for dead-lettering to happen:
	// "normal_queue" is declared below without dead-letter arguments, so a
	// message published straight to it is discarded when it expires.
	ttlExchange := "dlx_exchange"
	ttlRoutingKey := "dlx_routing_key"

	err = ch.ExchangeDeclare(exchange, "direct", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	_, err = ch.QueueDeclare(queue, true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	err = ch.QueueBind(queue, routingKey, exchange, false, nil)
	if err != nil {
		panic(err)
	}

	message := "Hello World!"
	err = ch.Publish(ttlExchange, ttlRoutingKey, false, false, amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(message),
		// Per-message TTL in milliseconds, given as a string.
		Expiration: "1000",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("Message sent: ", message)

	// Stay alive past the 1000 ms expiration before the deferred Close calls
	// run, so the message should already be dead-lettered on exit.
	time.Sleep(2 * time.Second)
}
步骤3:接收消息
package main
import (
"fmt"
"github.com/streadway/amqp"
)
// main consumes "normal_queue" and prints every message body it receives.
//
// It connects to the local broker, declares the durable direct exchange
// "normal_exchange", the durable queue "normal_queue" and their binding, and
// then consumes with automatic acknowledgement turned off. Each delivery is
// printed and acknowledged individually. Any failure, including a failed
// acknowledgement, aborts the program with a panic. The loop ends when the
// delivery channel is closed.
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	exchange := "normal_exchange"
	queue := "normal_queue"
	routingKey := "normal_routing_key"

	err = ch.ExchangeDeclare(exchange, "direct", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	_, err = ch.QueueDeclare(queue, true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	err = ch.QueueBind(queue, routingKey, exchange, false, nil)
	if err != nil {
		panic(err)
	}

	// Empty consumer tag; autoAck, exclusive, noLocal and noWait all false.
	messages, err := ch.Consume(queue, "", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	for message := range messages {
		fmt.Println("Message received: ", string(message.Body))
		// autoAck is off, so the delivery must be acknowledged explicitly.
		// A failed ack is surfaced like every other error in this program
		// instead of being silently dropped.
		if err := message.Ack(false); err != nil {
			panic(err)
		}
	}
}
步骤4:运行程序
运行程序后,您将看到以下输出:
Dead letter queue created
Message sent: Hello World!
Message received: Hello World!
示例2:消息被拒绝
步骤1:创建死信队列
同示例1。
步骤2:发送消息
package main
import (
"fmt"
"github.com/streadway/amqp"
)
// main publishes a single "Hello World!" message for the rejection example.
//
// It connects to the local broker, declares the durable direct exchange
// "normal_exchange" and the durable queue "normal_queue" (with dead-letter
// arguments), binds them with "normal_routing_key", publishes the message and
// prints a confirmation. Any failure aborts the program with a panic.
func main() {
	connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer connection.Close()

	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}
	defer channel.Close()

	const (
		exchangeName = "normal_exchange"
		queueName    = "normal_queue"
		bindingKey   = "normal_routing_key"
	)

	if err := channel.ExchangeDeclare(exchangeName, "direct", true, false, false, false, nil); err != nil {
		panic(err)
	}

	// NOTE(review): dead letters are sent to the default exchange ("") with
	// the key "dlx_routing_key". The default exchange appears to route by
	// queue name, and this tutorial declares no queue with that name, so
	// rejected messages may be dropped. Confirm whether "dlx_exchange" was
	// intended. These arguments must stay identical to every other
	// declaration of "normal_queue".
	deadLetterArgs := amqp.Table{
		"x-dead-letter-routing-key": "dlx_routing_key",
		"x-dead-letter-exchange":    "",
	}
	if _, err := channel.QueueDeclare(queueName, true, false, false, false, deadLetterArgs); err != nil {
		panic(err)
	}

	if err := channel.QueueBind(queueName, bindingKey, exchangeName, false, nil); err != nil {
		panic(err)
	}

	message := "Hello World!"
	outgoing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(message),
	}
	// mandatory and immediate are both false.
	if err := channel.Publish(exchangeName, bindingKey, false, false, outgoing); err != nil {
		panic(err)
	}

	fmt.Println("Message sent: ", message)
}
步骤3:接收消息
package main
import (
"fmt"
"github.com/streadway/amqp"
)
// main consumes "normal_queue" and rejects every message it receives.
//
// It connects to the local broker, declares the durable direct exchange
// "normal_exchange" and the durable queue "normal_queue" (with dead-letter
// arguments), binds them, and consumes with automatic acknowledgement turned
// off. Each delivery is printed and then negatively acknowledged without
// requeueing. Any failure, including a failed negative acknowledgement,
// aborts the program with a panic.
func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	exchange := "normal_exchange"
	queue := "normal_queue"
	routingKey := "normal_routing_key"

	err = ch.ExchangeDeclare(exchange, "direct", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	// NOTE(review): dead letters are sent to the default exchange ("") with
	// the key "dlx_routing_key". The default exchange appears to route by
	// queue name, and this tutorial declares no queue with that name, so
	// rejected messages may be dropped. Confirm whether "dlx_exchange" was
	// intended. These arguments must stay identical to every other
	// declaration of "normal_queue".
	_, err = ch.QueueDeclare(queue, true, false, false, false, amqp.Table{
		"x-dead-letter-exchange":    "",
		"x-dead-letter-routing-key": "dlx_routing_key",
	})
	if err != nil {
		panic(err)
	}

	err = ch.QueueBind(queue, routingKey, exchange, false, nil)
	if err != nil {
		panic(err)
	}

	// Empty consumer tag; autoAck, exclusive, noLocal and noWait all false.
	messages, err := ch.Consume(queue, "", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	for message := range messages {
		fmt.Println("Message received: ", string(message.Body))
		// Reject this single delivery (multiple=false) without requeueing
		// (requeue=false). A failed nack is surfaced like every other error
		// in this program instead of being silently dropped.
		if err := message.Nack(false, false); err != nil {
			panic(err)
		}
	}
}
步骤4:运行程序
运行程序后,您将看到以下输出:
Message sent: Hello World!
Message received: Hello World!
以上就是Golang实现RabbitMQ中死信队列几种情况的完整攻略,包含两个示例说明。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Golang实现RabbitMQ中死信队列几种情况 - Python技术站