RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

yizhihongxing

RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

RocketMQ是一个高性能、高可靠、可伸缩、分布式的消息中间件。在消息投递和消费的过程中,RocketMQ的ConsumeQueue与IndexFile起到了至关重要的作用,本篇文章将对其实时更新机制进行源码解析。

ConsumeQueue介绍

ConsumeQueue是RocketMQ用于存储消息消费进度的数据结构,它是一个顺序读写的消息存储结构。在消息消费时,消费者首先从ConsumeQueue中获取消息,并更新自己已经消费的进度。ConsumeQueue分为多个文件存储,每个文件固定大小为1G,文件名为CQxxxxxx,其中xxxxxx为文件偏移量(即存储消息的偏移量)。不同的主题(Topic)下面各自有一个ConsumeQueue。

ConsumeQueue实时更新机制

当有新消息被存储到commitlog中时,RocketMQ会将该消息的偏移量(offset)和消息的存储时间戳放入一个名为TransientStorePool的内存缓冲区中,并将该消息的commitlog文件名和偏移量存入与该主题对应的ConsumeQueue文件中。当TransientStorePool缓冲区满时,RocketMQ会将其中的数据写入磁盘,以保证消息的持久化。

每个ConsumeQueue文件中存储的是该偏移量所在的消息的存储时间戳(ConsumeQueue的索引值)。当消费者进行消息消费时,通过顺序读取ConsumeQueue中的索引值,消费者可以找出自己当前已经消费到的偏移量。

在ConsumeQueue实时更新机制中,当TransientStorePool缓冲区中有数据写入磁盘时,RocketMQ会在内存中构建一个Map,记录下新消息的偏移量和存储时间,并将其缓存到内存中的IndexFile中。IndexFile的大小为40M,RocketMQ会将IndexFile文件位置的offset和消息的偏移量存入前文提到的ConsumeQueue中。因为IndexFile是进程级别的Cache,所以可以加快消费者查找消息的速度。

ConsumeQueue的代码实现很长,在这里提供一个简单实现的示例:

public class ConsumeQueue {
    private final MappedByteBuffer mappedByteBuffer;

    public ConsumeQueue(File file, long mappedFileSize, AtomicInteger logic,
            ConcurrentHashMap<Integer/* mappedFile position */, LongAdder/* physic offset */> indexMap) throws IOException {
        ......
    }

    public long get(long index) {
        return mappedByteBuffer.getLong((int) index * CQ_STORE_UNIT_SIZE);
    }

    public void put(long index, long offsetPy) {
        ......
    }

    public boolean flush(final int flushLeastPages) {
        ......

        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        this.mappedByteBuffer.position((int) nextFlushOffset);
        byteBuffer.limit((int) (nextFlushOffset - this.mappedByteBuffer.position()));
        FileChannel fileChannel = fileChannel();
        long writeOffset = this.fileFromOffset + this.mappedByteBuffer.position();// 写入的位置

        int writeBytes = byteBuffer.remaining();
        if (writeBytes > 0) {
            long beginTime = System.currentTimeMillis();
            try {
                fileChannel.write(byteBuffer);
                fileChannel.force(false);
                this.lastMappedFile.setWrotePosition(writeOffset + writeBytes);
                return true;
            } catch (Throwable e) {
                ......
            }
        }
        return false;
    }

    ......//其他方法实现
}

在上述示例中,我们可以看到ConsumeQueue的构造函数接受了一个参数mappedFileSize,该参数表示MappedByteBuffer的大小,MappedByteBuffer是堆外内存映射的ByteBuffer,可以从该映射区域直接访问文件数据,因为 mappedByteBuffer 中的内容已经是文件中的数据,所以不需要进行I/O操作,就可以访问文件中的数据。

ConsumeQueue中在写入新消息完成后,会异步地在内存中创建一个Map对象,按照一定的格式将主题、队列ID、消息偏移量、消息存储时间戳等信息存放到IndexFile中,实现较为复杂,在此不再赘述。

IndexFile介绍

IndexFile是RocketMQ实现快速查找消息偏移量的文件,其由一个个固定大小的Index Slot组成。一个Index Slot是一组包含了message offset和phy offset两个属性的元组,用于快速查找具体消息在Commitlog中的位置。

每个IndexFile中包含多个Index Slot,每个 Index Slot 包含一个对应消息在 CommitLog 中的 offset、物理位置(物理存储) 和所属主题、队列 ID、时间戳等信息。每一个 IndexFile 有一个固定长度,大小默认为40MB。

IndexFile的代码实现也比较复杂,在这里提供一个简单实现的示例:

public class IndexFile {
    private final MappedByteBuffer mappedByteBuffer;
    private final ConcurrentHashMap<String, List<Index>> indexMap;
    private final TreeMap<Long, Long> keyOffsetTable = new TreeMap<Long, Long>();
    private final File file;
    private String fileName;

    public Index append(long endPhyOffset, int idx, long timeStamp, String filterBitMap) {
        ......

        //每个slot存5个值,4个long值和一段byte数组
        //offset = 0   第一个long值,消息偏移量
        //offset = 8   第二个long值,消息物理偏移量
        //offset = 16  第三个long值,消息提交时间戳
        //offset = 24  第四个long值,消息在Commit Log中的长度
        //offset = 32  Filter信息

        // 将Index信息写入到mappedByteBuffer
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE, offset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 8, endPhyOffset);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 16, timeStamp);
        mappedByteBuffer.putLong(nextIndex * INDEX_SIZE + 24, 0);
        mappedByteBuffer.putInt(nextIndex * INDEX_SIZE + 32, filterBitMap.getBytes().length);
        mappedByteBuffer.put(filterBitMap.getBytes());

        ......

        return index;
    }

    public Index lookup(final long phyOffset, final int maxSearchKey) {
        ......

        for (Map.Entry<Long, Long> next : keyOffsetTable.entrySet()) {
            if (next.getKey() >= phyOffset) {
                //获取消息索引信息
                mappedByteBuffer.position((int) (next.getValue() & this.indexFileMappedByteBufferMaxSizeMask));
                for (int i = 0; i < maxSearchKey; i++) {
                    final long offsetPy = this.indexHeaderSize + (i * INDEX_SIZE);
                    long tmp = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy) & this.indexFileMappedByteBufferMaxSizeMask);
                    if (tmp < phyOffset) { 
                        continue;
                    }

                    long storeTime = this.indexFileMappedByteBuffer.getLong((int) (next.getValue() + offsetPy + 16) & this.indexFileMappedByteBufferMaxSizeMask);
                    return new Index(next.getKey(), tmp, storeTime, i);
                }
                break;
            }
        }
        return null;
    }

    ......//其他方法实现
}

在该示例中,我们可以看到在IndexFile中,Index Slot的所有信息被写入到了mappedByteBuffer中,组成一个Map,用于较快的获取对应的偏移量。同时,IndexFile还维护了一个TreeMap,将偏移量按顺序存储。

示例

假设我们有一个名为RocketMQTest的主题,我们将消息的key、value、tag分别设置为"key1"、"value1"和"tag1"。我们可以通过以下代码来进行该主题下的生产和消费:

public class RocketMQSample {
    public static void main(String[] args) {
        try {
            //生产者
            DefaultMQProducer producer = new DefaultMQProducer("producer-group");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();

            //消息主题
            String topic = "RocketMQTest";
            Message message = new Message(topic, "tag1", "key1", "value1".getBytes());
            SendResult sendResult = producer.send(message);

            System.out.println("同步发送消息:" + sendResult);

            //消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
            consumer.setNamesrvAddr("localhost:9876");
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.println(Thread.currentThread().getName() + " 消费消息:" + msgs.get(0).getKeys() + ":" + new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });

            consumer.start();
            System.in.read();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

在生产者发送完消息后,我们可以通过BrokerController类中的doDispatch方法查看ConsumeQueue和IndexFile的实时更新情况:

public class BrokerController {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, IndexFile>> indexFileTable =
            new ConcurrentHashMap<>(1024);
    private final ConcurrentHashMap<String/* topic */, ConcurrentHashMap<Integer/* queueId */, MappedFileQueue>> mappedFileQueueTable =
            new ConcurrentHashMap<>(1024);

    ......

    private boolean doDispatch(final ChannelHandlerContext ctx, final RemotingCommand request,
                               final RemotingResponseCallback responseCallback) {
        ......

        // 添加到 Commit Log 文件
        final byte[] body = request.getBody();
        final long offset = commitLog.putMessage(body);
        assert offset > 0;
        resultData.setStatus(SendStatus.SEND_OK);
        resultData.setOffset(offset);

        //这里我们可以查看ConsumeQueue和IndexFile的实时更新情况。
        ConcurrentHashMap<Integer, ConsumeQueue> map = consumeQueueTable.get(msgExt.getTopic());
        if (map != null) {
            ConsumeQueue cq = map.get(msgExt.getQueueId());
            if (cq != null) {
                cq.putMessagePositionInfoWrapper(queueOffset, offset, size, System.currentTimeMillis(), messageExt.getStoreHost());
            }
        }
        ConcurrentHashMap<Integer, IndexFile> indexMap = indexFileTable.get(msgExt.getTopic());
        if (indexMap != null) {
            IndexFile ifile = indexMap.get(msgExt.getQueueId());
            if (ifile != null) {
                ifile.update(indexMsgTimestamp, offset);
            }
        }

        ......

        return true;
    }

    ......//其他方法实现
}

从代码的执行过程中,我们可以看到当有新的消息被写入CommitLog中时,ConsumeQueue和IndexFile文件中都会实时更新,保证了消费者可以准确地快速查找到自己消费的偏移量。

以上就是RocketMQ ConsumeQueue与IndexFile实时更新机制的源码解析,其中包含了两条示例,帮助大家更加深入地理解RocketMQ消息中间件的内部实现机制。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Java详细讲解分析双指针法的使用

    Java详细讲解分析双指针法的使用 双指针法是一种常见的解决数组或链表中遍历查找的算法。其核心思想是使用两个指针,分别从不同的方向或位置同时开始遍历数组或链表,通过相对移动指针位置来达到某种目的。本文将为你详细讲解Java中如何使用双指针法。 双指针法的种类 双指针法有多种不同的应用场景。下面列举了常见的几种种类: 快慢指针法:用于解决一些链表中的问题,例如…

    Java 2023年5月26日
    00
  • java连接Oracle数据库的方法解析

    下面是Java连接Oracle数据库的方法解析的完整攻略。 一、准备工作 1.1 下载Oracle JDBC驱动 在Java连接Oracle数据库之前,需要先下载Oracle JDBC驱动程序,可以前往Oracle官网进行下载。 1.2 配置Oracle环境变量 将Oracle的安装目录加入环境变量中,以便Java程序能够正常连接Oracle数据库。 二、J…

    Java 2023年5月19日
    00
  • 一文带你深入了解Java泛型

    一文带你深入了解Java泛型 什么是Java泛型? Java泛型是Java提供的一个强大的工具,它允许我们在编写代码的时候指定类型参数,从而可以在编译时检测类型安全性,避免了许多运行时错误。另外,通过使用泛型,我们还可以编写更加通用、灵活的代码。Java泛型最早出现在JDK1.5版本中,是Java语言中的一个重要特性。 泛型类和泛型接口 Java泛型既可以应…

    Java 2023年5月26日
    00
  • java使用UDP实现点对点通信

    下面是我为您提供的“java使用UDP实现点对点通信”的攻略。 一、什么是UDP UDP是无连接的传输协议,数据报(Datagram)套接字就是基于UDP协议实现的,它不会像TCP那样保证数据传输的可靠性,传输的数据包也不要求应答。但是,UDP具备比TCP更快的传输速度和更小的网络开销,因此,当需要高效传输数据时,可以选择UDP协议。 二、使用UDP实现点对…

    Java 2023年5月20日
    00
  • C#动态webservice调用接口

    下面我将为您详细讲解“C#动态webservice调用接口”的完整攻略。 1. 确认webservice的接口地址和方法名称 在使用新的webservice之前,必须确定它的接口地址和方法名称。可以通过与webservice API的提供者沟通或查看文档进行确认。通常情况下,webservice的接口地址以.asmx文件或.svc文件结尾。 2. 创建web…

    Java 2023年5月19日
    00
  • SpringBoot自定义启动器Starter流程详解

    Spring Boot自定义启动器Starter是一种将多个依赖项打包成一个单独的依赖项的方式,以便在应用程序中轻松引入和配置这些依赖项。以下是Spring Boot自定义启动器Starter的详细攻略: 创建Starter项目 首先,我们需要创建一个Maven项目,并将其打包成一个jar文件。在项目中,我们需要创建一个名为“spring-boot-star…

    Java 2023年5月15日
    00
  • Markdown基本语法

    Markdown 基本语法介绍 Markdown 是一种轻量级的标记语言,常用于编写文档和博客文章。它简单易学,具有清晰的结构和格式化效果,是非常适合写作和发布内容的工具。下面我们来介绍一些 Markdown 基本语法。 1. 标题 在 Markdown 中,可以使用 # 符号表示标题,一级标题使用一个 # 符号,二级标题使用两个 # 符号,以此类推,最多支…

    Java 2023年4月30日
    00
  • idea如何配置javafxsdk详细教程

    下面我将给出详细讲解“IDEA如何配置JavaFX SDK”的完整攻略。 1. 下载JavaFX SDK 首先,我们需要下载JavaFX SDK,并解压到一个方便查找的目录中。可以通过以下链接下载:JavaFX SDK。 2. 配置IDEA 2.1 配置项目 打开IDEA,创建一个新项目,选择JavaFX模板,设置项目名称和保存路径。然后在“Project …

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部