下面是Java RabbitMQ的三种Exchange模式的完整攻略,包含两个示例说明。
简介
在RabbitMQ中,Exchange是消息路由器,它将消息路由到一个或多个队列中。Exchange有三种类型:Direct、Topic和Fanout。本文将详细介绍这三种Exchange类型的使用方法和示例。
Direct Exchange
Direct Exchange是最简单的Exchange类型,它将消息路由到与Routing Key完全匹配的队列中。在Direct Exchange中,Routing Key是一个字符串,用于指定消息应该被路由到哪个队列中。
示例一:发送和接收消息
步骤1:添加依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
步骤2:发送消息
创建一个名为Producer的类,用于发送消息。代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final String ROUTING_KEY = "direct_routing_key";
private static final String MESSAGE = "Hello, RabbitMQ!";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
System.out.println("Sent message: " + MESSAGE);
channel.close();
connection.close();
}
}
在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main
方法中,我们创建了一个连接和一个通道,并声明了一个名为direct_exchange
的Direct Exchange。然后,我们使用basicPublish
方法发送一条消息,该消息的Routing Key为direct_routing_key
,消息内容为Hello, RabbitMQ!
。
步骤3:接收消息
创建一个名为Consumer的类,用于接收消息。代码如下:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final String QUEUE_NAME = "direct_queue";
private static final String ROUTING_KEY = "direct_routing_key";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received message: " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
channel.close();
connection.close();
}
}
在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main
方法中,我们创建了一个连接和一个通道,并声明了一个名为direct_exchange
的Direct Exchange和一个名为direct_queue
的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume
方法开始接收消息。在接收到消息时,我们打印消息内容。
示例二:使用Spring Boot发送和接收消息
步骤1:添加依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步骤2:发送消息
创建一个名为Producer的类,用于发送消息。代码如下:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
private static final String EXCHANGE_NAME = "direct_exchange";
private static final String ROUTING_KEY = "direct_routing_key";
private static final String MESSAGE = "Hello, RabbitMQ!";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, MESSAGE);
System.out.println("Sent message: " + MESSAGE);
}
}
在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage
方法中,我们使用RabbitTemplate
发送一条消息,该消息的Routing Key为direct_routing_key
,消息内容为Hello, RabbitMQ!
。
步骤3:接收消息
创建一个名为Consumer的类,用于接收消息。代码如下:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private static final String QUEUE_NAME = "direct_queue";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage
方法中,我们使用@RabbitListener
注解指定要监听的队列,并打印消息内容。
Topic Exchange
Topic Exchange是一种更灵活的Exchange类型,它将消息路由到与Routing Key匹配的队列中。在Topic Exchange中,Routing Key是一个字符串,可以包含通配符*
和#
,用于指定消息应该被路由到哪个队列中。
示例一:发送和接收消息
步骤1:添加依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
步骤2:发送消息
创建一个名为Producer的类,用于发送消息。代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String EXCHANGE_NAME = "topic_exchange";
private static final String ROUTING_KEY = "topic_routing_key";
private static final String MESSAGE = "Hello, RabbitMQ!";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, MESSAGE.getBytes());
System.out.println("Sent message: " + MESSAGE);
channel.close();
connection.close();
}
}
在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main
方法中,我们创建了一个连接和一个通道,并声明了一个名为topic_exchange
的Topic Exchange。然后,我们使用basicPublish
方法发送一条消息,该消息的Routing Key为topic_routing_key
,消息内容为Hello, RabbitMQ!
。
步骤3:接收消息
创建一个名为Consumer的类,用于接收消息。代码如下:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String EXCHANGE_NAME = "topic_exchange";
private static final String QUEUE_NAME = "topic_queue";
private static final String ROUTING_KEY = "topic.#";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
System.out.println("Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received message: " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
channel.close();
connection.close();
}
}
在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main
方法中,我们创建了一个连接和一个通道,并声明了一个名为topic_exchange
的Topic Exchange和一个名为topic_queue
的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume
方法开始接收消息。在接收到消息时,我们打印消息内容。
示例二:使用Spring Boot发送和接收消息
步骤1:添加依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步骤2:发送消息
创建一个名为Producer的类,用于发送消息。代码如下:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
private static final String EXCHANGE_NAME = "topic_exchange";
private static final String ROUTING_KEY = "topic_routing_key";
private static final String MESSAGE = "Hello, RabbitMQ!";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, MESSAGE);
System.out.println("Sent message: " + MESSAGE);
}
}
在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage
方法中,我们使用RabbitTemplate
发送一条消息,该消息的Routing Key为topic_routing_key
,消息内容为Hello, RabbitMQ!
。
步骤3:接收消息
创建一个名为Consumer的类,用于接收消息。代码如下:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private static final String QUEUE_NAME = "topic_queue";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage
方法中,我们使用@RabbitListener
注解指定要监听的队列,并打印消息内容。
Fanout Exchange
Fanout Exchange是一种将消息广播到所有队列的Exchange类型。在Fanout Exchange中,Routing Key被忽略,消息将被路由到所有与Exchange绑定的队列中。
示例一:发送和接收消息
步骤1:添加依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
步骤2:发送消息
创建一个名为Producer的类,用于发送消息。代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String EXCHANGE_NAME = "fanout_exchange";
private static final String MESSAGE = "Hello, RabbitMQ!";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
channel.basicPublish(EXCHANGE_NAME, "", null, MESSAGE.getBytes());
System.out.println("Sent message: " + MESSAGE);
channel.close();
connection.close();
}
}
在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在main
方法中,我们创建了一个连接和一个通道,并声明了一个名为fanout_exchange
的Fanout Exchange。然后,我们使用basicPublish
方法发送一条消息,该消息的Routing Key为空字符串,消息内容为Hello, RabbitMQ!
。
步骤3:接收消息
创建一个名为Consumer的类,用于接收消息。代码如下:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String EXCHANGE_NAME = "fanout_exchange";
private static final String QUEUE_NAME = "fanout_queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("Waiting for messages...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Received message: " + message);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
channel.close();
connection.close();
}
}
在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在main
方法中,我们创建了一个连接和一个通道,并声明了一个名为fanout_exchange
的Fanout Exchange和一个名为fanout_queue
的队列。然后,我们将队列绑定到Exchange上,并使用basicConsume
方法开始接收消息。在接收到消息时,我们打印消息内容。
示例二:使用Spring Boot发送和接收消息
步骤1:添加依赖
在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
步骤2:发送消息
创建一个名为Producer的类,用于发送消息。代码如下:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class Producer {
private static final String EXCHANGE_NAME = "fanout_exchange";
private static final String MESSAGE = "Hello, RabbitMQ!";
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", MESSAGE);
System.out.println("Sent message: " + MESSAGE);
}
}
在上面的代码中,我们定义了一个名为Producer的类,用于发送消息。在sendMessage
方法中,我们使用RabbitTemplate
发送一条消息,该消息的Routing Key为空字符串,消息内容为Hello, RabbitMQ!
。
步骤3:接收消息
创建一个名为Consumer的类,用于接收消息。代码如下:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private static final String QUEUE_NAME = "fanout_queue";
@RabbitListener(queues = QUEUE_NAME)
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
在上面的代码中,我们定义了一个名为Consumer的类,用于接收消息。在receiveMessage
方法中,我们使用@RabbitListener
注解指定要监听的队列,并打印消息内容。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java RabbitMQ的三种Exchange模式 - Python技术站