Java RabbitMQ的三种Exchange模式

下面是Java RabbitMQ的三种Exchange模式的完整攻略,包含两个示例说明。

简介

在RabbitMQ中,Exchange是消息路由器,它将消息路由到一个或多个队列中。Exchange有三种类型:Direct、Topic和Fanout。本文将详细介绍这三种Exchange类型的使用方法和示例。

Direct Exchange

Direct Exchange是最简单的Exchange类型,它将消息路由到与Routing Key完全匹配的队列中。在Direct Exchange中,Routing Key是一个字符串,用于指定消息应该被路由到哪个队列中。

示例一:发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final String ROUTING_KEY = "direct_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());

        System.out.println("Sent message: " + MESSAGE);

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为direct_exchange的Direct Exchange。然后,我们使用basicPublish方法发送一条消息,该消息的Routing Key为direct_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final String QUEUE_NAME = "direct_queue";
    private static final String ROUTING_KEY = "direct_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("Waiting for messages...");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为direct_exchange的Direct Exchange和一个名为direct_queue的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume方法开始接收消息。在接收到消息时,我们打印消息内容。

示例二:使用Spring Boot发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final String ROUTING_KEY = "direct_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, MESSAGE);
        System.out.println("Sent message: " + MESSAGE);
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage方法中,我们使用RabbitTemplate发送一条消息,该消息的Routing Key为direct_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final String QUEUE_NAME = "direct_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage方法中,我们使用@RabbitListener注解指定要监听的队列,并打印消息内容。

Topic Exchange

Topic Exchange是一种更灵活的Exchange类型,它将消息路由到与Routing Key匹配的队列中。在Topic Exchange中,Routing Key是一个字符串,可以包含通配符*#,用于指定消息应该被路由到哪个队列中。

示例一:发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "topic_exchange";
    private static final String ROUTING_KEY = "topic_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());

        System.out.println("Sent message: " + MESSAGE);

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为topic_exchange的Topic Exchange。然后,我们使用basicPublish方法发送一条消息,该消息的Routing Key为topic_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static final String EXCHANGE_NAME = "topic_exchange";
    private static final String QUEUE_NAME = "topic_queue";
    private static final String ROUTING_KEY = "topic.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("Waiting for messages...");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为topic_exchange的Topic Exchange和一个名为topic_queue的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume方法开始接收消息。在接收到消息时,我们打印消息内容。

示例二:使用Spring Boot发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final String EXCHANGE_NAME = "topic_exchange";
    private static final String ROUTING_KEY = "topic_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, MESSAGE);
        System.out.println("Sent message: " + MESSAGE);
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage方法中,我们使用RabbitTemplate发送一条消息,该消息的Routing Key为topic_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final String QUEUE_NAME = "topic_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage方法中,我们使用@RabbitListener注解指定要监听的队列,并打印消息内容。

Fanout Exchange

Fanout Exchange是一种将消息广播到所有队列的Exchange类型。在Fanout Exchange中,Routing Key被忽略,消息将被路由到所有与Exchange绑定的队列中。

示例一:发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        channel.basicPublish(EXCHANGE_NAME, "", null, MESSAGE.getBytes());

        System.out.println("Sent message: " + MESSAGE);

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为fanout_exchange的Fanout Exchange。然后,我们使用basicPublish方法发送一条消息,该消息的Routing Key为空字符串,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String QUEUE_NAME = "fanout_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        System.out.println("Waiting for messages...");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为fanout_exchange的Fanout Exchange和一个名为fanout_queue的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume方法开始接收消息。在接收到消息时,我们打印消息内容。

示例二:使用Spring Boot发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", MESSAGE);
        System.out.println("Sent message: " + MESSAGE);
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage方法中,我们使用RabbitTemplate发送一条消息,该消息的Routing Key为空字符串,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final String QUEUE_NAME = "fanout_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage方法中,我们使用@RabbitListener注解指定要监听的队列,并打印消息内容。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java RabbitMQ的三种Exchange模式 - Python技术站

(0)
上一篇 2023年5月16日
下一篇 2023年5月16日

相关文章

  • Spring Boot 使用 Disruptor 做内部高性能消息队列

    以下是“Spring Boot 使用 Disruptor 做内部高性能消息队列”的完整攻略,包含两个示例。 简介 Disruptor是一个高性能的内存消息队列,可以用于解决高并发场景下的消息处理问题。在Spring Boot中,可以使用Disruptor实现内部高性能消息队列。本攻略将介绍如何在Spring Boot中使用Disruptor。 配置Disru…

    RabbitMQ 2023年5月15日
    00
  • Go实现共享库的方法

    以下是“Go实现共享库的方法”的完整攻略,包含两个示例。 简介 共享库是一种可重用的代码组件,可以在多个程序中使用。在Go语言中,可以使用一些方法来实现共享库,本攻略将详细介绍这些方法。 步骤 以下是Go实现共享库的方法: 使用Go Modules Go Modules是Go语言的官方依赖管理工具,可以用于管理项目的依赖关系和版本控制。可以使用以下命令创建一…

    RabbitMQ 2023年5月15日
    00
  • 使用golang编写一个并发工作队列

    下面是使用golang编写一个并发工作队列的完整攻略,包含两个示例说明。 简介 并发工作队列是一种常见的并发编程模式,用于处理大量的任务。在本文中,我们将介绍如何使用golang编写一个并发工作队列。 步骤1:创建任务 在并发工作队列中,我们需要处理大量的任务。在本文中,我们将使用一个简单的任务来演示如何使用并发工作队列。代码如下: type Task st…

    RabbitMQ 2023年5月16日
    00
  • 大数据Kafka:消息队列和Kafka基本介绍

    以下是“大数据Kafka:消息队列和Kafka基本介绍”的完整攻略,包含两个示例。 简介 Kafka是一种高吞吐量的分布式消息队列,可以用于处理大量的实时数据。本攻略介绍了消息队列和Kafka的基本概念,并提供了两个示例。 消息队列 消息队列是一种用于在应用程序之间传递消息的技术。消息队列通常由生产者、消费者和队列组成。生产者将消息发送到队列中,消费者从队列…

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ如何创建一个队列?

    RabbitMQ是一个开源的消息代理,它提供了可靠的消息传递机制。在RabbitMQ中,队列是存储消息的地方,它接收来自生产者的消息并将其保存在队列中,直到消费者准备好接收它们。以下是RabbitMQ创建队列的步骤: 创建连接 在创建队列之前,需要创建到RabbitMQ代理的连接。连接可以使用RabbitMQ提供的客户端库来创建。以下是一个使用Python客…

    云计算 2023年5月5日
    00
  • Springboot项目全局异常统一处理案例代码

    以下是“Spring Boot项目全局异常统一处理案例代码”的完整攻略,包含两个示例。 简介 在Spring Boot应用程序中,可以使用@ControllerAdvice和@ExceptionHandler注释来实现全局异常处理。这些注释允许开发人员定义一个或多个异常处理程序,以便在应用程序中捕获和处理异常。本攻略将介绍如何使用@ControllerAdv…

    RabbitMQ 2023年5月15日
    00
  • SpringBoot整合RabbitMQ及生产全场景高级特性实战

    SpringBoot整合RabbitMQ及生产全场景高级特性实战 本文将详细讲解如何使用SpringBoot整合RabbitMQ,并实现生产全场景高级特性。本文将提供两个示例说明。 环境准备 在开始本文之前,需要确保已经安装软件: JDK 1.8或更高版本 RabbitMQ服务器 Maven 示例一:使用SpringBoot发送和接收消息 在本示例中,我们将…

    RabbitMQ 2023年5月15日
    00
  • springboot2.0集成rabbitmq的示例代码

    以下是详细讲解Spring Boot 2.0集成RabbitMQ的示例代码的完整攻略,包含两个示例说明。 示例1:发送和接收简单的消息 步骤1:添加依赖 在您的Spring Boot项目中,您需要添加以下依赖: <dependency> <groupId>org.springframework.boot</groupId>…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部