关于golang监听rabbitmq消息队列任务断线自动重连接的问题

yizhihongxing

以下是关于Golang监听RabbitMQ消息队列任务断线自动重连接的完整攻略,包含两个示例说明。

示例1:简单队列模式

步骤1:安装RabbitMQ

首先,您需要安装RabbitMQ。您可以从RabbitMQ官下载适合您操作系统的安装包进行安装。

步骤2:添加依赖

在Go中,您需要使用以下依赖:

  • github.com/streadway/amqp

步骤3:监听消息

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "my_queue", // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatalf("Failed to register a consumer: %v", err)
    }

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

    for {
        select {
        case msg := <-msgs:
            log.Printf("Received a message: %s", msg.Body)
        case <-sigs:
            log.Println("Interrupted")
            return
        case <-conn.NotifyClose(make(chan *amqp.Error)):
            log.Println("Connection closed, retrying...")
            for {
                time.Sleep(5 * time.Second)
                conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/")
                if err == nil {
                    break
                }
                log.Printf("Failed to connect to RabbitMQ: %v", err)
            }
            ch, err = conn.Channel()
            if err != nil {
                log.Fatalf("Failed to open a channel: %v", err)
            }
            defer ch.Close()
            msgs, err = ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
            )
            if err != nil {
                log.Fatalf("Failed to register a consumer: %v", err)
            }
            log.Println("Reconnected")
        }
    }
}

步骤4:运行程序

运行程序后,您将看到以下输出:

2022/12/31 23:59:59 Received a message: Hello World!

示例2:工作队列模式

步骤1:安装RabbitMQ

同示例1。

步骤2:添加依赖

同示例1。

步骤3:监听消息

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/streadway/amqp"
)

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        log.Fatalf("Failed to connect to RabbitMQ: %v", err)
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        log.Fatalf("Failed to open a channel: %v", err)
    }
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "my_queue", // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    if err != nil {
        log.Fatalf("Failed to declare a queue: %v", err)
    }

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    if err != nil {
        log.Fatalf("Failed to set QoS: %v", err)
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatalf("Failed to register a consumer: %v", err)
    }

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

    for {
        select {
        case msg := <-msgs:
            log.Printf("Received a message: %s", msg.Body)
            time.Sleep(5 * time.Second)
            log.Println("Message processed")
            msg.Ack(false)
        case <-sigs:
            log.Println("Interrupted")
            return
        case <-conn.NotifyClose(make(chan *amqp.Error)):
            log.Println("Connection closed, retrying...")
            for {
                time.Sleep(5 * time.Second)
                conn, err = amqp.Dial("amqp://guest:guest@localhost:5672/")
                if err == nil {
                    break
                }
                log.Printf("Failed to connect to RabbitMQ: %v", err)
            }
            ch, err = conn.Channel()
            if err != nil {
                log.Fatalf("Failed to open a channel: %v", err)
            }
            defer ch.Close()
            q, err = ch.QueueDeclare(
                "my_queue", // name
                false,     // durable
                false,     // delete when unused
                false,     // exclusive
                false,     // no-wait
                nil,       // arguments
            )
            if err != nil {
                log.Fatalf("Failed to declare a queue: %v", err)
            }
            err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
            )
            if err != nil {
                log.Fatalf("Failed to set QoS: %v", err)
            }
            msgs, err = ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
            )
            if err != nil {
                log.Fatalf("Failed to register a consumer: %v", err)
            }
            log.Println("Reconnected")
        }
    }
}

步骤4:运行程序

运行程序后,您将看到以下输出:

2022/12/31 23:59:59 Received a message: Message 1
2022/12/31 23:59:59 Message processed

然后,您将看到程序暂停5秒,然后输出:

2022/12/31 23:59:59 Received a message: Message 2
2022/12/31 23:59:59 Message processed

以此类推,直到所有消息都被处理完毕。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于golang监听rabbitmq消息队列任务断线自动重连接的问题 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • RabbitMQ如何配置基于时间的死信?

    在RabbitMQ中,基于时间的死信是一种Dead Letter Exchange(DLX)的类型,它是通过设置消息的过期时间来实现的。当消息过期时,它将被发送到DLX中,然后可以被重新路由到其他队列中进行处理。以下是RabbitMQ如何配置基于时间的死信的完整攻略: 创建DLX 首先,我们需要创建一个DLX,用于处理过期的消息。我们使用exchange_d…

    云计算 2023年5月5日
    00
  • Python通过RabbitMQ服务器实现交换机功能的实例教程

    下面是Python通过RabbitMQ服务器实现交换机功能的实例教程的完整攻略,包含两个示例说明。 简介 在分布式系统中,消息队列是一种常见的通信方式,它可以让不同的服务之间进行通信和协作。RabbitMQ是一个开源的消息队列系统,它支持多种消息协议,包括AMQP、STOMP、MQTT等。在Python中,我们可以使用pika库来实现与RabbitMQ的交互…

    RabbitMQ 2023年5月16日
    00
  • 解决python3 pika之连接断开的问题

    下面是解决Python3 Pika连接断开的问题的完整攻略,包含两个示例说明。 简介 Pika是一个Python编写的AMQP客户端库,用于与RabbitMQ进行通信。在使用Pika时,有时会遇到连接断开的问题。本文将介绍如何解决Python3 Pika连接断开的问题。 方法一:使用心跳检测 步骤1:安装Pika库 使用pip安装Pika库。在命令行中执行以…

    RabbitMQ 2023年5月16日
    00
  • CentOS 7.6 Telnet服务搭建过程(Openssh升级之战 第一任务备用运输线搭建)

    以下是“CentOS 7.6 Telnet服务搭建过程(Openssh升级之战 第一任务备用运输线搭建)”的完整攻略,包含两个示例。 简介 Telnet是一种远程登录协议,可以在本地计算机上登录到远程计算机上执行命令。本攻略将介绍在CentOS 7.6上搭建Telnet服务的过程。 CentOS 7.6 Telnet服务搭建过程 以下是在CentOS 7.6…

    RabbitMQ 2023年5月15日
    00
  • 1小时快速上手RabbitMQ(简介及安装过程)

    1小时快速上手RabbitMQ(简介及安装过程) RabbitMQ是一个开源的消息队列系统,支持多种消息传递协议。本文将详细讲解RabbitMQ的简介及安装过程,并提供两个示例说明。 RabbitMQ的简介 RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的消息队列系统,它可以实现不同应用程序之间的…

    RabbitMQ 2023年5月15日
    00
  • java轻量级规则引擎easy-rules使用介绍

    以下是“Java轻量级规则引擎Easy Rules使用介绍”的完整攻略,包含两个示例。 简介 Easy Rules是一个轻量级的Java规则引擎,它可以帮助开发人员快速实现业务规则。Easy Rules提供了简单易用的API,支持规则的定义、执行和管理。本攻略将详细介绍Easy Rules的使用方法,包括规则的定义、执行和管理,并提供两个示例,演示如何使用E…

    RabbitMQ 2023年5月15日
    00
  • springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    SpringBoot + RabbitMQ 如何实现消息确认机制(踩坑经验) 在本文中,我们将详细讲解如何使用SpringBoot和RabbitMQ实现消息确认机制。我们将提供两个示例说明,并分享一些踩坑经验。 环境准备 在开始本文之前,需要确保已经安装以下软件: JDK 1.8或更高版本 RabbitMQ服务器 Maven 示例一:使用SpringBoot…

    RabbitMQ 2023年5月15日
    00
  • Apache负载均衡设置方法 mod_proxy使用介绍

    以下是“Apache负载均衡设置方法 mod_proxy使用介绍”的完整攻略,包含两个示例。 简介 Apache是一款流行的Web服务器软件,它支持多种模块,其中mod_proxy是一个用于反向代理和负载均衡的模块。本攻略将详细介绍如何使用mod_proxy模块实现Apache的负载均衡功能,并提供两个示例,演示如何使用mod_proxy模块实现负载均衡。 …

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部