以下是关于Golang监听RabbitMQ消息队列任务断线自动重连接的完整攻略,包含两个示例说明。
示例1:简单队列模式
步骤1:安装RabbitMQ
首先,您需要安装RabbitMQ。您可以从RabbitMQ官下载适合您操作系统的安装包进行安装。
步骤2:添加依赖
在Go中,您需要使用以下依赖:
- github.com/streadway/amqp
步骤3:监听消息
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"my_queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case msg := <-msgs:
log.Printf("Received a message: %s", msg.Body)
case <-sigs:
log.Println("Interrupted")
return
case <-conn.NotifyClose(make(chan *amqp.Error)):
log.Println("Connection closed, retrying...")
for {
time.Sleep(5 * time.Second)
conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/")
if err == nil {
break
}
log.Printf("Failed to connect to RabbitMQ: %v", err)
}
ch, err = conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
msgs, err = ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
log.Println("Reconnected")
}
}
}
步骤4:运行程序
运行程序后,您将看到以下输出:
2022/12/31 23:59:59 Received a message: Hello World!
示例2:工作队列模式
步骤1:安装RabbitMQ
同示例1。
步骤2:添加依赖
同示例1。
步骤3:监听消息
package main
import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/streadway/amqp"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %v", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
q, err := ch.QueueDeclare(
"my_queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
log.Fatalf("Failed to set QoS: %v", err)
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
for {
select {
case msg := <-msgs:
log.Printf("Received a message: %s", msg.Body)
time.Sleep(5 * time.Second)
log.Println("Message processed")
msg.Ack(false)
case <-sigs:
log.Println("Interrupted")
return
case <-conn.NotifyClose(make(chan *amqp.Error)):
log.Println("Connection closed, retrying...")
for {
time.Sleep(5 * time.Second)
conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/")
if err == nil {
break
}
log.Printf("Failed to connect to RabbitMQ: %v", err)
}
ch, err = conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %v", err)
}
defer ch.Close()
q, err = ch.QueueDeclare(
"my_queue", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare a queue: %v", err)
}
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
log.Fatalf("Failed to set QoS: %v", err)
}
msgs, err = ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consumer: %v", err)
}
log.Println("Reconnected")
}
}
}
步骤4:运行程序
运行程序后,您将看到以下输出:
2022/12/31 23:59:59 Received a message: Message 1
2022/12/31 23:59:59 Message processed
然后,您将看到程序暂停5秒,然后输出:
2022/12/31 23:59:59 Received a message: Message 2
2022/12/31 23:59:59 Message processed
以此类推,直到所有消息都被处理完毕。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于golang监听rabbitmq消息队列任务断线自动重连接的问题 - Python技术站