RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析
RocketMQ是一个高性能、高可靠、可伸缩、分布式的消息中间件。在消息投递和消费的过程中,RocketMQ的ConsumeQueue与IndexFile起到了至关重要的作用,本篇文章将对其实时更新机制进行源码解析。
ConsumeQueue介绍
ConsumeQueue是RocketMQ用于存储消息消费进度的数据结构,它是一个顺序读写的消息存储结构。在消息消费时,消费者首先从ConsumeQueue中获取消息,并更新自己已经消费的进度。ConsumeQueue分为多个文件存储,每个文件固定大小为1G,文件名为CQxxxxxx,其中xxxxxx为文件偏移量(即存储消息的偏移量)。不同的主题(Topic)下面各自有一个ConsumeQueue。
ConsumeQueue实时更新机制
当有新消息被存储到commitlog中时,RocketMQ会将该消息的偏移量(offset)和消息的存储时间戳放入一个名为TransientStorePool的内存缓冲区中,并将该消息的commitlog文件名和偏移量存入与该主题对应的ConsumeQueue文件中。当TransientStorePool缓冲区满时,RocketMQ会将其中的数据写入磁盘,以保证消息的持久化。
每个ConsumeQueue文件中存储的是该偏移量所在的消息的存储时间戳(ConsumeQueue的索引值)。当消费者进行消息消费时,通过顺序读取ConsumeQueue中的索引值,消费者可以找出自己当前已经消费到的偏移量。
在ConsumeQueue实时更新机制中,当TransientStorePool缓冲区中有数据写入磁盘时,RocketMQ会在内存中构建一个Map,记录下新消息的偏移量和存储时间,并将其缓存到内存中的IndexFile中。IndexFile的大小为40M,RocketMQ会将IndexFile文件位置的offset和消息的偏移量存入前文提到的ConsumeQueue中。因为IndexFile是进程级别的Cache,所以可以加快消费者查找消息的速度。
ConsumeQueue的代码实现很长,在这里提供一个简单实现的示例:
public class ConsumeQueue {
private final MappedByteBuffer mappedByteBuffer;
public ConsumeQueue(File file, long mappedFileSize, AtomicInteger logic,
ConcurrentHashMap<Integer/* mappedFile position */, LongAdder/* physic offset */> indexMap) throws IOException {
......
}
public long get(long index) {
return mappedByteBuffer.getLong((int) index * CQ_STORE_UNIT_SIZE);
}
public void put(long index, long offsetPy) {
......
}
public boolean flush(final int flushLeastPages) {
......
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
this.mappedByteBuffer.position((int) nextFlushOffset);
byteBuffer.limit((int) (nextFlushOffset - this.mappedByteBuffer.position()));
FileChannel fileChannel = fileChannel();
long writeOffset = this.fileFromOffset + this.mappedByteBuffer.position();// 写入的位置
int writeBytes = byteBuffer.remaining();
if (writeBytes > 0) {
long beginTime = System.currentTimeMillis();
try {
fileChannel.write(byteBuffer);
fileChannel.force(false);
this.lastMappedFile.setWrotePosition(writeOffset + writeBytes);
return true;
} catch (Throwable e) {
......
}
}
return false;
}
......//其他方法实现
}
在上述示例中,我们可以看到ConsumeQueue的构造函数接受了一个参数mappedFileSize
,该参数表示MappedByteBuffer的大小,MappedByteBuffer是堆外内存映射的ByteBuffer,可以从该映射区域直接访问文件数据,因为 mappedByteBuffer 中的内容已经是文件中的数据,所以不需要进行I/O操作,就可以访问文件中的数据。
ConsumeQueue中在写入新消息完成后,会异步地在内存中创建一个Map对象,按照一定的格式将主题、队列ID、消息偏移量、消息存储时间戳等信息存放到IndexFile中,实现较为复杂,在此不再赘述。
IndexFile介绍
IndexFile是RocketMQ实现快速查找消息偏移量的文件,其由一个个固定大小的Index Slot组成。一个Index Slot是一组包含了message offset和phy offset两个属性的元组,用于快速查找具体消息在Commitlog中的位置。
每个IndexFile中包含多个Index Slot,每个 Index Slot 包含一个对应消息在 CommitLog 中的 offset、物理位置(物理存储) 和所属主题、队列 ID、时间戳等信息。每一个 IndexFile 有一个固定长度,大小默认为40MB。
IndexFile的代码实现也比较复杂,在这里提供一个简单实现的示例:
public class IndexFile {
private final MappedByteBuffer mappedByteBuffer;
private final ConcurrentHashMap<String, List<Index>> indexMap;
private final TreeMap<Long, Long> keyOffsetTable = new TreeMap<Long, Long>();
private final File file;
private String fileName;
public Index append(long endPhyOffset, int idx, long timeStamp, String filterBitMap) {
......
//每个slot存5个值,4个long值和一段byte数组
//offset = 0 第一个long值,消息偏移量
//offset = 8 第二个long值,消息物理偏移量
//offset = 16 第三个long值,消息提交时间戳
//offset = 24 第四个long值,消息在Commit Log中的长度
//offset = 32 Filter信息
// 将Index信息写入到mappedByteBuffer
mappedByteBuffer.putLong(nextIndex * INDEX_SIZE, offset);
mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 8, endPhyOffset);
mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 16, timeStamp);
mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 24, 0);
mappedByteBuffer.putInt(nextIndex * INDEX_SIZE + 32, filterBitMap.getBytes().length);
mappedByteBuffer.put(filterBitMap.getBytes());
......
return index;
}
public Index lookup(final long phyOffset, final int maxSearchKey) {
......
for (Map.Entry<Long, Long> next : keyOffsetTable.entrySet()) {
if (next.getKey() >= phyOffset) {
//获取消息索引信息
mappedByteBuffer.position((int) (next.getValue() & this.indexFileMappedByteBufferMaxSizeMask));
for (int i = 0; i < maxSearchKey; i++) {
final long offsetPy = this.indexHeaderSize + (i * INDEX_SIZE);
long tmp = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy) & this.indexFileMappedByteBufferMaxSizeMask);
if (tmp < phyOffset) {
continue;
}
long storeTime = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy + 16) & this.indexFileMappedByteBufferMaxSizeMask);
return new Index(next.getKey(), tmp, storeTime, i);
}
break;
}
}
return null;
}
......//其他方法实现
}
在该示例中,我们可以看到在IndexFile
中,Index Slot
的所有信息被写入到了mappedByteBuffer
中,组成一个Map,用于较快的获取对应的偏移量。同时,IndexFile还维护了一个TreeMap,将偏移量按顺序存储。
示例
假设我们有一个名为RocketMQTest
的主题,我们将消息的key、value、tag分别设置为"key1"、"value1"和"tag1"。我们可以通过以下代码来进行该主题下的生产和消费:
public class RocketMQSample {
public static void main(String[] args) {
try {
//生产者
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
//消息主题
String topic = "RocketMQTest";
Message message = new Message(topic, "tag1", "key1", "value1".getBytes());
SendResult sendResult = producer.send(message);
System.out.println("同步发送消息:" + sendResult);
//消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe(topic, "*");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
System.out.println(Thread.currentThread().getName() + " 消费消息:" + msgs.get(0).getKeys() + ":" + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.in.read();
} catch (Exception e) {
e.printStackTrace();
}
}
}
在生产者发送完消息后,我们可以通过BrokerController
类中的doDispatch
方法查看ConsumeQueue和IndexFile的实时更新情况:
public class BrokerController {
private final transient ReentrantLock lock = new ReentrantLock();
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable =
new ConcurrentHashMap<>(1024);
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, IndexFile>> indexFileTable =
new ConcurrentHashMap<>(1024);
private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, MappedFileQueue>> mappedFileQueueTable =
new ConcurrentHashMap<>(1024);
......
private boolean doDispatch(final ChannelHandlerContext ctx, final RemotingCommand request,
final RemotingResponseCallback responseCallback) {
......
// 添加到 Commit Log 文件
final byte[] body = request.getBody();
final long offset = commitLog.putMessage(body);
assert offset > 0;
resultData.setStatus(SendStatus.SEND_OK);
resultData.setOffset(offset);
//这里我们可以查看ConsumeQueue和IndexFile的实时更新情况。
ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(msgExt.getTopic());
if (map != null) {
ConsumeQueue cq = map.get(msgExt.getQueueId());
if (cq != null) {
cq.putMessagePositionInfoWrapper(queueOffset, offset, size, System.currentTimeMillis(), messageExt.getStoreHost());
}
}
ConcurrentHashMap<Integer, IndexFile> indexMap = indexFileTable.get(msgExt.getTopic());
if (indexMap != null) {
IndexFile ifile = indexMap.get(msgExt.getQueueId());
if (ifile != null) {
ifile.update(indexMsgTimestamp, offset);
}
}
......
return true;
}
......//其他方法实现
}
从代码的执行过程中,我们可以看到当有新的消息被写入CommitLog中时,ConsumeQueue和IndexFile文件中都会实时更新,保证了消费者可以准确地快速查找到自己消费的偏移量。
以上就是RocketMQ ConsumeQueue与IndexFile实时更新机制的源码解析,其中包含了两条示例,帮助大家更加深入地理解RocketMQ消息中间件的内部实现机制。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析 - Python技术站