RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

RocketMQ是一个高性能、高可靠、可伸缩、分布式的消息中间件。在消息投递和消费的过程中,RocketMQ的ConsumeQueue与IndexFile起到了至关重要的作用,本篇文章将对其实时更新机制进行源码解析。

ConsumeQueue介绍

ConsumeQueue是RocketMQ用于存储消息消费进度的数据结构,它是一个顺序读写的消息存储结构。在消息消费时,消费者首先从ConsumeQueue中获取消息,并更新自己已经消费的进度。ConsumeQueue分为多个文件存储,每个文件固定大小为1G,文件名为CQxxxxxx,其中xxxxxx为文件偏移量(即存储消息的偏移量)。不同的主题(Topic)下面各自有一个ConsumeQueue。

ConsumeQueue实时更新机制

当有新消息被存储到commitlog中时,RocketMQ会将该消息的偏移量(offset)和消息的存储时间戳放入一个名为TransientStorePool的内存缓冲区中,并将该消息的commitlog文件名和偏移量存入与该主题对应的ConsumeQueue文件中。当TransientStorePool缓冲区满时,RocketMQ会将其中的数据写入磁盘,以保证消息的持久化。

每个ConsumeQueue文件中存储的是该偏移量所在的消息的存储时间戳(ConsumeQueue的索引值)。当消费者进行消息消费时,通过顺序读取ConsumeQueue中的索引值,消费者可以找出自己当前已经消费到的偏移量。

在ConsumeQueue实时更新机制中,当TransientStorePool缓冲区中有数据写入磁盘时,RocketMQ会在内存中构建一个Map,记录下新消息的偏移量和存储时间,并将其缓存到内存中的IndexFile中。IndexFile的大小为40M,RocketMQ会将IndexFile文件位置的offset和消息的偏移量存入前文提到的ConsumeQueue中。因为IndexFile是进程级别的Cache,所以可以加快消费者查找消息的速度。

ConsumeQueue的代码实现很长,在这里提供一个简单实现的示例:

public class ConsumeQueue {
    private final MappedByteBuffer mappedByteBuffer;

    public ConsumeQueue(File file, long mappedFileSize, AtomicInteger logic,
            ConcurrentHashMap<Integer/* mappedFile position */, LongAdder/* physic offset */> indexMap) throws IOException {
        ......
    }

    public long get(long index) {
        return mappedByteBuffer.getLong((int) index * CQ_STORE_UNIT_SIZE);
    }

    public void put(long index, long offsetPy) {
        ......
    }

    public boolean flush(final int flushLeastPages) {
        ......

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        this.mappedByteBuffer.position((int) nextFlushOffset);
        byteBuffer.limit((int) (nextFlushOffset - this.mappedByteBuffer.position()));
        FileChannel fileChannel = fileChannel();
        long writeOffset = this.fileFromOffset + this.mappedByteBuffer.position();// 写入的位置

        int writeBytes = byteBuffer.remaining();
        if (writeBytes > 0) {
            long beginTime = System.currentTimeMillis();
            try {
                fileChannel.write(byteBuffer);
                fileChannel.force(false);
                this.lastMappedFile.setWrotePosition(writeOffset + writeBytes);
                return true;
            } catch (Throwable e) {
                ......
            }
        }
        return false;
    }

    ......//其他方法实现
}

在上述示例中,我们可以看到ConsumeQueue的构造函数接受了一个参数mappedFileSize,该参数表示MappedByteBuffer的大小,MappedByteBuffer是堆外内存映射的ByteBuffer,可以从该映射区域直接访问文件数据,因为 mappedByteBuffer 中的内容已经是文件中的数据,所以不需要进行I/O操作,就可以访问文件中的数据。

ConsumeQueue中在写入新消息完成后,会异步地在内存中创建一个Map对象,按照一定的格式将主题、队列ID、消息偏移量、消息存储时间戳等信息存放到IndexFile中,实现较为复杂,在此不再赘述。

IndexFile介绍

IndexFile是RocketMQ实现快速查找消息偏移量的文件,其由一个个固定大小的Index Slot组成。一个Index Slot是一组包含了message offset和phy offset两个属性的元组,用于快速查找具体消息在Commitlog中的位置。

每个IndexFile中包含多个Index Slot,每个 Index Slot 包含一个对应消息在 CommitLog 中的 offset、物理位置(物理存储) 和所属主题、队列 ID、时间戳等信息。每一个 IndexFile 有一个固定长度,大小默认为40MB。

IndexFile的代码实现也比较复杂,在这里提供一个简单实现的示例:

public class IndexFile {
    private final MappedByteBuffer mappedByteBuffer;
    private final ConcurrentHashMap<String, List<Index>> indexMap;
    private final TreeMap<Long, Long> keyOffsetTable = new TreeMap<Long, Long>();
    private final File file;
    private String fileName;

    public Index append(long endPhyOffset, int idx, long timeStamp, String filterBitMap) {
        ......

        //每个slot存5个值,4个long值和一段byte数组
        //offset = 0   第一个long值,消息偏移量
        //offset = 8   第二个long值,消息物理偏移量
        //offset = 16  第三个long值,消息提交时间戳
        //offset = 24  第四个long值,消息在Commit Log中的长度
        //offset = 32  Filter信息

        // 将Index信息写入到mappedByteBuffer
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE, offset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 8, endPhyOffset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 16, timeStamp);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 24, 0);
        mappedByteBuffer.putInt(nextIndex * INDEX_SIZE + 32, filterBitMap.getBytes().length);
        mappedByteBuffer.put(filterBitMap.getBytes());

        ......

        return index;
    }

    public Index lookup(final long phyOffset, final int maxSearchKey) {
        ......

        for (Map.Entry<Long, Long> next : keyOffsetTable.entrySet()) {
            if (next.getKey() >= phyOffset) {
                //获取消息索引信息
                mappedByteBuffer.position((int) (next.getValue() & this.indexFileMappedByteBufferMaxSizeMask));
                for (int i = 0; i < maxSearchKey; i++) {
                    final long offsetPy = this.indexHeaderSize + (i * INDEX_SIZE);
                    long tmp = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy) & this.indexFileMappedByteBufferMaxSizeMask);
                    if (tmp < phyOffset) { 
                        continue;
                    }

                    long storeTime = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy + 16) & this.indexFileMappedByteBufferMaxSizeMask);
                    return new Index(next.getKey(), tmp, storeTime, i);
                }
                break;
            }
        }
        return null;
    }

    ......//其他方法实现
}

在该示例中,我们可以看到在IndexFile中,Index Slot的所有信息被写入到了mappedByteBuffer中,组成一个Map,用于较快的获取对应的偏移量。同时,IndexFile还维护了一个TreeMap,将偏移量按顺序存储。

示例

假设我们有一个名为RocketMQTest的主题,我们将消息的key、value、tag分别设置为"key1"、"value1"和"tag1"。我们可以通过以下代码来进行该主题下的生产和消费:

public class RocketMQSample {
    public static void main(String[] args) {
        try {
            //生产者
            DefaultMQProducer producer = new DefaultMQProducer("producer-group");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            //消息主题
            String topic = "RocketMQTest";
            Message message = new Message(topic, "tag1", "key1", "value1".getBytes());
            SendResult sendResult = producer.send(message);

            System.out.println("同步发送消息:" + sendResult);

            //消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.println(Thread.currentThread().getName() + " 消费消息:" + msgs.get(0).getKeys() + ":" + new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

            consumer.start();
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在生产者发送完消息后,我们可以通过BrokerController类中的doDispatch方法查看ConsumeQueue和IndexFile的实时更新情况:

public class BrokerController {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, IndexFile>> indexFileTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, MappedFileQueue>> mappedFileQueueTable =
            new ConcurrentHashMap<>(1024);

    ......

    private boolean doDispatch(final ChannelHandlerContext ctx, final RemotingCommand request,
                               final RemotingResponseCallback responseCallback) {
        ......

        // 添加到 Commit Log 文件
        final byte[] body = request.getBody();
        final long offset = commitLog.putMessage(body);
        assert offset > 0;
        resultData.setStatus(SendStatus.SEND_OK);
        resultData.setOffset(offset);

        //这里我们可以查看ConsumeQueue和IndexFile的实时更新情况。
        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(msgExt.getTopic());
        if (map != null) {
            ConsumeQueue cq = map.get(msgExt.getQueueId());
            if (cq != null) {
                cq.putMessagePositionInfoWrapper(queueOffset, offset, size, System.currentTimeMillis(), messageExt.getStoreHost());
            }
        }
        ConcurrentHashMap<Integer, IndexFile> indexMap = indexFileTable.get(msgExt.getTopic());
        if (indexMap != null) {
            IndexFile ifile = indexMap.get(msgExt.getQueueId());
            if (ifile != null) {
                ifile.update(indexMsgTimestamp, offset);
            }
        }

        ......

        return true;
    }

    ......//其他方法实现
}

从代码的执行过程中,我们可以看到当有新的消息被写入CommitLog中时,ConsumeQueue和IndexFile文件中都会实时更新,保证了消费者可以准确地快速查找到自己消费的偏移量。

以上就是RocketMQ ConsumeQueue与IndexFile实时更新机制的源码解析,其中包含了两条示例,帮助大家更加深入地理解RocketMQ消息中间件的内部实现机制。

阅读剩余 79%

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • 基于Java中进制的转换函数详解

    基于Java中进制的转换函数详解 什么是进制? 进制是数学中一种表示数值大小的方法,常见的进制有10进制、2进制、16进制等。 在计算机科学中,2进制是最常用的进制,因为计算机仅能识别0和1,而所有的数值、字符、图片等都可以用二进制表示。 Java中进制转换函数 Java中提供了许多用于进制转换的函数,如下:- Integer.parseInt(String…

    Java 2023年5月26日
    00
  • JSP spring boot / cloud 使用filter防止XSS

    下面是详细的JSP Spring Boot/Cloud使用Filter防止XSS的攻略: 什么是XSS攻击 XSS(Cross Site Scripting)攻击是一种非常常见的互联网应用程序攻击,攻击者通过注入恶意脚本代码,在受害者的浏览器中运行这些脚本,从而窃取用户的个人信息或者执行其他危险操作。该攻击方式十分危险,能够威胁到用户的隐私和信息安全,目前已…

    Java 2023年6月15日
    00
  • Javabean简介_动力节点Java学院整理

    Javabean简介:动力节点Java学院整理 什么是Javabean? Javabean是Java语言写成的、可重用的组成部分。它们实际上是简单的Java类,其中包括了表达业务层概念的属性和方法。Javabean对外暴露一个无参构造函数,并且使用一定的规范来描述它的属性和方法 Javabean命名规范 Javabean命名一般采用驼峰式的命名方式 Java…

    Java 2023年6月15日
    00
  • springboot通过注解、接口创建定时任务详解

    让我详细讲解一下关于“springboot通过注解、接口创建定时任务”的完整攻略。 一、创建定时任务需要的依赖 在项目中,需要导入以下依赖: <!– Spring Boot定时器需要的依赖 –> <dependency> <groupId>org.springframework.boot</groupId>…

    Java 2023年5月19日
    00
  • 基于jfreechart生成曲线、柱状等图片并展示到JSP

    生成曲线、柱状图等图片并展示到 JSP 页面是很常见的需求,而 JFreeChart 是一款 Java 的图表组件库,可以帮助我们轻松地生成各种类型的图表。下面是基于 JFreeChart 生成曲线、柱状等图片并展示到 JSP 的攻略: 1. 引入 JFreeChart 库和相关依赖 在项目中引入 JFreeChart 库和相关依赖。可以在 Maven 项目…

    Java 2023年6月15日
    00
  • Java的递归算法详解

    Java的递归算法详解 什么是递归算法? 递归算法是指在函数中调用自身实现的一种算法思想。使用递归可以大大简化代码实现,提高代码可读性和代码质量。 递归算法的特点 递归算法需要有边界条件(也称为递归结束条件),以避免无限循环调用自身而导致栈溢出等问题。 递归算法要求问题能够分解成与原问题同类型的子问题,且子问题的求解可以通过递归调用自身来实现。 递归算法在实…

    Java 2023年5月19日
    00
  • 解决mybatis-plus 查询耗时慢的问题

    当使用MyBatis-Plus进行查询时,如果存在查询耗时慢的问题,我们可以通过以下方式进行优化: 1. 添加索引优化 在进行查询时,若存在大表或多表关联的情况,可以考虑通过添加索引来优化查询速度。具体可以通过以下方式操作: 1.1. 加速索引扫描 当查询条件中包含索引列时,MySQL会尝试使用索引扫描,但当行数非常大时,索引扫描比全表扫描更慢。此时可以通过…

    Java 2023年6月16日
    00
  • Java 设计模式中的策略模式详情

    Java 设计模式中的策略模式 策略模式基础概念 策略模式是一种行为型设计模式,它能让你定义一些算法并将其封装到具有公共接口的独立类中。由于所有策略类都实现了相同的接口,因此它们可以自由地相互替换。 策略模式的结构 策略模式的核心在于定义一个策略接口(Istrategy),所有的算法都实现这个接口;然后定义一个上下文类(Context),这个上下文类有一个策…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部