GoLang RabbitMQ实现六种工作模式示例
RabbitMQ 是一个开源的消息队列系统,支持多种消息传递协议。在实际应用中,我们经常需要使用 RabbitMQ 来实现消息传递功能。本文将介绍如何使用 GoLang 实现 RabbitMQ 的六种工作模式,并提供两个示例说明。
安装 RabbitMQ
首先需要安装 RabbitMQ,可以参考官方文档进行安装:https://www.rabbitmq.com/download.html
安装 GoLang 的 RabbitMQ 客户端库
使用以下命令安装 GoLang 的 RabbitMQ 客户端库(注意:streadway/amqp 已不再积极维护,新项目建议改用 RabbitMQ 官方维护的 github.com/rabbitmq/amqp091-go,其 API 与本文示例基本兼容):
go get github.com/streadway/amqp
RabbitMQ 支持六种工作模式,分别为简单模式、工作队列模式、发布/订阅模式、路由模式、主题模式和 RPC 模式。下面将分别介绍这六种工作模式的实现方法。
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
ContentType: "text/plain",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"task_queue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
failOnError(err, "Failed to set QoS")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare an exchange")
body := "Hello World!"
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
ContentType: "text/plain",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"direct_logs", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare an exchange")
body := "Hello World!"
err = ch.Publish(
"direct_logs", // exchange
"info", // routing key
false, // mandatory
false, // immediate
ContentType: "text/plain",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"direct_logs", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare a queue")
if len(os.Args) < 2 {
log.Printf("Usage: %s [info] [warning] [error]", os.Args[0])
for _, s := range os.Args[1:] {
err = ch.QueueBind(
q.Name, // queue name
s, // routing key
"direct_logs", // exchange
failOnError(err, "Failed to bind a queue")
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"topic_logs", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare an exchange")
body := "Hello World!"
err = ch.Publish(
"topic_logs", // exchange
"anonymous.info", // routing key
false, // mandatory
false, // immediate
ContentType: "text/plain",
Body: []byte(body),
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
package main
import (
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"topic_logs", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
failOnError(err, "Failed to declare an exchange")
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
