RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

RocketMQ是一个高性能、高可靠、可伸缩、分布式的消息中间件。在消息投递和消费的过程中,RocketMQ的ConsumeQueue与IndexFile起到了至关重要的作用,本篇文章将对其实时更新机制进行源码解析。

ConsumeQueue介绍

ConsumeQueue是RocketMQ用于存储消息消费进度的数据结构,它是一个顺序读写的消息存储结构。在消息消费时,消费者首先从ConsumeQueue中获取消息,并更新自己已经消费的进度。ConsumeQueue分为多个文件存储,每个文件固定大小为1G,文件名为CQxxxxxx,其中xxxxxx为文件偏移量(即存储消息的偏移量)。不同的主题(Topic)下面各自有一个ConsumeQueue。

ConsumeQueue实时更新机制

当有新消息被存储到commitlog中时,RocketMQ会将该消息的偏移量(offset)和消息的存储时间戳放入一个名为TransientStorePool的内存缓冲区中,并将该消息的commitlog文件名和偏移量存入与该主题对应的ConsumeQueue文件中。当TransientStorePool缓冲区满时,RocketMQ会将其中的数据写入磁盘,以保证消息的持久化。

每个ConsumeQueue文件中存储的是该偏移量所在的消息的存储时间戳(ConsumeQueue的索引值)。当消费者进行消息消费时,通过顺序读取ConsumeQueue中的索引值,消费者可以找出自己当前已经消费到的偏移量。

在ConsumeQueue实时更新机制中,当TransientStorePool缓冲区中有数据写入磁盘时,RocketMQ会在内存中构建一个Map,记录下新消息的偏移量和存储时间,并将其缓存到内存中的IndexFile中。IndexFile的大小为40M,RocketMQ会将IndexFile文件位置的offset和消息的偏移量存入前文提到的ConsumeQueue中。因为IndexFile是进程级别的Cache,所以可以加快消费者查找消息的速度。

ConsumeQueue的代码实现很长,在这里提供一个简单实现的示例:

public class ConsumeQueue {
    private final MappedByteBuffer mappedByteBuffer;

    public ConsumeQueue(File file, long mappedFileSize, AtomicInteger logic,
            ConcurrentHashMap<Integer/* mappedFile position */, LongAdder/* physic offset */> indexMap) throws IOException {
        ......
    }

    public long get(long index) {
        return mappedByteBuffer.getLong((int) index * CQ_STORE_UNIT_SIZE);
    }

    public void put(long index, long offsetPy) {
        ......
    }

    public boolean flush(final int flushLeastPages) {
        ......

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        this.mappedByteBuffer.position((int) nextFlushOffset);
        byteBuffer.limit((int) (nextFlushOffset - this.mappedByteBuffer.position()));
        FileChannel fileChannel = fileChannel();
        long writeOffset = this.fileFromOffset + this.mappedByteBuffer.position();// 写入的位置

        int writeBytes = byteBuffer.remaining();
        if (writeBytes > 0) {
            long beginTime = System.currentTimeMillis();
            try {
                fileChannel.write(byteBuffer);
                fileChannel.force(false);
                this.lastMappedFile.setWrotePosition(writeOffset + writeBytes);
                return true;
            } catch (Throwable e) {
                ......
            }
        }
        return false;
    }

    ......//其他方法实现
}

在上述示例中,我们可以看到ConsumeQueue的构造函数接受了一个参数mappedFileSize,该参数表示MappedByteBuffer的大小,MappedByteBuffer是堆外内存映射的ByteBuffer,可以从该映射区域直接访问文件数据,因为 mappedByteBuffer 中的内容已经是文件中的数据,所以不需要进行I/O操作,就可以访问文件中的数据。

ConsumeQueue中在写入新消息完成后,会异步地在内存中创建一个Map对象,按照一定的格式将主题、队列ID、消息偏移量、消息存储时间戳等信息存放到IndexFile中,实现较为复杂,在此不再赘述。

IndexFile介绍

IndexFile是RocketMQ实现快速查找消息偏移量的文件,其由一个个固定大小的Index Slot组成。一个Index Slot是一组包含了message offset和phy offset两个属性的元组,用于快速查找具体消息在Commitlog中的位置。

每个IndexFile中包含多个Index Slot,每个 Index Slot 包含一个对应消息在 CommitLog 中的 offset、物理位置(物理存储) 和所属主题、队列 ID、时间戳等信息。每一个 IndexFile 有一个固定长度,大小默认为40MB。

IndexFile的代码实现也比较复杂,在这里提供一个简单实现的示例:

public class IndexFile {
    private final MappedByteBuffer mappedByteBuffer;
    private final ConcurrentHashMap<String, List<Index>> indexMap;
    private final TreeMap<Long, Long> keyOffsetTable = new TreeMap<Long, Long>();
    private final File file;
    private String fileName;

    public Index append(long endPhyOffset, int idx, long timeStamp, String filterBitMap) {
        ......

        //每个slot存5个值,4个long值和一段byte数组
        //offset = 0   第一个long值,消息偏移量
        //offset = 8   第二个long值,消息物理偏移量
        //offset = 16  第三个long值,消息提交时间戳
        //offset = 24  第四个long值,消息在Commit Log中的长度
        //offset = 32  Filter信息

        // 将Index信息写入到mappedByteBuffer
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE, offset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 8, endPhyOffset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 16, timeStamp);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 24, 0);
        mappedByteBuffer.putInt(nextIndex * INDEX_SIZE + 32, filterBitMap.getBytes().length);
        mappedByteBuffer.put(filterBitMap.getBytes());

        ......

        return index;
    }

    public Index lookup(final long phyOffset, final int maxSearchKey) {
        ......

        for (Map.Entry<Long, Long> next : keyOffsetTable.entrySet()) {
            if (next.getKey() >= phyOffset) {
                //获取消息索引信息
                mappedByteBuffer.position((int) (next.getValue() & this.indexFileMappedByteBufferMaxSizeMask));
                for (int i = 0; i < maxSearchKey; i++) {
                    final long offsetPy = this.indexHeaderSize + (i * INDEX_SIZE);
                    long tmp = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy) & this.indexFileMappedByteBufferMaxSizeMask);
                    if (tmp < phyOffset) { 
                        continue;
                    }

                    long storeTime = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy + 16) & this.indexFileMappedByteBufferMaxSizeMask);
                    return new Index(next.getKey(), tmp, storeTime, i);
                }
                break;
            }
        }
        return null;
    }

    ......//其他方法实现
}

在该示例中,我们可以看到在IndexFile中,Index Slot的所有信息被写入到了mappedByteBuffer中,组成一个Map,用于较快的获取对应的偏移量。同时,IndexFile还维护了一个TreeMap,将偏移量按顺序存储。

示例

假设我们有一个名为RocketMQTest的主题,我们将消息的key、value、tag分别设置为"key1"、"value1"和"tag1"。我们可以通过以下代码来进行该主题下的生产和消费:

public class RocketMQSample {
    public static void main(String[] args) {
        try {
            //生产者
            DefaultMQProducer producer = new DefaultMQProducer("producer-group");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            //消息主题
            String topic = "RocketMQTest";
            Message message = new Message(topic, "tag1", "key1", "value1".getBytes());
            SendResult sendResult = producer.send(message);

            System.out.println("同步发送消息:" + sendResult);

            //消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.println(Thread.currentThread().getName() + " 消费消息:" + msgs.get(0).getKeys() + ":" + new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

            consumer.start();
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在生产者发送完消息后,我们可以通过BrokerController类中的doDispatch方法查看ConsumeQueue和IndexFile的实时更新情况:

public class BrokerController {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, IndexFile>> indexFileTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, MappedFileQueue>> mappedFileQueueTable =
            new ConcurrentHashMap<>(1024);

    ......

    private boolean doDispatch(final ChannelHandlerContext ctx, final RemotingCommand request,
                               final RemotingResponseCallback responseCallback) {
        ......

        // 添加到 Commit Log 文件
        final byte[] body = request.getBody();
        final long offset = commitLog.putMessage(body);
        assert offset > 0;
        resultData.setStatus(SendStatus.SEND_OK);
        resultData.setOffset(offset);

        //这里我们可以查看ConsumeQueue和IndexFile的实时更新情况。
        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(msgExt.getTopic());
        if (map != null) {
            ConsumeQueue cq = map.get(msgExt.getQueueId());
            if (cq != null) {
                cq.putMessagePositionInfoWrapper(queueOffset, offset, size, System.currentTimeMillis(), messageExt.getStoreHost());
            }
        }
        ConcurrentHashMap<Integer, IndexFile> indexMap = indexFileTable.get(msgExt.getTopic());
        if (indexMap != null) {
            IndexFile ifile = indexMap.get(msgExt.getQueueId());
            if (ifile != null) {
                ifile.update(indexMsgTimestamp, offset);
            }
        }

        ......

        return true;
    }

    ......//其他方法实现
}

从代码的执行过程中,我们可以看到当有新的消息被写入CommitLog中时,ConsumeQueue和IndexFile文件中都会实时更新,保证了消费者可以准确地快速查找到自己消费的偏移量。

以上就是RocketMQ ConsumeQueue与IndexFile实时更新机制的源码解析,其中包含了两条示例,帮助大家更加深入地理解RocketMQ消息中间件的内部实现机制。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • java实现dijkstra最短路径寻路算法

    下面是Java实现Dijkstra最短路径寻路算法的完整攻略: 什么是Dijkstra最短路径寻路算法 Dijkstra算法是一种可以求解带权重图(有向或无向)中的最短路径的算法。该算法要求图的权重为非负值。 Dijkstra算法实现思路 首先我们需要初始化:所有点的到起点的距离为无穷大,但起点到自己的距离为0。 然后从起点开始,将起点标记为已访问过,并将其…

    Java 2023年5月19日
    00
  • JSONObject使用方法详解

    JSONObject使用方法详解 什么是JSONObject? JSONObject是Java中的JSON处理库之一,它提供了一些方法来创建,解析和操作JSON数据。它是一个无序的键值对集合,其中的键唯一且不可重复,值可以是任意类型的数据,包括其他JSONObject和JSONArray实例。 JSONObject的用法 创建JSONObject对象 可以使…

    Java 2023年5月26日
    00
  • php URL跳转代码 减少外链

    PHP URL跳转代码用于将一个URL重定向到另一个URL,可以帮助网站管理者减少外链,从而提高网站的安全性,避免了在跨域请求过程中被注入恶意代码的风险。下面将为你详细讲解如何使用PHP URL跳转代码来减少外链。 准备工作 在进行URL跳转之前,你需要知道一些准备工作。首先,需要安装一台web服务器,比如说Apache。其次,需要安装PHP,以便在PHP代…

    Java 2023年6月16日
    00
  • java 实现局域网文件传输的实例

    下面是 “Java实现局域网文件传输的实例” 的完整攻略: 1. 确定文件传输协议 Java实现局域网文件传输,首先需要确定文件传输所采用的协议。常用的有TCP、UDP和HTTP等协议。 在这里我们采用TCP协议,因为TCP协议是可靠的传输协议,可以确保数据传输的正确性和完整性。 2. 客户端编写 客户端需要完成以下步骤:1. 创建Socket对象,指定服务…

    Java 2023年5月19日
    00
  • Spring Security过滤器链加载执行流程源码解析

    针对Spring Security过滤器链加载执行流程源码解析的完整攻略,我把它分为以下几个部分: 概述 Spring Security过滤器链的加载流程 Spring Security过滤器链的执行流程 示例1:启动时访问静态资源 示例2:访问受保护资源 下面对每个部分进行详细讲解。 1. 概述 Spring Security是一个基于Spring框架的安…

    Java 2023年5月20日
    00
  • 从最基本的Java工程搭建SpringMVC+SpringDataJPA+Hibernate

    下面我将详细讲解“从最基本的Java工程搭建SpringMVC+SpringDataJPA+Hibernate”的完整攻略。 前置要求 在正式进行搭建之前,需要确保你已经安装配置好以下软件: JDK Maven Tomcat IDE(推荐使用IntelliJ IDEA) 步骤一:创建Maven项目 首先,我们需要创建一个Maven项目。在IDE中,找到创建M…

    Java 2023年5月20日
    00
  • 详解Spring Security怎么从数据库加载我们的用户

    下面是详解Spring Security怎么从数据库加载我们的用户的完整攻略。 准备工作 首先,我们需要在项目中引入Spring Security和Spring JDBC的依赖。具体可以在maven中添加如下依赖: <dependency> <groupId>org.springframework.security</group…

    Java 2023年5月20日
    00
  • 基于Spring-Security自定义登陆错误提示信息

    基于Spring-Security自定义登陆错误提示信息的完整攻略如下: 第一步:添加Spring-Security依赖 我们需要在Maven或者Gradle项目中添加Spring-Security依赖,在pom.xml或build.gradle中添加相应的依赖配置,例如: <dependency> <groupId>org.spri…

    Java 2023年5月20日
    00
合作推广
合作推广
分享本页
返回顶部