Netty事件循环主逻辑NioEventLoop的run方法分析

Netty是一个基于NIO的高性能网络编程框架,它采用了Reactor模式和异步非阻塞IO模型,致力于提供简洁、易用的API和高效、稳定的性能。其中,核心模块之一就是事件循环(EventLoop),它是Netty高性能、高吞吐量的关键所在。本文将详细讲解Netty事件循环主逻辑NioEventLoop的run方法分析。

概述

Netty的事件循环(EventLoop)是一个线程,它不断从Channel的任务队列中取出任务执行,包括I/O事件的处理、定时任务等。在Netty中,每一个Channel都会分配一条专门的EventLoop线程,EventLoop是整个Netty框架中最核心的部分之一。

在Netty的NIOEventLoopGroup中,每个EventLoop通过一个无限循环的机制(即run方法)来监听注册在其中的Channel的IO事件和任务队列,具体过程如下:

  • 首先,EventLoop会调用其内部Selector的select方法进行阻塞等待IO事件的发生。
  • 当Channel上有感兴趣的IO事件发生时,EventLoop会响应并从内部Selector的selectedKeys集合中取出对应的SelectionKey,分别处理读、写、连接、关闭等各种类型的事件。
  • 如果任务队列中有待处理的任务,EventLoop将立即执行队列中的任务,例如执行注册Channel、Channel关闭、用户提交的自定义任务等。
  • 如果任务队列中没有待处理的任务,则EventLoop会进入下一次select阻塞等待IO事件的发生。

值得注意的是,如果两个Channel注册在同一个EventLoop上,那么它们会共享这个EventLoop的Selector,也就是说,一个Channel上的IO事件触发,可能会影响另一个Channel,此时EventLoop的任务处理必须尽快完成,否则将影响其他Channel上的IO事件响应时间,进而降低整个系统的性能。

具体分析

了解了EventLoop线程的基本运行机制后,我们来进一步分析其核心源码,探究其实现细节。具体可参考NioEventLoop源代码,以下是其中的核心方法run:

/**
 * Main event loop that is used to handle I/O and scheduled tasks for a {@link NioEventLoop}.
 */
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { // 第一步
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        selector.select(); // 第二步 阻塞 在此等待IO事件
                        break;
                    case SelectStrategy.SELECT_NOW:
                        selector.selectNow();
                        break;
                    case SelectStrategy.SELECT_WITH_FIX_RATE:
                        selectWithSimpleRate(selectNowSupplier, true);
                        break;
                    case SelectStrategy.SELECT_WITH_FIX_DELAY:
                        selectWithSimpleDelay(selectNowSupplier, true);
                        break;
                    default:
                        throw new Error();
                }
            } finally {
                cancelledKeys = 0;
                needsToSelectAgain = false;
            }

            ++selectCnt;
            cancelledKeys = 0;
            needsToSelectAgain = false;

            processSelectedKeys(); // 第三步 处理IO事件

            if (preventWakeup && wakeupCounter.getAndSet(-1) > 0) {
                wakeup(true);
            }

            runAllTasks(); // 第四步 处理任务队列

        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

上述的run方法包含了以下四个步骤:

1. calculateStrategy方法

该方法是计算下一步选择策略的方法,具体实现是通过调用SelectStrategy的calculateStrategy方法计算得出,方法的内部实现并不复杂,它主要根据当前有无任务需要处理以及是否有IO事件需要处理来决定下一步的策略。例如,如果可能存在待处理的任务,则调用selectNow方法进行轮询,否则则调用select方法进行阻塞等待IO事件的发生。

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    case SelectStrategy.CONTINUE:
        continue;
    case SelectStrategy.SELECT:
        selector.select(); // 阻塞 在此等待IO事件
        break;
    case SelectStrategy.SELECT_NOW:
        selector.selectNow();
        break;
    case SelectStrategy.SELECT_WITH_FIX_RATE:
        selectWithSimpleRate(selectNowSupplier, true);
        break;
    case SelectStrategy.SELECT_WITH_FIX_DELAY:
        selectWithSimpleDelay(selectNowSupplier, true);
        break;
    default:
        throw new Error();
}

2. selector.select方法

该方法是阻塞当前EventLoop线程,等待感兴趣的IO事件的发生。具体实现是通过Java NIO提供的Selector.select方法来进行等待。

selector.select(); // 阻塞 在此等待IO事件

3. processSelectedKeys方法

该方法是处理IO事件的主要方法,它会遍历注册在当前EventLoop上的所有Channel,并对已发生的IO事件进行处理。例如,如果当前Channel上的连接事件已经被触发,则会调用ChannelPipeline的fireChannelActive方法来通知Pipeline上的各个Handler处理连接事件。整个processSelectedKeys方法的实现非常复杂,主要包括以下几个步骤:

  • 从Selector.selectedKeys集合中取出当前已经发生的IO事件。
  • 遍历选中的SelectionKey集合,并根据其类型分别进行处理,例如连接事件、读事件、写事件、关闭事件等。
  • 如果当前Channel还有未处理完的IO事件,那么将该Channel对应的SelectionKey重新加入到Selector的selectedKeys集合中,以便下一次检测。
private void processSelectedKeys() {
    readyKeys.clear();
    SelectorUtil.select(selector, readyKeys, null, 1000); // limit = 1000

    for (int i = 0; i < readyKeys.size(); ++i) {
        SelectionKey k = readyKeys.get(i);
        Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 如果attachment不是Channel,说明此SelectionKey已经被标记为取消
            // 这种情况主要是由于调用了Channel的close方法,但是由于SelectionKey的取消需要在Selector的下一次select操作时才能真正执行,因此需要保存为已取消的状态
            // 此时需要将cancelledKeys计数器加1,以便在下一次select操作时忽略这类失效的SelectionKey
            cancelledKeys++;
        }
    }
}

4. runAllTasks方法

该方法是处理任务队列的主要方法,其实现也比较简单,只需要遍历任务队列,并依次执行即可。

private void runAllTasks() {
    fetchFromScheduledTaskQueue(); // 从ScheduledTaskQueue中取出即将执行的任务
    // 获取并执行普通任务队列中的所有任务
    for (;;) {
        Runnable task = pollTask();
        if (task == null) {
            break;
        }
        task.run();
    }
    lastExecutionTime = ScheduledFutureTask.nanoTime(); // 更新最后一次执行任务的时间戳
}

示例说明

下面对上述内容进行两个示例说明,以更好地理解EventLoop的运行轨迹。

示例一

假设现在有两个Channel分别注册到了同一个EventLoop线程上,此时如果其中一个Channel上的连接事件触发了,那么EventLoop就会调用其相应的ChannelPipeline的fireChannelActive方法通知Pipeline上的Handler来处理该事件,如果此时处理较慢,就会影响整个EventLoop的性能。因此,EventLoop的逻辑必须尽快处理完成对连接事件的响应。因此,在实际应用中,我们要合理控制Channel的注册和分配,使其能够被均匀分配,防止某个Channel过于繁忙而影响其他Channel的响应时间。

示例二

假设现在有一个用户提交了一个Task,并且这个Task的执行需要一定的时间(例如执行一个大文件的读写操作),此时EventLoop线程是不能够一直等待Task的完成的,因为这会导致当前EventLoop阻塞,其他的I/O事件无法及时响应,从而导致整个系统的性能下降。因此,Netty为用户提供了一个自定义的Task类型——ScheduledFutureTask,可以在指定的时间再次将Task放入任务队列中,避免当前的EventLoop被阻塞,示例代码如下:

ScheduledFutureTask<?> futureTask = new ScheduledFutureTask<>(() -> {
    // task执行的具体逻辑
}, 0, TimeUnit.SECONDS);
eventLoop.submit(futureTask);

此时futureTask就会被提交到指定EventLoop线程的任务队列中,并且在当前的EventLoop周期执行完毕后就会执行。而且在执行ScheduledFutureTask的时候,如果当前任务队列中还有其他的Task需要执行,也会及时启动执行。这种方式可以非常有效地避免EventLoop被阻塞的问题,大大提高了应用程序的响应速度和运行效率。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Netty事件循环主逻辑NioEventLoop的run方法分析 - Python技术站

(0)
上一篇 2023年6月13日
下一篇 2023年6月13日

相关文章

  • python django下载大的csv文件实现方法分析

    接下来是关于“Python Django下载大的CSV文件实现方法分析”的完整攻略: 一、需求分析 在实际开发中,我们有时会需要从服务器端下载一些数据文件,比如CSV文件。当需要下载大的CSV文件时,网页下载的方式可能会导致内存泄漏,这时候我们需要一种更为高效的实现方法。 二、解决方案 实现高效的下载大的CSV文件的方法主要是使用Python Django框…

    python 2023年6月3日
    00
  • python中对开区间和闭区间的理解

    在 Python 中,我们通常使用两种区间表示法:闭区间表示法和开区间表示法。理解这两种区间表示法对于编写 Python 代码非常重要,下面是一些有关 Python 中对开区间和闭区间的详细讲解: 什么是开区间? 在 Python 中,开区间表示法是指只包括区间的端点之外的所有值。也可以说,它是一个由两个不同的数字组成的集合。这个集合中不包括其中的两个数字。…

    python 2023年6月3日
    00
  • Qt Quick QML-500行代码实现合成大西瓜游戏

    Qt Quick QML-500行代码实现合成大西瓜游戏,是一篇非常好的学习资料。本文将详细讲解如何实现该游戏,并附上两条示例说明。 首先,我们需要了解 QML 的基础知识。QML 是 Qt 平台的一种界面描述语言,它基于 JavaScript 语法,用于描述应用程序的界面和交互行为。在这篇文章中,我们将主要使用 QML 来实现合成大西瓜游戏。 其次,我们需…

    python 2023年5月19日
    00
  • 十个Python程序员易犯的错误

    下面是对“十个Python程序员易犯的错误”进行详细讲解的攻略。 错误1:没有理解Python的作用域 在Python中,作用域是由代码块中的缩进决定的。如果在函数内部定义一个变量,并在函数外尝试访问该变量,将会遇到NameError的错误。 示例: def my_func(): my_var = 10 print(my_var) 输出: NameError…

    python 2023年5月13日
    00
  • Python – 如何在没有特定文件的代码块中检查 PEP8 错误 [重复]

    【问题标题】:Python – How to check PEP8 errors in a chunk of code with no specific file [duplicate]Python – 如何在没有特定文件的代码块中检查 PEP8 错误 [重复] 【发布时间】:2023-04-04 04:14:01 【问题描述】: 我目前在一个电子学习平台上…

    Python开发 2023年4月6日
    00
  • 从0开始的Python学习016异常

    下面是从0开始的Python学习016异常的完整攻略: 异常 1. 什么是异常? 在程序运行中,如果出现了错误,那么这个错误就被称为异常。Python中提供了很多的异常类型,常见的有:ZeroDivisionError(除数为0)、IndexError(索引错误)、TypeError(类型错误)等等。遇到异常时,程序会自动停止,并提示异常信息或者直接崩溃。 …

    python 2023年5月13日
    00
  • Python使用Crypto库实现加密解密的示例详解

    Python使用Crypto库实现加密解密的示例详解 什么是Crypto库 Crypto是一个基于Python的加密工具包。它提供了各种加密算法、随机数生成器以及协议的实现。Crypto中的主要模块有:Cipher、Hash、Protocol、PublicKey、Util、IO。我们可以根据自己的需要选择具体的模块使用。接下来我们将介绍如何使用Crypto库…

    python 2023年5月20日
    00
  • Python实现读取及写入csv文件的方法示例

    下面是Python实现读取及写入CSV文件的方法示例的详细攻略。 CSV文件简介 CSV(Comma Separated Values)即逗号分隔值,是一种常见的文件格式,它可以被Excel等表格处理软件读取和编辑。CSV文件以纯文本形式存储数据,其中每一行数据表示为一行文本,每个数据字段以逗号分隔。 读取CSV文件方法示例 Python提供了csv模块,可…

    python 2023年6月3日
    00
合作推广
合作推广
分享本页
返回顶部