RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

RocketMQ是一个高性能、高可靠、可伸缩、分布式的消息中间件。在消息投递和消费的过程中,RocketMQ的ConsumeQueue与IndexFile起到了至关重要的作用,本篇文章将对其实时更新机制进行源码解析。

ConsumeQueue介绍

ConsumeQueue是RocketMQ用于存储消息消费进度的数据结构,它是一个顺序读写的消息存储结构。在消息消费时,消费者首先从ConsumeQueue中获取消息,并更新自己已经消费的进度。ConsumeQueue分为多个文件存储,每个文件固定大小为1G,文件名为CQxxxxxx,其中xxxxxx为文件偏移量(即存储消息的偏移量)。不同的主题(Topic)下面各自有一个ConsumeQueue。

ConsumeQueue实时更新机制

当有新消息被存储到commitlog中时,RocketMQ会将该消息的偏移量(offset)和消息的存储时间戳放入一个名为TransientStorePool的内存缓冲区中,并将该消息的commitlog文件名和偏移量存入与该主题对应的ConsumeQueue文件中。当TransientStorePool缓冲区满时,RocketMQ会将其中的数据写入磁盘,以保证消息的持久化。

每个ConsumeQueue文件中存储的是该偏移量所在的消息的存储时间戳(ConsumeQueue的索引值)。当消费者进行消息消费时,通过顺序读取ConsumeQueue中的索引值,消费者可以找出自己当前已经消费到的偏移量。

在ConsumeQueue实时更新机制中,当TransientStorePool缓冲区中有数据写入磁盘时,RocketMQ会在内存中构建一个Map,记录下新消息的偏移量和存储时间,并将其缓存到内存中的IndexFile中。IndexFile的大小为40M,RocketMQ会将IndexFile文件位置的offset和消息的偏移量存入前文提到的ConsumeQueue中。因为IndexFile是进程级别的Cache,所以可以加快消费者查找消息的速度。

ConsumeQueue的代码实现很长,在这里提供一个简单实现的示例:

public class ConsumeQueue {
    private final MappedByteBuffer mappedByteBuffer;

    public ConsumeQueue(File file, long mappedFileSize, AtomicInteger logic,
            ConcurrentHashMap<Integer/* mappedFile position */, LongAdder/* physic offset */> indexMap) throws IOException {
        ......
    }

    public long get(long index) {
        return mappedByteBuffer.getLong((int) index * CQ_STORE_UNIT_SIZE);
    }

    public void put(long index, long offsetPy) {
        ......
    }

    public boolean flush(final int flushLeastPages) {
        ......

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        this.mappedByteBuffer.position((int) nextFlushOffset);
        byteBuffer.limit((int) (nextFlushOffset - this.mappedByteBuffer.position()));
        FileChannel fileChannel = fileChannel();
        long writeOffset = this.fileFromOffset + this.mappedByteBuffer.position();// 写入的位置

        int writeBytes = byteBuffer.remaining();
        if (writeBytes > 0) {
            long beginTime = System.currentTimeMillis();
            try {
                fileChannel.write(byteBuffer);
                fileChannel.force(false);
                this.lastMappedFile.setWrotePosition(writeOffset + writeBytes);
                return true;
            } catch (Throwable e) {
                ......
            }
        }
        return false;
    }

    ......//其他方法实现
}

在上述示例中,我们可以看到ConsumeQueue的构造函数接受了一个参数mappedFileSize,该参数表示MappedByteBuffer的大小,MappedByteBuffer是堆外内存映射的ByteBuffer,可以从该映射区域直接访问文件数据,因为 mappedByteBuffer 中的内容已经是文件中的数据,所以不需要进行I/O操作,就可以访问文件中的数据。

ConsumeQueue中在写入新消息完成后,会异步地在内存中创建一个Map对象,按照一定的格式将主题、队列ID、消息偏移量、消息存储时间戳等信息存放到IndexFile中,实现较为复杂,在此不再赘述。

IndexFile介绍

IndexFile是RocketMQ实现快速查找消息偏移量的文件,其由一个个固定大小的Index Slot组成。一个Index Slot是一组包含了message offset和phy offset两个属性的元组,用于快速查找具体消息在Commitlog中的位置。

每个IndexFile中包含多个Index Slot,每个 Index Slot 包含一个对应消息在 CommitLog 中的 offset、物理位置(物理存储) 和所属主题、队列 ID、时间戳等信息。每一个 IndexFile 有一个固定长度,大小默认为40MB。

IndexFile的代码实现也比较复杂,在这里提供一个简单实现的示例:

public class IndexFile {
    private final MappedByteBuffer mappedByteBuffer;
    private final ConcurrentHashMap<String, List<Index>> indexMap;
    private final TreeMap<Long, Long> keyOffsetTable = new TreeMap<Long, Long>();
    private final File file;
    private String fileName;

    public Index append(long endPhyOffset, int idx, long timeStamp, String filterBitMap) {
        ......

        //每个slot存5个值,4个long值和一段byte数组
        //offset = 0   第一个long值,消息偏移量
        //offset = 8   第二个long值,消息物理偏移量
        //offset = 16  第三个long值,消息提交时间戳
        //offset = 24  第四个long值,消息在Commit Log中的长度
        //offset = 32  Filter信息

        // 将Index信息写入到mappedByteBuffer
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE, offset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 8, endPhyOffset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 16, timeStamp);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 24, 0);
        mappedByteBuffer.putInt(nextIndex * INDEX_SIZE + 32, filterBitMap.getBytes().length);
        mappedByteBuffer.put(filterBitMap.getBytes());

        ......

        return index;
    }

    public Index lookup(final long phyOffset, final int maxSearchKey) {
        ......

        for (Map.Entry<Long, Long> next : keyOffsetTable.entrySet()) {
            if (next.getKey() >= phyOffset) {
                //获取消息索引信息
                mappedByteBuffer.position((int) (next.getValue() & this.indexFileMappedByteBufferMaxSizeMask));
                for (int i = 0; i < maxSearchKey; i++) {
                    final long offsetPy = this.indexHeaderSize + (i * INDEX_SIZE);
                    long tmp = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy) & this.indexFileMappedByteBufferMaxSizeMask);
                    if (tmp < phyOffset) { 
                        continue;
                    }

                    long storeTime = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy + 16) & this.indexFileMappedByteBufferMaxSizeMask);
                    return new Index(next.getKey(), tmp, storeTime, i);
                }
                break;
            }
        }
        return null;
    }

    ......//其他方法实现
}

在该示例中,我们可以看到在IndexFile中,Index Slot的所有信息被写入到了mappedByteBuffer中,组成一个Map,用于较快的获取对应的偏移量。同时,IndexFile还维护了一个TreeMap,将偏移量按顺序存储。

示例

假设我们有一个名为RocketMQTest的主题,我们将消息的key、value、tag分别设置为"key1"、"value1"和"tag1"。我们可以通过以下代码来进行该主题下的生产和消费:

public class RocketMQSample {
    public static void main(String[] args) {
        try {
            //生产者
            DefaultMQProducer producer = new DefaultMQProducer("producer-group");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            //消息主题
            String topic = "RocketMQTest";
            Message message = new Message(topic, "tag1", "key1", "value1".getBytes());
            SendResult sendResult = producer.send(message);

            System.out.println("同步发送消息:" + sendResult);

            //消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.println(Thread.currentThread().getName() + " 消费消息:" + msgs.get(0).getKeys() + ":" + new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

            consumer.start();
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在生产者发送完消息后,我们可以通过BrokerController类中的doDispatch方法查看ConsumeQueue和IndexFile的实时更新情况:

public class BrokerController {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, IndexFile>> indexFileTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, MappedFileQueue>> mappedFileQueueTable =
            new ConcurrentHashMap<>(1024);

    ......

    private boolean doDispatch(final ChannelHandlerContext ctx, final RemotingCommand request,
                               final RemotingResponseCallback responseCallback) {
        ......

        // 添加到 Commit Log 文件
        final byte[] body = request.getBody();
        final long offset = commitLog.putMessage(body);
        assert offset > 0;
        resultData.setStatus(SendStatus.SEND_OK);
        resultData.setOffset(offset);

        //这里我们可以查看ConsumeQueue和IndexFile的实时更新情况。
        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(msgExt.getTopic());
        if (map != null) {
            ConsumeQueue cq = map.get(msgExt.getQueueId());
            if (cq != null) {
                cq.putMessagePositionInfoWrapper(queueOffset, offset, size, System.currentTimeMillis(), messageExt.getStoreHost());
            }
        }
        ConcurrentHashMap<Integer, IndexFile> indexMap = indexFileTable.get(msgExt.getTopic());
        if (indexMap != null) {
            IndexFile ifile = indexMap.get(msgExt.getQueueId());
            if (ifile != null) {
                ifile.update(indexMsgTimestamp, offset);
            }
        }

        ......

        return true;
    }

    ......//其他方法实现
}

从代码的执行过程中,我们可以看到当有新的消息被写入CommitLog中时,ConsumeQueue和IndexFile文件中都会实时更新,保证了消费者可以准确地快速查找到自己消费的偏移量。

以上就是RocketMQ ConsumeQueue与IndexFile实时更新机制的源码解析,其中包含了两条示例,帮助大家更加深入地理解RocketMQ消息中间件的内部实现机制。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Spring 整合 MyBatis的实现步骤

    当我们要在Spring中使用MyBatis时,我们需要完成以下几个步骤。在这里,我将详细讲解整合步骤及相关示例。 1.添加依赖项 第一步是将必要的依赖项添加到项目中。我们需要添加Spring和MyBatis的依赖项,以及与他们相关的数据库驱动。这里我给出一个示例的pom.xml文件。 <dependencies> <!– Spring -…

    Java 2023年6月3日
    00
  • Java使用动态规划算法思想解决背包问题

    Java 使用动态规划算法思想解决背包问题 什么是动态规划算法 动态规划(Dynamic Programming)是一种解决多阶段决策问题的优化方法。它将问题分解为多个阶段,并针对每个阶段进行决策。每个阶段的决策将会影响后续的阶段,因此需要对每个阶段进行全局最优化的考虑,以确保最终的结果是最优的。 背包问题 背包问题(Knapsack Problem)是常见…

    Java 2023年5月19日
    00
  • 更改MySQL数据库的编码为utf8mb4问题

    更改MySQL数据库的编码为utf8mb4需要经历以下几个步骤: 1. 检查MySQL数据库当前编码 在终端或命令行中运行以下命令: mysql -u 用户名 -p 接着输入你的密码登录MySQL数据库,然后执行以下查询语句检查当前数据库编码: SHOW VARIABLES LIKE ‘%character%’; 2. 备份MySQL数据库 在进行更改编码之…

    Java 2023年5月20日
    00
  • Tomcat搭建本地服务器的图文教程

    Tomcat搭建本地服务器的完整攻略 什么是Tomcat Tomcat是一种开源的Web应用服务器,可实现Java Servlet、JavaServer Page和Java WebSocket技术。其内核实现了Java Servlet 和 JavaServer Page 规范, 作为Web服务器可以处理静态页面, 还可以扩展Servlet来处理动态内容。 如…

    Java 2023年5月19日
    00
  • Java如何获取Date的“昨天”与“明天”示例代码

    获取Date的“昨天”与“明天”可以通过以下步骤实现: 步骤一:获取当前日期 首先,我们需要获取当前的日期。Java中可以使用java.util.Date类来表示日期时间。可以通过new Date()方法获取到当前的日期: Date today = new Date(); 步骤二:计算“昨天” 要计算“昨天”,我们需要通过java.util.Calendar…

    Java 2023年5月20日
    00
  • Java文件基本操作总结

    下面我将详细讲解Java文件基本操作总结的完整攻略。 概述 Java是一种广泛使用的编程语言。在一个Java程序中,文件是很重要的组成部分,因为它包含了编程人员所书写的代码。在这篇攻略中,我们将介绍Java文件的基本操作。 基本操作 1. 文件的读取 Java文件读取有两种方式:字符流和字节流。其中,字符流主要用于读取文本文件,可以逐个字符读取。而字节流则用…

    Java 2023年5月20日
    00
  • 使用json字符串插入节点或者覆盖节点

    使用json字符串插入节点或者覆盖节点的过程可以分为以下几个步骤: 将json字符串解析为json对象 根据需要插入或覆盖的节点,生成新的json节点 将新的json节点插入或覆盖到目标json对象中 将最终结果转换为json字符串 下面通过两个示例说明具体的操作过程。 示例1:插入节点 假设原始的json字符串为: { "name": …

    Java 2023年5月26日
    00
  • JSP中c:foreach遍历和s:iterator遍历异同实例分析

    JSP中有两种常用的集合遍历方式:c:foreach和s:iterator。它们都可用于遍历Java集合对象,但在使用上有一些异同点。 c:foreach遍历 c:foreach是JSTL的核心标签库之一,提供了一种简化集合遍历的方法。它的语法如下: <c:forEach var="item" items="${colle…

    Java 2023年6月15日
    00
合作推广
合作推广
分享本页
返回顶部