GoLang RabbitMQ实现六种工作模式示例

GoLang RabbitMQ实现六种工作模式示例

RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在实际应用中,我们经常需要使用 RabbitMQ 来实现消息传递功能。本文将介绍如何使用 GoLang 实现 RabbitMQ 的六种工作模式,并提供两个示例说明。

安装 RabbitMQ

首先需要安装 RabbitMQ。可以参考 官方文档 进行安装:https://www.rabbitmq.com/download.html

安装 GoLang 的 RabbitMQ 客户端库

使用以下命令安装 GoLang 的 RabbitMQ 客户端库:

go get github.com/streadway/amqp

六种工作模式

RabbitMQ 支持六种工作模式,分别为简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式和 RPC 模式。下面将分别介绍这六种工作模式的实现方法。

简单模式

在简单模式中,一个生产者向一个队列发送消息,一个消费者从该队列中接收消息。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    body := "Hello World!"
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

工作队列模式

在工作队列模式中,一个生产者向一个队列发送消息,多个消费者从该队列中接收消息。每个消息只能被一个消费者接收。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    body := "Hello World!"
    err = ch.Publish(
        "",           // exchange
        q.Name,       // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "time"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            time.Sleep(time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

发布/订阅模式

在发布/订阅模式中,一个生产者向一个交换机发送消息,多个消费者从该交换机中接收消息。每个消息可以被多个消费者接收。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := "Hello World!"
    err = ch.Publish(
        "logs", // exchange
        "",     // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.QueueBind(
        q.Name, // queue name
        "",     // routing key
        "logs", // exchange
        false,
        nil,
    )
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

路由模式

在路由模式中,一个生产者向一个交换机发送消息,多个消费者从该交换机中接收消息。每个消息可以被多个消费者接收,但是只有符合指定路由键的消费者才能接收到该消息。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs", // name
        "direct",      // type
        true,          // durable
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := "Hello World!"
    err = ch.Publish(
        "direct_logs", // exchange
        "info",        // routing key
        false,         // mandatory
        false,         // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs", // name
        "direct",      // type
        true,          // durable
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    if len(os.Args) < 2 {
        log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
        os.Exit(0)
    }

    for _, s := range os.Args[1:] {
        err = ch.QueueBind(
            q.Name,       // queue name
            s,            // routing key
            "direct_logs", // exchange
            false,
            nil,
        )
        failOnError(err, "Failed to bind a queue")
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

主题模式

在主题模式中,一个生产者向一个交换机发送消息,多个消费者从该交换机中接收消息。每个消息可以被多个消费者接收,但是只有符合指定主题的消费者才能接收到该消息。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "topic_logs", // name
        "topic",      // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := "Hello World!"
    err = ch.Publish(
        "topic_logs", // exchange
        "anonymous.info", // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

```go
package main

import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(
    "topic_logs", // name
    "topic",      // type
    true,         // durable
    false,        // auto-deleted
    false,        // internal
    false,        // no-wait
    nil,          // arguments
)
failOnError(err, "Failed to declare an exchange")

q, err := ch.QueueDeclare(
    "",    // name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:GoLang RabbitMQ实现六种工作模式示例 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • SpringBoot整合Canal与RabbitMQ监听数据变更记录

    以下是“SpringBoot整合Canal与RabbitMQ监听数据变更记录”的完整攻略,包含两个示例。 简介 Canal是一个开源的MySQL数据库增量订阅&消费组件,可以用于实时同步MySQL数据库的数据变更。RabbitMQ是一种流行的消息队列中间件,可以用于实现异步消息处理和调度。本攻略介绍如何使用Spring Boot整合Canal与Rab…

    RabbitMQ 2023年5月15日
    00
  • Spring Boot示例分析讲解自动化装配机制核心注解

    以下是“Spring Boot示例分析讲解自动化装配机制核心注解”的完整攻略,包含两个示例。 简介 在Spring Boot中,自动化装配机制是非常重要的一部分。在本攻略中,我们将介绍Spring Boot自动化装配机制的核心注解,并提供两个示例。 示例一:使用@Configuration注解进行自动化装配 以下是使用@Configuration注解进行自动…

    RabbitMQ 2023年5月15日
    00
  • Spring Boot系列教程之死信队列详解

    以下是“Spring Boot系列教程之死信队列详解”的完整攻略,包含两个示例。 简介 死信队列(Dead Letter Queue,DLQ)是一种特殊的消息队列,用于存储无法被消费的消息。在消息队列中,当消息无法被消费时,通常会将其放入死信队列中,以便后续处理。本攻略将详细介绍如何在Spring Boot中使用死信队列,并提供两个示例,演示如何使用死信队列…

    RabbitMQ 2023年5月15日
    00
  • PHP实现异步定时多任务消息推送

    以下是“PHP实现异步定时多任务消息推送”的完整攻略,包含两个示例。 简介 在本攻略中,我们将介绍如何使用PHP实现异步定时多任务消息推送。通过本攻略的学习,您将了解如何使用PHP创建定时任务,并使用消息队列实现异步消息推送。 示例一:使用PHP创建定时任务 以下是使用PHP创建定时任务的示例: <?php class Timer { private …

    RabbitMQ 2023年5月15日
    00
  • linux contos6.8下部署kafka集群的方法

    以下是“Linux Contos6.8下部署Kafka集群的方法”的完整攻略,包含两个示例。 简介 Kafka是一种高性能、分布式、可扩展的消息队列系统,可以实现大规模数据的实时处理和分发。本攻略将详细讲解如何在Linux Contos6.8下部署Kafka集群,并提供两个示例。 部署Kafka集群的方法 以下是在Linux Contos6.8下部署Kafk…

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ消息确认机制剖析

    RabbitMQ消息确认机制剖析 RabbitMQ是一个开源的消息队列系统,支持多种消息递协议。在使用RabbitMQ时,消息确认机制是非常重要的一部分,它可以确保消息被正确地处理和传递。本文将详细讲解RabbitMQ消息确认机制的原理和使用方法,并提供两个示例说明。 RabbitMQ消息确认机制原理 RabbitMQ消息确认机制是指生产者发送消息到队列后,…

    RabbitMQ 2023年5月15日
    00
  • SpringBoot启动时自动执行sql脚本的方法步骤

    以下是“SpringBoot启动时自动执行sql脚本的方法步骤”的完整攻略,包含两个示例。 简介 SpringBoot是一款快速开发框架,常用于Web应用程序的开发。在开发过程中,我们经常需要在应用程序启动时执行一些初始化操作,例如执行SQL脚本。本攻略将详细讲解如何在SpringBoot启动时自动执行SQL脚本,包括使用SpringBoot自带的功能和使用…

    RabbitMQ 2023年5月15日
    00
  • spring boot 监控处理方案实例详解

    以下是“spring boot 监控处理方案实例详解”的完整攻略,包含两个示例说明。 简介 Spring Boot是一个非常流行的Java开发框架,它提供了一套完整的开发工具和框架,可以帮助开发人员快速构建Web应用程序。本攻略将介绍如何使用Spring Boot进行监控处理,并提供相应示例说明。 步骤1:安装监控处理方案 在使用Spring Boot进行监…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部