Java RabbitMQ的三种Exchange模式

yizhihongxing

下面是Java RabbitMQ的三种Exchange模式的完整攻略,包含两个示例说明。

简介

在RabbitMQ中,Exchange是消息路由器,它将消息路由到一个或多个队列中。Exchange有三种类型:Direct、Topic和Fanout。本文将详细介绍这三种Exchange类型的使用方法和示例。

Direct Exchange

Direct Exchange是最简单的Exchange类型,它将消息路由到与Routing Key完全匹配的队列中。在Direct Exchange中,Routing Key是一个字符串,用于指定消息应该被路由到哪个队列中。

示例一:发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final String ROUTING_KEY = "direct_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());

        System.out.println("Sent message: " + MESSAGE);

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为direct_exchange的Direct Exchange。然后,我们使用basicPublish方法发送一条消息,该消息的Routing Key为direct_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final String QUEUE_NAME = "direct_queue";
    private static final String ROUTING_KEY = "direct_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("Waiting for messages...");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为direct_exchange的Direct Exchange和一个名为direct_queue的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume方法开始接收消息。在接收到消息时,我们打印消息内容。

示例二:使用Spring Boot发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final String EXCHANGE_NAME = "direct_exchange";
    private static final String ROUTING_KEY = "direct_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, MESSAGE);
        System.out.println("Sent message: " + MESSAGE);
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage方法中,我们使用RabbitTemplate发送一条消息,该消息的Routing Key为direct_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final String QUEUE_NAME = "direct_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage方法中,我们使用@RabbitListener注解指定要监听的队列,并打印消息内容。

Topic Exchange

Topic Exchange是一种更灵活的Exchange类型,它将消息路由到与Routing Key匹配的队列中。在Topic Exchange中,Routing Key是一个字符串,可以包含通配符*#,用于指定消息应该被路由到哪个队列中。

示例一:发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "topic_exchange";
    private static final String ROUTING_KEY = "topic_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());

        System.out.println("Sent message: " + MESSAGE);

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为topic_exchange的Topic Exchange。然后,我们使用basicPublish方法发送一条消息,该消息的Routing Key为topic_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static final String EXCHANGE_NAME = "topic_exchange";
    private static final String QUEUE_NAME = "topic_queue";
    private static final String ROUTING_KEY = "topic.#";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        System.out.println("Waiting for messages...");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为topic_exchange的Topic Exchange和一个名为topic_queue的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume方法开始接收消息。在接收到消息时,我们打印消息内容。

示例二:使用Spring Boot发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final String EXCHANGE_NAME = "topic_exchange";
    private static final String ROUTING_KEY = "topic_routing_key";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, MESSAGE);
        System.out.println("Sent message: " + MESSAGE);
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage方法中,我们使用RabbitTemplate发送一条消息,该消息的Routing Key为topic_routing_key,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final String QUEUE_NAME = "topic_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage方法中,我们使用@RabbitListener注解指定要监听的队列,并打印消息内容。

Fanout Exchange

Fanout Exchange是一种将消息广播到所有队列的Exchange类型。在Fanout Exchange中,Routing Key被忽略,消息将被路由到所有与Exchange绑定的队列中。

示例一:发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {

    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        channel.basicPublish(EXCHANGE_NAME, "", null, MESSAGE.getBytes());

        System.out.println("Sent message: " + MESSAGE);

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为fanout_exchange的Fanout Exchange。然后,我们使用basicPublish方法发送一条消息,该消息的Routing Key为空字符串,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {

    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String QUEUE_NAME = "fanout_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        System.out.println("Waiting for messages...");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("Received message: " + message);
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});

        channel.close();
        connection.close();
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main方法中,我们创建了一个连接和一个通道,并声明了一个名为fanout_exchange的Fanout Exchange和一个名为fanout_queue的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume方法开始接收消息。在接收到消息时,我们打印消息内容。

示例二:使用Spring Boot发送和接收消息

步骤1:添加依赖

在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

步骤2:发送消息

创建一个名为Producer的类,用于发送消息。代码如下:

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producer {

    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String MESSAGE = "Hello, RabbitMQ!";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", MESSAGE);
        System.out.println("Sent message: " + MESSAGE);
    }
}

在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage方法中,我们使用RabbitTemplate发送一条消息,该消息的Routing Key为空字符串,消息内容为Hello, RabbitMQ!

步骤3:接收消息

创建一个名为Consumer的类,用于接收消息。代码如下:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    private static final String QUEUE_NAME = "fanout_queue";

    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage方法中,我们使用@RabbitListener注解指定要监听的队列,并打印消息内容。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java RabbitMQ的三种Exchange模式 - Python技术站

(0)
上一篇 2023年5月16日
下一篇 2023年5月16日

相关文章

  • Java RabbitMQ的持久化和发布确认详解

    Java RabbitMQ的持久化和发布确认详解 在本文中,我们将详细讲解Java RabbitMQ的持久化和发布确认。我们将介绍RabbitMQ的基本概念和使用方法,并提供两个示例说明。 RabbitMQ基本概念 在使用RabbitMQ之前,需要了解一些基本概念: 生产者(Producer):发送消息的应用程序。 消费者(Consumer):接收消息的应用…

    RabbitMQ 2023年5月15日
    00
  • linux contos6.8下部署kafka集群的方法

    以下是“Linux Contos6.8下部署Kafka集群的方法”的完整攻略,包含两个示例。 简介 Kafka是一种高性能、分布式、可扩展的消息队列系统,可以实现大规模数据的实时处理和分发。本攻略将详细讲解如何在Linux Contos6.8下部署Kafka集群,并提供两个示例。 部署Kafka集群的方法 以下是在Linux Contos6.8下部署Kafk…

    RabbitMQ 2023年5月15日
    00
  • PHP Swoole异步Redis客户端实现方法示例

    以下是“PHP Swoole异步Redis客户端实现方法示例”的完整攻略,包含两个示例。 简介 在本攻略中,我们将详细讲解如何使用PHP Swoole异步Redis客户端实现异步Redis操作。通过攻略的学习,您将了解PHP Swoole的基本概念、如何使用PHP Swoole异步Redis客户端以及如何使用PHP Swoole实现异步Redis操作。 示例…

    RabbitMQ 2023年5月15日
    00
  • Redis如何实现延迟队列

    以下是Redis如何实现延迟队列的完整攻略,包含两个示例。 简介 Redis是一个流行的内存数据库,它支持多种数据结构,包括字符串、哈希表、列表、集合和有序集合。Redis可以使用有序集合来实现延迟队列,以便在分布式系统中处理延迟任务。本攻略将详细讲解Redis如何实现延迟队列,并提供两个示例。 示例一:使用Redis实现延迟队列 以下是使用Redis实现延…

    RabbitMQ 2023年5月15日
    00
  • RabbitMQ之什么是ACK?

    ACK是RabbitMQ中的一个重要概念,它用于确保消息已被正确处理。以下是RabbitMQ如何处理ACK的完整攻略: 消息确认机制 在RabbitMQ中,消息确认是一种机制,用于确保消息已被消费者正确处理。当消费者从队列中获取消息时,它可以向RabbitMQ发送确认消息,告诉RabbitMQ已经成功处理了该消息。如果消费者无法处理消息,则可以拒绝消息并将其…

    云计算 2023年5月5日
    00
  • RabbitMQ如何实现消息过滤?

    RabbitMQ可以通过Binding Key来实现消息过滤。Binding Key是一个字符串,它与Exchange和Queue绑定在一起,用于确定Exchange应该将消息发送到哪个Queue。通过设置不同的Binding Key,可以将消息路由到不同的Queue中,从而实现消息过滤。以下是RabbitMQ实现消息过滤的完整攻略: 创建Exchange和…

    云计算 2023年5月5日
    00
  • Rabbitmq消息推送功能实现示例

    以下是“RabbitMQ消息推送功能实现示例”的完整攻略,包含两个示例。 简介 RabbitMQ是一个开源的消息代理,用于实现高效的消息传递。它支持多种消息推送方式,包括广播、单播和多播。本攻略将详细讲解RabbitMQ的消息推送功能原理、应用场景和实现方法,包括示例说明。 示例一:广播推送 以下是广播推送的示例: 创建一个生产者,向RabbitMQ发送消息…

    RabbitMQ 2023年5月15日
    00
  • 分布式之全面了解Kafka的使用与特性

    以下是“分布式之全面了解Kafka的使用与特性”的完整攻略,包含两个示例。 简介 Kafka是一个分布式的、高吞吐量的消息队列系统,可以用于处理大量的实时数据。Kafka具有高可靠性、高扩展性、高性能等特点,被广泛应用于大数据、云计算、物联网等领域。本攻略将介绍如何使用Kafka以及Kafka的特性。 示例1:使用Kafka实现消息生产和消费 以下是使用Ka…

    RabbitMQ 2023年5月15日
    00
合作推广
合作推广
分享本页
返回顶部