Netty事件循环主逻辑NioEventLoop的run方法分析

yizhihongxing

Netty是一个基于NIO的高性能网络编程框架,它采用了Reactor模式和异步非阻塞IO模型,致力于提供简洁、易用的API和高效、稳定的性能。其中,核心模块之一就是事件循环(EventLoop),它是Netty高性能、高吞吐量的关键所在。本文将详细讲解Netty事件循环主逻辑NioEventLoop的run方法分析。

概述

Netty的事件循环(EventLoop)是一个线程,它不断从Channel的任务队列中取出任务执行,包括I/O事件的处理、定时任务等。在Netty中,每一个Channel都会分配一条专门的EventLoop线程,EventLoop是整个Netty框架中最核心的部分之一。

在Netty的NIOEventLoopGroup中,每个EventLoop通过一个无限循环的机制(即run方法)来监听注册在其中的Channel的IO事件和任务队列,具体过程如下:

  • 首先,EventLoop会调用其内部Selector的select方法进行阻塞等待IO事件的发生。
  • 当Channel上有感兴趣的IO事件发生时,EventLoop会响应并从内部Selector的selectedKeys集合中取出对应的SelectionKey,分别处理读、写、连接、关闭等各种类型的事件。
  • 如果任务队列中有待处理的任务,EventLoop将立即执行队列中的任务,例如执行注册Channel、Channel关闭、用户提交的自定义任务等。
  • 如果任务队列中没有待处理的任务,则EventLoop会进入下一次select阻塞等待IO事件的发生。

值得注意的是,如果两个Channel注册在同一个EventLoop上,那么它们会共享这个EventLoop的Selector,也就是说,一个Channel上的IO事件触发,可能会影响另一个Channel,此时EventLoop的任务处理必须尽快完成,否则将影响其他Channel上的IO事件响应时间,进而降低整个系统的性能。

具体分析

了解了EventLoop线程的基本运行机制后,我们来进一步分析其核心源码,探究其实现细节。具体可参考NioEventLoop源代码,以下是其中的核心方法run:

/**
 * Main event loop that is used to handle I/O and scheduled tasks for a {@link NioEventLoop}.
 */
@Override
protected void run() {
    for (;;) {
        try {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { // 第一步
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        selector.select(); // 第二步 阻塞 在此等待IO事件
                        break;
                    case SelectStrategy.SELECT_NOW:
                        selector.selectNow();
                        break;
                    case SelectStrategy.SELECT_WITH_FIX_RATE:
                        selectWithSimpleRate(selectNowSupplier, true);
                        break;
                    case SelectStrategy.SELECT_WITH_FIX_DELAY:
                        selectWithSimpleDelay(selectNowSupplier, true);
                        break;
                    default:
                        throw new Error();
                }
            } finally {
                cancelledKeys = 0;
                needsToSelectAgain = false;
            }

            ++selectCnt;
            cancelledKeys = 0;
            needsToSelectAgain = false;

            processSelectedKeys(); // 第三步 处理IO事件

            if (preventWakeup && wakeupCounter.getAndSet(-1) > 0) {
                wakeup(true);
            }

            runAllTasks(); // 第四步 处理任务队列

        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

上述的run方法包含了以下四个步骤:

1. calculateStrategy方法

该方法是计算下一步选择策略的方法,具体实现是通过调用SelectStrategy的calculateStrategy方法计算得出,方法的内部实现并不复杂,它主要根据当前有无任务需要处理以及是否有IO事件需要处理来决定下一步的策略。例如,如果可能存在待处理的任务,则调用selectNow方法进行轮询,否则则调用select方法进行阻塞等待IO事件的发生。

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
    case SelectStrategy.CONTINUE:
        continue;
    case SelectStrategy.SELECT:
        selector.select(); // 阻塞 在此等待IO事件
        break;
    case SelectStrategy.SELECT_NOW:
        selector.selectNow();
        break;
    case SelectStrategy.SELECT_WITH_FIX_RATE:
        selectWithSimpleRate(selectNowSupplier, true);
        break;
    case SelectStrategy.SELECT_WITH_FIX_DELAY:
        selectWithSimpleDelay(selectNowSupplier, true);
        break;
    default:
        throw new Error();
}

2. selector.select方法

该方法是阻塞当前EventLoop线程,等待感兴趣的IO事件的发生。具体实现是通过Java NIO提供的Selector.select方法来进行等待。

selector.select(); // 阻塞 在此等待IO事件

3. processSelectedKeys方法

该方法是处理IO事件的主要方法,它会遍历注册在当前EventLoop上的所有Channel,并对已发生的IO事件进行处理。例如,如果当前Channel上的连接事件已经被触发,则会调用ChannelPipeline的fireChannelActive方法来通知Pipeline上的各个Handler处理连接事件。整个processSelectedKeys方法的实现非常复杂,主要包括以下几个步骤:

  • 从Selector.selectedKeys集合中取出当前已经发生的IO事件。
  • 遍历选中的SelectionKey集合,并根据其类型分别进行处理,例如连接事件、读事件、写事件、关闭事件等。
  • 如果当前Channel还有未处理完的IO事件,那么将该Channel对应的SelectionKey重新加入到Selector的selectedKeys集合中,以便下一次检测。
private void processSelectedKeys() {
    readyKeys.clear();
    SelectorUtil.select(selector, readyKeys, null, 1000); // limit = 1000

    for (int i = 0; i < readyKeys.size(); ++i) {
        SelectionKey k = readyKeys.get(i);
        Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            // 如果attachment不是Channel,说明此SelectionKey已经被标记为取消
            // 这种情况主要是由于调用了Channel的close方法,但是由于SelectionKey的取消需要在Selector的下一次select操作时才能真正执行,因此需要保存为已取消的状态
            // 此时需要将cancelledKeys计数器加1,以便在下一次select操作时忽略这类失效的SelectionKey
            cancelledKeys++;
        }
    }
}

4. runAllTasks方法

该方法是处理任务队列的主要方法,其实现也比较简单,只需要遍历任务队列,并依次执行即可。

private void runAllTasks() {
    fetchFromScheduledTaskQueue(); // 从ScheduledTaskQueue中取出即将执行的任务
    // 获取并执行普通任务队列中的所有任务
    for (;;) {
        Runnable task = pollTask();
        if (task == null) {
            break;
        }
        task.run();
    }
    lastExecutionTime = ScheduledFutureTask.nanoTime(); // 更新最后一次执行任务的时间戳
}

示例说明

下面对上述内容进行两个示例说明,以更好地理解EventLoop的运行轨迹。

示例一

假设现在有两个Channel分别注册到了同一个EventLoop线程上,此时如果其中一个Channel上的连接事件触发了,那么EventLoop就会调用其相应的ChannelPipeline的fireChannelActive方法通知Pipeline上的Handler来处理该事件,如果此时处理较慢,就会影响整个EventLoop的性能。因此,EventLoop的逻辑必须尽快处理完成对连接事件的响应。因此,在实际应用中,我们要合理控制Channel的注册和分配,使其能够被均匀分配,防止某个Channel过于繁忙而影响其他Channel的响应时间。

示例二

假设现在有一个用户提交了一个Task,并且这个Task的执行需要一定的时间(例如执行一个大文件的读写操作),此时EventLoop线程是不能够一直等待Task的完成的,因为这会导致当前EventLoop阻塞,其他的I/O事件无法及时响应,从而导致整个系统的性能下降。因此,Netty为用户提供了一个自定义的Task类型——ScheduledFutureTask,可以在指定的时间再次将Task放入任务队列中,避免当前的EventLoop被阻塞,示例代码如下:

ScheduledFutureTask<?> futureTask = new ScheduledFutureTask<>(() -> {
    // task执行的具体逻辑
}, 0, TimeUnit.SECONDS);
eventLoop.submit(futureTask);

此时futureTask就会被提交到指定EventLoop线程的任务队列中,并且在当前的EventLoop周期执行完毕后就会执行。而且在执行ScheduledFutureTask的时候,如果当前任务队列中还有其他的Task需要执行,也会及时启动执行。这种方式可以非常有效地避免EventLoop被阻塞的问题,大大提高了应用程序的响应速度和运行效率。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Netty事件循环主逻辑NioEventLoop的run方法分析 - Python技术站

(0)
上一篇 2023年6月13日
下一篇 2023年6月13日

相关文章

  • 通过python爬虫赚钱的方法

    通过Python爬虫赚钱的方法 Python爬虫是一种非常强大的工具,可以帮助我们获取互联网上的各种数据。通过Python爬虫,我们可以获取并分析大量的数据,从而找到商机,实现收益。下面是通过Python爬虫赚钱的一些方法和技巧。 1. 数据商业化 通过Python爬虫可以获取各种各样的数据,我们可以将这些数据进行整理分析,然后将分析报告、行业研究等推广出去…

    python 2023年5月14日
    00
  • Python对Tornado请求与响应的数据处理

    Tornado是一个Python的Web框架,它提供了高效的非阻塞I/O操作,适用于高并发的Web应用程序。在Tornado中,请求和响应的数据处理是非常重要的,本文将介绍Python对Tornado请求与响应的数据处理的完整攻略,包括以下内容: Tornado请求的数据处理 Tornado响应的数据处理 以下是两个示例说明,用于演示Python对Torna…

    python 2023年5月14日
    00
  • 在Python中使用NumPy计算切比雪夫级数的根值

    前置知识 在开始本文所述的计算切比雪夫级数的根值之前,需先了解以下概念: 切比雪夫级数(Chebyshev polynomials) numpy库的基本用法 切比雪夫级数 切比雪夫级数,又称特比雪夫级数,是一组在数学中应用广泛的正交多项式。在数值计算中,这种级数有着重要的应用。切比雪夫级数的经典定义为: T_n(x) = cos(n * arccos(x))…

    python-answer 2023年3月25日
    00
  • 使用Gitee自动化部署python脚本的详细过程

    下面我将详细讲解使用Gitee自动化部署Python脚本的详细过程。 1. 首先,在Gitee上创建仓库并配置SSH key 1.1 在Gitee上创建一个仓库,将需要自动化部署的Python脚本上传至该仓库中。 1.2 在本地生成SSH key,并将公钥部分添加至Gitee的SSH key中,以便在后续操作中使用SSH协议上传代码,并且不用每次操作都输入用…

    python 2023年5月19日
    00
  • Python离线安装PIL 模块的方法

    下面是详细讲解Python离线安装PIL模块的方法的完整攻略: 环境准备 首先需要下载PIL模块的安装包,可以从Pillow官方网站获取。下载地址:https://pypi.org/project/Pillow/#files 离线安装Python,建议使用Anaconda,因为在Anaconda中,可以通过conda这个软件包管理工具来进行离线安装。可在An…

    python 2023年5月14日
    00
  • python3爬虫中多线程的优势总结

    在Python3爬虫中,使用多线程可以提高爬取效率,加快数据获取速度。本文将详细讲解Python3爬虫中多线程的优势,并提供两个示例,演示如何使用Python3多线程爬取数据。 多线程的优势 使用多线程可以提高爬取效率,加快数据获取速度。以下是多线程的优势: 提高效率:多线程可以同时处理多个任务,提高效率。 加快速度:多线程可以同时下载多个文件,加快数据获取…

    python 2023年5月15日
    00
  • 拓扑排序Python实现的过程

    拓扑排序Python实现的过程 拓扑排序是一种常用的有向无环图(DAG)的排序算法,它可以将DAG中的节点按照一定的顺序进行排序。实际应用中,拓扑排序常于任务调度、依赖关系分析等场景。本文将介绍拓扑排序的Python实现过程,并提供两个示例说明。 拓扑排序的实现过程 拓扑排序的实现过程可以分为以下几个步骤: 构建DAG:将有向表示为邻接表或邻接矩阵的形式。 …

    python 2023年5月14日
    00
  • python3处理word文档实例分析

    Python3处理Word文档实例分析 简介 Microsoft Word是一种广泛使用的文字处理软件,常用于编写报告、论文等文档。在Python中,通过使用第三方库python-docx,可以方便地实现Word文档的读写操作。 安装依赖 在进行Python3处理Word文档之前,需要安装第三方库python-docx。可以使用以下命令进行安装: pip i…

    python 2023年6月5日
    00
合作推广
合作推广
分享本页
返回顶部