GoLang RabbitMQ实现六种工作模式示例

GoLang RabbitMQ实现六种工作模式示例

RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在实际应用中,我们经常需要使用 RabbitMQ 来实现消息传递功能。本文将介绍如何使用 GoLang 实现 RabbitMQ 的六种工作模式,并提供两个示例说明。

安装 RabbitMQ

首先需要安装 RabbitMQ。可以参考 官方文档 进行安装:https://www.rabbitmq.com/download.html

安装 GoLang 的 RabbitMQ 客户端库

使用以下命令安装 GoLang 的 RabbitMQ 客户端库:

go get github.com/streadway/amqp

六种工作模式

RabbitMQ 支持六种工作模式,分别为简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式和 RPC 模式。下面将分别介绍这六种工作模式的实现方法。

简单模式

在简单模式中,一个生产者向一个队列发送消息,一个消费者从该队列中接收消息。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    body := "Hello World!"
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

工作队列模式

在工作队列模式中,一个生产者向一个队列发送消息,多个消费者从该队列中接收消息。每个消息只能被一个消费者接收。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    body := "Hello World!"
    err = ch.Publish(
        "",           // exchange
        q.Name,       // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            DeliveryMode: amqp.Persistent,
            ContentType:  "text/plain",
            Body:         []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "time"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task_queue", // name
        true,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    failOnError(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            time.Sleep(time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

发布/订阅模式

在发布/订阅模式中,一个生产者向一个交换机发送消息,多个消费者从该交换机中接收消息。每个消息可以被多个消费者接收。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := "Hello World!"
    err = ch.Publish(
        "logs", // exchange
        "",     // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "logs",   // name
        "fanout", // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.QueueBind(
        q.Name, // queue name
        "",     // routing key
        "logs", // exchange
        false,
        nil,
    )
    failOnError(err, "Failed to bind a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

路由模式

在路由模式中,一个生产者向一个交换机发送消息,多个消费者从该交换机中接收消息。每个消息可以被多个消费者接收,但是只有符合指定路由键的消费者才能接收到该消息。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs", // name
        "direct",      // type
        true,          // durable
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := "Hello World!"
    err = ch.Publish(
        "direct_logs", // exchange
        "info",        // routing key
        false,         // mandatory
        false,         // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "direct_logs", // name
        "direct",      // type
        true,          // durable
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    q, err := ch.QueueDeclare(
        "",    // name
        false, // durable
        false, // delete when unused
        true,  // exclusive
        false, // no-wait
        nil,   // arguments
    )
    failOnError(err, "Failed to declare a queue")

    if len(os.Args) < 2 {
        log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
        os.Exit(0)
    }

    for _, s := range os.Args[1:] {
        err = ch.QueueBind(
            q.Name,       // queue name
            s,            // routing key
            "direct_logs", // exchange
            false,
            nil,
        )
        failOnError(err, "Failed to bind a queue")
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

主题模式

在主题模式中,一个生产者向一个交换机发送消息,多个消费者从该交换机中接收消息。每个消息可以被多个消费者接收,但是只有符合指定主题的消费者才能接收到该消息。

生产者代码

package main

import (
    "fmt"
    "log"
    "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    err = ch.ExchangeDeclare(
        "topic_logs", // name
        "topic",      // type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    failOnError(err, "Failed to declare an exchange")

    body := "Hello World!"
    err = ch.Publish(
        "topic_logs", // exchange
        "anonymous.info", // routing key
        false,        // mandatory
        false,        // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")

    log.Printf(" [x] Sent %s", body)
}

消费者代码

```go
package main

import (
"fmt"
"log"
"os"
"strings"
"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}

func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()

ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()

err = ch.ExchangeDeclare(
    "topic_logs", // name
    "topic",      // type
    true,         // durable
    false,        // auto-deleted
    false,        // internal
    false,        // no-wait
    nil,          // arguments
)
failOnError(err, "Failed to declare an exchange")

q, err := ch.QueueDeclare(
    "",    // name
    false, // durable
    false, // delete when unused
    true,  // exclusive
    false, // no-wait
    nil,   // arguments

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:GoLang RabbitMQ实现六种工作模式示例 - Python技术站

(0)
上一篇 2023年5月15日
下一篇 2023年5月15日

相关文章

  • python使用pika库调用rabbitmq交换机模式详解

    Python使用Pika库调用RabbitMQ交换机模式详解 在本文中,我们将介绍如何使用Python的Pika库调用RabbitMQ交换机模式,并提供两个示例说明。 环境准备 在开始之前,需要确保已安装了以下环境: Python 3.x Pika库 RabbitMQ 示例一:使用直接交换机发送和接收消息 在本例中,我们将使用直接交换机发送和接收消息。具体步…

    RabbitMQ 2023年5月15日
    00
  • Preload基础使用方法详解

    以下是“Preload基础使用方法详解”的完整攻略,包含两个示例。 简介 Preload是一种优化网站性能的技术,它可以在页面加载时预加载资源,以提高页面加载速度和用户体验。本攻略将介绍Preload的基础使用方法。 示例1:使用Preload预加载CSS文件 以下是一个使用Preload预加载CSS文件的示例: <!DOCTYPE html> …

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ如何监视队列?

    RabbitMQ是一个开源的消息代理软件,它可以用于构建分布式系统中的消息传递架构。在RabbitMQ中,消息是通过队列进行传递和处理的。为了确保RabbitMQ的正常运行,我们需要监视队列的状态。本文将详细介绍如何监视RabbitMQ队列,并提供两个示例说明。 监视RabbitMQ队列的步骤 以下是监视RabbitMQ队列的步骤: 安装RabbitMQ 我…

    云计算 2023年5月5日
    00
  • 使用Vert.x Maven插件快速创建项目的方法

    以下是“使用Vert.x Maven插件快速创建项目的方法”的完整攻略,包含两个示例。 简介 在本攻略中,我们将详细讲解如何使用Vert.x Maven插件快速创建项目。通过攻略的学习,您将了解Vert.x Maven插件的基本概念、如何配置Vert.x Maven插件以及如何使用Vert.x Maven插件快速创建项目。 示例一:配置Vert.x Mave…

    RabbitMQ 2023年5月15日
    00
  • java开源项目jeecgboot的超详细解析

    以下是“Java开源项目JEECGBOOT的超详细解析”的完整攻略,包含两个示例说明。 简介 JEECGBOOT是一个基于Spring Boot和Jeecg快速开发平台的开源项目,旨在提供一套完整的企业级开发解决方案。本攻略将介绍如何使用JEECGBOOT进行快速开发,并提供相应的示例说明。 步骤1:安装JEECGBOOT 1. 下载JEECGBOOT 可以…

    RabbitMQ 2023年5月15日
    00
  • docker启动elasticsearch时内存不足问题及解决方法

    以下是“docker启动elasticsearch时内存不足问题及解决方法”的完整攻略,包含两个示例。 简介 在使用Docker启动Elasticsearch时,可能会遇到内存不足的问题。这是因为Elasticsearch默认使用的JVM内存较大,而Docker默认分配的内存较小。本攻略将介绍如何解决Docker启动Elasticsearch时内存不足的问题…

    RabbitMQ 2023年5月15日
    00
  • node.js中TCP Socket多进程间的消息推送示例详解

    以下是“node.js中TCP Socket多进程间的消息推送示例详解”的完整攻略,包含两个示例说明。 简介 在node.js中,可以使用TCP Socket实现多进程间的消息推送。本教程将介绍如何使用TCP Socket实现多进程间的消息推送,并提供相应的示例说明。 示例1:使用cluster模块实现多进程间的消息推送 以下是一个使用cluster模块实现…

    RabbitMQ 2023年5月15日
    00
  • Docker安装RabbitMQ的超详细步骤

    以下是Docker安装RabbitMQ的超详细步骤: 首先,确保您已经安装了Docker。如果您还没有安装,请根据您的操作系统下载并安装Docker。 打开终端或命令行界面,并输入以下命令来拉取RabbitMQ的Docker镜像: docker pull rabbitmq:3-management 这将从Docker Hub上下载RabbitMQ的最新版本,…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部