接下来我将详细讲解Java进程间通信之消息队列的完整攻略。
什么是消息队列
消息队列是一种通过在应用程序之间异步地传输数据来解决耦合问题的技术。它允许发送者,通常是独立的应用程序,将消息发送到队列中而不需要实时处理它。相反,接收者从队列中接收消息并在合适的时候进行处理。
消息队列的作用
使用消息队列可以将应用程序之间的通信和解耦,提高了系统的可靠性、可扩展性和在消息发送端和接收端处理速度不一致的情况下的中间缓冲功能。
消息队列的流程
消息队列的流程大致如下:
-
发送者将消息发送到队列中。
-
队列将消息存储在队列中,并等待接收者获取消息。
-
接收者从队列中获取消息并进行处理。
Java中的消息队列
在Java中使用消息队列可以使用JMS(Java消息服务)或AMQP(高级消息队列协议)。
本文着重讲解使用JMS的过程。
JMS消息模型
JMS定义了两个消息模型:点对点(Point-to-Point,P2P)和发布订阅(Publish-Subscribe,Pub/Sub)。
对于点对点消息,一个消息只能被一个接收者获取。对于发布订阅消息,一个消息可以被多个接收者获取。
使用JMS消息队列
1. 配置Maven
在使用JMS之前,需要在Maven中配置ActiveMQ的依赖。在pom.xml文件中添加以下依赖:
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
</dependencies>
2. 发送消息
发送方需要使用以下步骤来发送消息:
- 创建一个ConnectionFactory对象,指定ActiveMQ的地址和端口号。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- 创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
- 启动Connection。
connection.start();
- 创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- 创建一个Destination(队列或主题)对象,该对象表示应该将消息发送到的目标。
Destination destination = session.createQueue("example.queue");
- 创建一个MessageProducer对象,该对象将消息发送到队列或主题。
MessageProducer producer = session.createProducer(destination);
- 创建一个消息。
TextMessage message = session.createTextMessage("Hello world!");
- 把消息发送到队列或主题。
producer.send(message);
- 关闭所有的JMS对象。
producer.close();
session.close();
connection.close();
3. 接收消息
接收方需要使用以下步骤来接收消息:
- 创建一个ConnectionFactory对象,指定ActiveMQ的地址和端口号。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
- 创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
- 启动Connection。
connection.start();
- 创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- 创建一个Destination(队列或主题)对象,该对象表示应该将消息发送到的目标。
Destination destination = session.createQueue("example.queue");
- 创建一个MessageConsumer对象,该对象可以从队列或主题中接收消息。
MessageConsumer consumer = session.createConsumer(destination);
- 创建一个消息处理类。
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println("Received message: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 注册消息处理类到MessageConsumer对象。
consumer.setMessageListener(listener);
- 等待消息到达。
需要注意的是,这个步骤不会阻塞程序,所以需要让程序休眠一段时间。
Thread.sleep(1000);
- 关闭所有的JMS对象。
consumer.close();
session.close();
connection.close();
4. 示例说明
下面是一个简单的示例,在该示例中,发送方将一条简单的文本消息发送到队列中,接收方接收该消息并将其打印到控制台上。
发送方示例
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
public static void main(String[] args) throws Exception {
// 1. 创建一个ConnectionFactory对象,指定ActiveMQ的地址和端口号。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 3. 启动Connection。
connection.start();
// 4. 创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个Destination(队列或主题)对象,该对象表示应该将消息发送到的目标。
Destination destination = session.createQueue("example.queue");
// 6. 创建一个MessageProducer对象,该对象将消息发送到队列或主题。
MessageProducer producer = session.createProducer(destination);
// 7. 创建一个消息。
TextMessage message = session.createTextMessage("Hello world!");
// 8. 把消息发送到队列或主题。
producer.send(message);
// 9. 关闭所有的JMS对象。
producer.close();
session.close();
connection.close();
}
}
接收方示例
import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
public static void main(String[] args) throws Exception {
// 1. 创建一个ConnectionFactory对象,指定ActiveMQ的地址和端口号。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// 2. 创建一个Connection对象。
Connection connection = connectionFactory.createConnection();
// 3. 启动Connection。
connection.start();
// 4. 创建一个Session对象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5. 创建一个Destination(队列或主题)对象,该对象表示应该将消息发送到的目标。
Destination destination = session.createQueue("example.queue");
// 6. 创建一个MessageConsumer对象,该对象可以从队列或主题中接收消息。
MessageConsumer consumer = session.createConsumer(destination);
// 7. 创建一个消息处理类。
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
try {
String text = ((TextMessage) message).getText();
System.out.println("Received message: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
};
// 8. 注册消息处理类到MessageConsumer对象。
consumer.setMessageListener(listener);
// 9. 等待消息到达。
Thread.sleep(1000);
// 10. 关闭所有的JMS对象。
consumer.close();
session.close();
connection.close();
}
}
这就是使用JMS消息队列的完整攻略。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java进程间通信之消息队列 - Python技术站