Netty是一个基于NIO的高性能网络编程框架,它采用了Reactor模式和异步非阻塞IO模型,致力于提供简洁、易用的API和高效、稳定的性能。其中,核心模块之一就是事件循环(EventLoop),它是Netty高性能、高吞吐量的关键所在。本文将详细讲解Netty事件循环主逻辑NioEventLoop的run方法分析。
概述
Netty的事件循环(EventLoop)是一个线程,它不断从Channel的任务队列中取出任务执行,包括I/O事件的处理、定时任务等。在Netty中,每一个Channel都会分配一条专门的EventLoop线程,EventLoop是整个Netty框架中最核心的部分之一。
在Netty的NIOEventLoopGroup中,每个EventLoop通过一个无限循环的机制(即run方法)来监听注册在其中的Channel的IO事件和任务队列,具体过程如下:
- 首先,EventLoop会调用其内部Selector的select方法进行阻塞等待IO事件的发生。
- 当Channel上有感兴趣的IO事件发生时,EventLoop会响应并从内部Selector的selectedKeys集合中取出对应的SelectionKey,分别处理读、写、连接、关闭等各种类型的事件。
- 如果任务队列中有待处理的任务,EventLoop将立即执行队列中的任务,例如执行注册Channel、Channel关闭、用户提交的自定义任务等。
- 如果任务队列中没有待处理的任务,则EventLoop会进入下一次select阻塞等待IO事件的发生。
值得注意的是,如果两个Channel注册在同一个EventLoop上,那么它们会共享这个EventLoop的Selector,也就是说,一个Channel上的IO事件触发,可能会影响另一个Channel,此时EventLoop的任务处理必须尽快完成,否则将影响其他Channel上的IO事件响应时间,进而降低整个系统的性能。
具体分析
了解了EventLoop线程的基本运行机制后,我们来进一步分析其核心源码,探究其实现细节。具体可参考NioEventLoop源代码,以下是其中的核心方法run:
/**
* Main event loop that is used to handle I/O and scheduled tasks for a {@link NioEventLoop}.
*/
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { // 第一步
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
selector.select(); // 第二步 阻塞 在此等待IO事件
break;
case SelectStrategy.SELECT_NOW:
selector.selectNow();
break;
case SelectStrategy.SELECT_WITH_FIX_RATE:
selectWithSimpleRate(selectNowSupplier, true);
break;
case SelectStrategy.SELECT_WITH_FIX_DELAY:
selectWithSimpleDelay(selectNowSupplier, true);
break;
default:
throw new Error();
}
} finally {
cancelledKeys = 0;
needsToSelectAgain = false;
}
++selectCnt;
cancelledKeys = 0;
needsToSelectAgain = false;
processSelectedKeys(); // 第三步 处理IO事件
if (preventWakeup && wakeupCounter.getAndSet(-1) > 0) {
wakeup(true);
}
runAllTasks(); // 第四步 处理任务队列
} catch (Throwable t) {
handleLoopException(t);
}
}
}
上述的run方法包含了以下四个步骤:
1. calculateStrategy方法
该方法是计算下一步选择策略的方法,具体实现是通过调用SelectStrategy的calculateStrategy方法计算得出,方法的内部实现并不复杂,它主要根据当前有无任务需要处理以及是否有IO事件需要处理来决定下一步的策略。例如,如果可能存在待处理的任务,则调用selectNow方法进行轮询,否则则调用select方法进行阻塞等待IO事件的发生。
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
selector.select(); // 阻塞 在此等待IO事件
break;
case SelectStrategy.SELECT_NOW:
selector.selectNow();
break;
case SelectStrategy.SELECT_WITH_FIX_RATE:
selectWithSimpleRate(selectNowSupplier, true);
break;
case SelectStrategy.SELECT_WITH_FIX_DELAY:
selectWithSimpleDelay(selectNowSupplier, true);
break;
default:
throw new Error();
}
2. selector.select方法
该方法是阻塞当前EventLoop线程,等待感兴趣的IO事件的发生。具体实现是通过Java NIO提供的Selector.select方法来进行等待。
selector.select(); // 阻塞 在此等待IO事件
3. processSelectedKeys方法
该方法是处理IO事件的主要方法,它会遍历注册在当前EventLoop上的所有Channel,并对已发生的IO事件进行处理。例如,如果当前Channel上的连接事件已经被触发,则会调用ChannelPipeline的fireChannelActive方法来通知Pipeline上的各个Handler处理连接事件。整个processSelectedKeys方法的实现非常复杂,主要包括以下几个步骤:
- 从Selector.selectedKeys集合中取出当前已经发生的IO事件。
- 遍历选中的SelectionKey集合,并根据其类型分别进行处理,例如连接事件、读事件、写事件、关闭事件等。
- 如果当前Channel还有未处理完的IO事件,那么将该Channel对应的SelectionKey重新加入到Selector的selectedKeys集合中,以便下一次检测。
private void processSelectedKeys() {
readyKeys.clear();
SelectorUtil.select(selector, readyKeys, null, 1000); // limit = 1000
for (int i = 0; i < readyKeys.size(); ++i) {
SelectionKey k = readyKeys.get(i);
Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
// 如果attachment不是Channel,说明此SelectionKey已经被标记为取消
// 这种情况主要是由于调用了Channel的close方法,但是由于SelectionKey的取消需要在Selector的下一次select操作时才能真正执行,因此需要保存为已取消的状态
// 此时需要将cancelledKeys计数器加1,以便在下一次select操作时忽略这类失效的SelectionKey
cancelledKeys++;
}
}
}
4. runAllTasks方法
该方法是处理任务队列的主要方法,其实现也比较简单,只需要遍历任务队列,并依次执行即可。
private void runAllTasks() {
fetchFromScheduledTaskQueue(); // 从ScheduledTaskQueue中取出即将执行的任务
// 获取并执行普通任务队列中的所有任务
for (;;) {
Runnable task = pollTask();
if (task == null) {
break;
}
task.run();
}
lastExecutionTime = ScheduledFutureTask.nanoTime(); // 更新最后一次执行任务的时间戳
}
示例说明
下面对上述内容进行两个示例说明,以更好地理解EventLoop的运行轨迹。
示例一
假设现在有两个Channel分别注册到了同一个EventLoop线程上,此时如果其中一个Channel上的连接事件触发了,那么EventLoop就会调用其相应的ChannelPipeline的fireChannelActive方法通知Pipeline上的Handler来处理该事件,如果此时处理较慢,就会影响整个EventLoop的性能。因此,EventLoop的逻辑必须尽快处理完成对连接事件的响应。因此,在实际应用中,我们要合理控制Channel的注册和分配,使其能够被均匀分配,防止某个Channel过于繁忙而影响其他Channel的响应时间。
示例二
假设现在有一个用户提交了一个Task,并且这个Task的执行需要一定的时间(例如执行一个大文件的读写操作),此时EventLoop线程是不能够一直等待Task的完成的,因为这会导致当前EventLoop阻塞,其他的I/O事件无法及时响应,从而导致整个系统的性能下降。因此,Netty为用户提供了一个自定义的Task类型——ScheduledFutureTask,可以在指定的时间再次将Task放入任务队列中,避免当前的EventLoop被阻塞,示例代码如下:
ScheduledFutureTask<?> futureTask = new ScheduledFutureTask<>(() -> {
// task执行的具体逻辑
}, 0, TimeUnit.SECONDS);
eventLoop.submit(futureTask);
此时futureTask就会被提交到指定EventLoop线程的任务队列中,并且在当前的EventLoop周期执行完毕后就会执行。而且在执行ScheduledFutureTask的时候,如果当前任务队列中还有其他的Task需要执行,也会及时启动执行。这种方式可以非常有效地避免EventLoop被阻塞的问题,大大提高了应用程序的响应速度和运行效率。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Netty事件循环主逻辑NioEventLoop的run方法分析 - Python技术站