The following is a complete walkthrough of the message reliability guarantees provided by Kafka, RabbitMQ, and RocketMQ, with two code examples for each queue.
Kafka
Kafka guarantees message reliability through the following mechanisms:
- Producer acknowledgments: after sending a record, the producer waits for an acknowledgment from the broker. The acknowledgment level is controlled by the acks setting (0, 1, or all); a successful acknowledgment carries the record's partition and offset, and sends that fail or time out can be retried automatically via the retries setting.
- Replication: each partition has several replicas, one acting as Leader and the rest as Followers. The producer writes to the Leader, which replicates the record to the Followers; with acks=all, the send is only considered successful once all in-sync replicas have acknowledged it.
- ISR mechanism: only replicas in the ISR (In-Sync Replicas) list are eligible to become Leader. A Follower that lags behind the Leader for longer than the configured threshold (replica.lag.time.max.ms) is removed from the ISR and rejoins only after it has caught up again. A producer configuration sketch illustrating these settings is shown below.
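The producer-side settings behind these mechanisms can be expressed roughly as follows. This is a minimal sketch, not part of the original examples: the broker address and topic name are the same placeholders used below, and the exact values are illustrative.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ReliableProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Wait for acknowledgment from all in-sync replicas before treating the send as successful.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retry failed or unacknowledged sends automatically.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        // Idempotent producer avoids duplicates introduced by retries.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my_topic", "Hello World!"),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace(); // send failed even after retries
                    } else {
                        System.out.println("Acked at partition " + metadata.partition() + ", offset " + metadata.offset());
                    }
                });
        producer.close();
    }
}
Combined with the broker/topic setting min.insync.replicas, acks=all ties the acknowledgment directly to the ISR mechanism described above.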
Example 1: Sending a message
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.Future;
public class Producer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "Hello World!");
        // Synchronous send: future.get() blocks until the broker acknowledges the record.
        Future<RecordMetadata> future = producer.send(record);
        try {
            RecordMetadata metadata = future.get();
            System.out.println("Message sent to partition " + metadata.partition() + ", offset " + metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
Example 2: Receiving messages
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class Consumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        // Poll loop; with the default settings, consumed offsets are committed automatically.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> {
                System.out.println("Message received from partition " + record.partition() + ", offset " + record.offset() + ": " + record.value());
            });
        }
    }
}
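On the consumer side, reliability also depends on when offsets are committed; the example above relies on the default automatic commit. A minimal sketch, reusing the same placeholder group and topic, that disables auto-commit and only commits after the records have been processed:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Disable auto-commit so an offset is only committed after processing succeeds.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my_topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record ->
                    System.out.println("Processing offset " + record.offset() + ": " + record.value()));
            // Commit only after the batch has been processed; a crash before this line
            // causes the records to be redelivered rather than lost.
            consumer.commitSync();
        }
    }
}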
RabbitMQ
RabbitMQ guarantees message reliability through the following mechanisms:
- Publisher confirms: with confirm mode enabled on the channel, the broker acknowledges each published message by its sequence number (delivery tag); the producer can track unconfirmed messages and resend any that are nacked or never confirmed.
- Consumer acknowledgments: with manual acks, the consumer sends an acknowledgment (identified by the delivery tag) back to the broker after processing a message, and the broker only removes the message from the queue once it receives that acknowledgment.
- Message persistence: with durable queues and persistent messages, RabbitMQ writes messages to disk so they are not lost if the broker goes down. A publisher-confirm and persistence sketch is shown below.
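Before the examples, here is a minimal sketch of how publisher confirms and message persistence from the list above look with the Java client. The host and queue name are the same placeholders used in the examples.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class ConfirmedProducer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // placeholder host
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Durable queue: the queue definition survives a broker restart.
        channel.queueDeclare("my_queue", true, false, false, null);
        // Enable publisher confirms on this channel.
        channel.confirmSelect();
        // PERSISTENT_TEXT_PLAIN marks the message itself as persistent (written to disk).
        channel.basicPublish("", "my_queue",
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                "Hello World!".getBytes("UTF-8"));
        // Block until the broker confirms the message, or throw after the timeout,
        // at which point the producer could resend.
        channel.waitForConfirmsOrDie(5000);
        System.out.println("Message confirmed by broker");
        channel.close();
        connection.close();
    }
}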
Example 1: Sending a message
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare a durable queue (second argument) so the queue survives a broker restart.
        channel.queueDeclare("my_queue", true, false, false, null);
        String message = "Hello World!";
        // Published here without persistent properties; see the confirm/persistence sketch above.
        channel.basicPublish("", "my_queue", null, message.getBytes());
        System.out.println("Message sent: " + message);
        channel.close();
        connection.close();
    }
}
Example 2: Receiving messages
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("my_queue", true, false, false, null);
        // autoAck = true: the broker treats the message as handled (and removes it) on delivery;
        // see the manual-ack sketch after this example for the acknowledged variant.
        channel.basicConsume("my_queue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Message received: " + message);
            }
        });
    }
}
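The example above passes autoAck = true, which does not exercise the consumer-acknowledgment mechanism described earlier. A minimal sketch of the manually acknowledged variant, using the same placeholder queue:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class AckingConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("my_queue", true, false, false, null);
        // autoAck = false: the broker keeps the message until it is explicitly acknowledged.
        channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Message received: " + message);
                // Acknowledge after processing; an unacked message is redelivered
                // if this consumer dies before reaching this line.
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}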
RocketMQ
RocketMQ guarantees message reliability through the following mechanisms:
- Producer acknowledgments: a synchronous send blocks until the broker returns a SendResult containing the send status, message id, and queue offset; if the result indicates failure or never arrives, the producer can retry the send.
- Consumer acknowledgments: the consumer reports the consumption result back to the broker, which only advances the consumer group's offset when consumption succeeds; messages that fail are redelivered later rather than being treated as consumed.
- Message persistence: RocketMQ persists messages to the CommitLog on disk (with configurable synchronous or asynchronous flushing), so messages are not lost if the broker goes down. A send-result and retry sketch is shown below.
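As a minimal sketch (the name server address, group, and topic are the same placeholders as in the examples below), checking the returned SendResult and configuring send retries illustrates the producer-side guarantee:
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
public class ReliableRocketMQProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("my_group");
        producer.setNamesrvAddr("localhost:9876"); // placeholder name server address
        // Retry a synchronous send a few times before reporting failure.
        producer.setRetryTimesWhenSendFailed(3);
        producer.start();
        Message message = new Message("my_topic", "Hello World!".getBytes());
        // Synchronous send: blocks until the broker returns a SendResult.
        SendResult result = producer.send(message);
        if (result.getSendStatus() == SendStatus.SEND_OK) {
            System.out.println("Stored at queue offset " + result.getQueueOffset()
                    + ", msgId " + result.getMsgId());
        } else {
            // e.g. FLUSH_DISK_TIMEOUT or SLAVE_NOT_AVAILABLE: the application can resend or log.
            System.out.println("Send not fully acknowledged: " + result.getSendStatus());
        }
        producer.shutdown();
    }
}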
Example 1: Sending a message
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("my_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        Message message = new Message("my_topic", "Hello World!".getBytes());
        // Synchronous send: blocks until the broker responds (the returned SendResult
        // is ignored here; see the sketch above for checking it).
        producer.send(message);
        System.out.println("Message sent: " + new String(message.getBody()));
        producer.shutdown();
    }
}
Example 2: Receiving messages
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("my_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
                for (MessageExt message : messages) {
                    System.out.println("Message received: " + new String(message.getBody()));
                }
                // Reporting success lets the broker advance this group's consumption offset.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
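The listener above always reports success. A minimal sketch of a variant that reports failures so the broker redelivers the batch later (the process method is a hypothetical stand-in for business logic):
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RetryingListener implements MessageListenerConcurrently {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
                                                    ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt message : messages) {
                process(message); // hypothetical business logic
            }
            // Success: the broker advances this group's consumption offset.
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            // Failure: ask the broker to redeliver the batch after a delay.
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    private void process(MessageExt message) {
        System.out.println("Processing: " + new String(message.getBody()));
    }
}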
That concludes the analysis of the message reliability guarantees provided by Kafka, RabbitMQ, and RocketMQ, with two code examples for each queue.