Netty 线程模型详解:从演进的角度看源码设计
前言
从这篇文章开始,开始进入源码分析阶段。在之前的入门篇和实战篇中,通过代码的编写和各种特性的讲解,让我们对 Netty 有了充分的感性认知。那么接下来,通过对代码的详细走读,来细细查看哪些在感性直觉背后的实现原理。
Netty 的线程模型
Netty 的线程模型其实没有什么太特别的地方,属于比较自然而然的设计。
首先,基于Selector可以同时监控多个链接的特性,很容易想到将所有通道都注册到一个selector对象,然后死循环获取通道上的就绪事件并且进行处理。这就是最朴素的设计,也就是作为的单线程模型。这种模式,比较适合客户端,用于服务端的话,因为线程数太少,无法有效的利用多核 CPU 的处理能力。
单线程无法有效的利用 CPU,而且在一个Selector上管理太多的链接,效率也会下降。很自然的想到利用多线程提升效率。如何使用多线程按照不同的方向扩展又有所区分。单线程模式中的点在于两个:
- 单个
Selector关注了太多链接,导致一次取出的集合可能很大,遍历耗时太多 - 所有的链接事件都在一个线程中处理,处理完才能处理下一个链接,导致在后面的链接长时间的等待
根据这两点,对应的也有两种扩展模式。
模式一
针对单个Selector的问题,创建一组Selector对象,将链接的就绪检查平均的分配到每一个Selector对象上。并且为每一个Selector对象,绑定一个线程,线程自身执行一个死循环的就绪检查。每当有一个新的链接对象时,使用轮训或者其他策略从Selector组中选择一个Selector,将链接注册上去。可以形象的用图来看

模式二
解决单线程处理链接就绪事件的法子很容易想到就是线程池。这种就成了派发模式。主线程检查通道的就绪事件,发现就绪事件后,将就绪事件的处理逻辑包装为为一个任务,提交到线程池中处理。提交到线程池的处理速度是十分快的。提交完毕后,主线程继续执行select方法,监控通道的就绪事件。可以形象的用图来看

对于模式一,一般称之为多主模式。而模式二,因为在就绪事件的处理阶段引入了线程池,常称之为多线程模式。
对于模式一而言,还会有细化的演变。使用一组Selector来无差别的服务服务端链接和客户端链接显得职责有些不清晰。因此会将Selector分为两组:
- 第一组,只服务于服务端链接,其绑定的线程处理客户端接入就绪事件。如果应用程序只有一个服务端监听链接,那么该组的大小为 1.
- 第二组,只服务于客户端链接,其绑定的线程处理客户端读写就绪事件。该组的大小有几种思路,比如 CPU 内核数+1,比如 CPU 内核数的 2 倍
这种演进模式由于具备了明显的职责区分,常称之为主从模式。大多数公开的材料,博客,Netty 官网以及《Netty 实战》等介绍的 Netty 写法,都是使用主从模式。主从模式可以表达为

实际当中,这几个模式并不会互相排斥,比较常见的有将主从模式和多线程模式结合在一起使用,此时这种模式就是多线程版本主从模式。
Netty 对线程模式的支持主要体现在EventLoopGroup的配置支持上。通过配置不同个数的EventLoopGroup以及在不同地方配置EventLoopGroup,Netty 可以实现从单线程模式变化为多主模型,再演化为主从模式,最后终极的就是多线程版主从模型。
可以看到,整个模型的演进其实是很自然的事情,并不如一些文章中说的特别精巧或者特意的设计。是一种职责梳理后,根据需要扩展的点,很清晰,很容易就能想到的设计方案。
Netty 线程模型涉及的类
上一个章节梳理了 Netty 的线程模型设计和演进思路。这一章节来梳理下线程模型中会涉及到的类。Netty 的线程模型基本上可以从接口EventLoop和其实现类的实现细节上看出来。先看下总览的类层次图:

有几个比较明显的意图可以直接从继承关系中看出。继承了ExecutorService是为了实现方法submit方便进行任务的提交;继承了ScheduleExecutorService是为了提交定时任务;EventLoop继承了EventLoopGroup是为了方便后者将任务转发给具体的EventLoop线程去执行。
学习 Netty 的线程模型,需要比较深入的了解几个类,分别是
- NioEventLoopGroup: 负责管理 NioEventLoop
- NioEventLoop: 实际的线程本身,负责执行具体的 IO 任务和用户任务。
NioEventLoopGroup
照例仍然是先看下类图,如下

从类图可以看到,EventLoopGroup是继承于EventExecutorGroup,那么先来看下EventExecutorGroup这个接口。
EventExecutorGroup
先看下接口代码
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
Future<?> terminationFuture();
@Override
@Deprecated
void shutdown();
@Override
@Deprecated
List<Runnable> shutdownNow();
EventExecutor next();
//以下省略继承的接口方法
} 从接口定义上来看,EventExecutorGroup只新增了 2 类方法:
- 用于关闭 group 以及获取关闭信息的相关方法。
- 获取下一个可以执行任务的
EventExecutor实例。
next方法实际上提供了一种隐喻和暗示,当向 group 提交任务时,实际上内部可以通过next方法取得一个EventExecutor实例来真正的执行一个任务。这个暗示会在后文的抽象类实现中被印证。
AbstractEventExecutorGroup
该抽象实现覆盖了大部分与执行runnable或者callable的方法,几乎所有的抽象实现都是通过next方法获取一个EventExecutor实例,然后将方法的执行委托该给实例的同签名方法。来看下代码会直观,如下
@Override
public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return next().submit(task);
} 这个实现方式也印证了上节提出的猜测。
MultithreadEventExecutorGroup
该类顾名思义,就是通过管理多个线程进而对外提供服务的EventExecutorGroup,关注下其构造方法。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//省略代码,与错误检查相关。。。
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//在这里可以看到,根据给定的线程数,构造待分配使用的EventExecutor实例。后续的next方法分配从这个数组中选择。
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
//省略代码,与异常输出相关。。。
} finally {
//省略的代码,与EventExecutorGroup关闭相关。。。
}
}
//省略的代码。。。
} 从构造方法就可以看出,这个类就是在构造方法中事先初始化和储备了一组EventExecutor供后续进行分配,也就是供next方法进行分配。
比较重要的newChild方法则留给了子类去实现。这里额外提一下入参的 executor 对象。如果入参没有赋值的话,则使用默认实现ThreadPerTaskExecutor,代码如下
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
} 看上去,每次调用 execute 方法都会产生一个新的线程。但实际上,任何一个EventExecutor实现内部只会调用一次该方法。
DefaultEventExecutorGroup
网络上大多数材料,博客和官网在介绍 Netty 的时候,都是使用主从的线程模式,所以这个类基本都见不到。这个类最大的用处是在当 Netty 处于多线程主从模式时,用于承担ChannelHandler运行的线程池的角色。常见的用法一般是

前文在讲 Netty 的线程模型的时候曾经提到过这种用法。由于这个线程池中线程不需要涉及Selector的操作,因此使用NIOEventLoopGroup比较浪费,使用DefaultEventExecutorGroup就刚刚好。该类的主要作用就是用来提供方法newChild的实现。代码如下
protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
} 关于DefaultEventExecutor,在后文介绍,这边先留着。
EventLoopGroup
EventExecutorGroup如果不是使用多线程模式,也不会用到,较为少见。EventLoopGroup因为需要初始化的时候传入,就相对熟悉很多了。
EventLoopGroup可以认为是特殊化的EventExecutorGroup。提供了额外的接口用来给io.netty.channel.Channel进行注册。从额外提供的注册方法可以看出,EventLoopGroup主要就是在和Channel在打交道。来看下接口方法。
public interface EventLoopGroup extends EventExecutorGroup {
EventLoop next();
ChannelFuture register(Channel channel);
ChannelFuture register(ChannelPromise promise);
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
} 主要的功能就是提供了注册接口,用于给通道注册,让通道可以绑定到特定的Selector和对应的线程上。同时也重写了next方法,将返回值类型变更。整体变更到 EventLoop 体系中。
MultithreadEventLoopGroup
继承了MultithreadEventExecutorGroup,重点就是提供了接口新增的三个register方法的实现。其原理也是通过next方法获取EventLoopGroup实例,委托其执行同签名register方法。
DefaultEventLoopGroup
这个类的作用和DefaultEventExecutorGroup一样,都是为了提供newChild方法的具体实现。具体如下
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
} 可以看到和DefaultEventExecutorGroup不同的地方就在于返回的类不同。
方法 next 的具体实现
上文收到,xxGroup 的大部分方法都是通过next方法选择了一个EventExecutor实例或者EventLoop实例去执行同签名方法。
next方法的功能思想我们在看MultithreadEventExecutorGroup有介绍过。大致是就是从一组实例中选择一个。而该方法的实现,目前在 Netty 中被标准为不稳定。其实现委托了EventExecutorChooserFactory工厂方法,生成一个选择器实例,也就是EventExecutorChooser接口的实现类。工厂类在 Netty 中存在一个默认实现DefaultEventExecutorChooserFactory,该工厂方法生成选择器对象有两种可能:
- 传入的
Eventexecutor为 2 的次方幂个数时,取模有特殊优化,具体的实现类为PowerOfTwoEventExecutorChooser - 其余情况,采用轮训的方式,以普通取模方式得到结果,具体的实现类为
GenericEventExecutorChooser
无论哪种,都是轮训。只不过是在细节处的代码优化罢了。理解思路最为重要。
综述
EventLoopGroup的实现思路和 JDK 中对ExecuteService不同。JDK 中的实现思路是提供一个存储任务的队列,外部调用者将任务放入队列,而ExecuteService内部管理着一些线程,这些线程会在这个队列上争抢任务,争抢成功的线程则执行任务,执行完毕后继续在队列上争抢。
而EventLoopGroup没有统一存储任务的队列,因此实际上是将任务直接投递到具体的某一个EventLoop对象中让其执行。结合《Netty 的线程模型》的分析,EventLoop本身应该是一个死循环执行的线程,其内部应该具备一个队列用于存储任务,EventLoop会在死循环的过程中从队列中获取任务以执行。因此EventLoopGroup只是承担了一个管理EventLoop和关闭的作用,不承担调度职责。因此其实现也是较为简单。
NioEventLoop
先看下类图,如下

先来看下类图中的基础接口EventExecutor.
EventExecutor
为了方便EventExecutorGroup将方法委托给EventExecutor,EventExecutor继承了EventExecutorGroup接口。而其本身并没有新增和处理任务相关的方法,只是新增了生成Future实例的方法。
AbstractEventExecutor
从源码上而言,该类仅仅只是提供了一些基础的方法,并且为个别特定方法提供一个默认实现,这些方法都是与生成Promise实例或者Future实例相关的。大致上都是类似这种
public <V> ProgressivePromise<V> newProgressivePromise() {
return new DefaultProgressivePromise<V>(this);
}
@Override
public <V> Future<V> newSucceededFuture(V result) {
return new SucceededFuture<V>(this, result);
} AbstractScheduledEventExecutor
该抽象实现提供了对计划类方法的支持。对于计划的支持主要依靠两点:
- 定义计划任务类
io.netty.util.concurrent.ScheduledFutureTask,用于封装包含计划时间的任务。 - 使用优先级队列存储计划任务类,排序规则按照任务的计划时间执行排序。不过 Netty 没有使用 JDK 自带的优先级队列,而是采用自定义的实现,不过其采用的数据结构仍然是小顶堆,算法实现也和 JDK 自带的相同。故而不展开。
ScheduledFutureTask主要是封装了和计划任务相关的一些方法,诸如获取时间,获取在优先级队列中的下标等。本身最重要的存储数据就是该任务的截止时间和周期时间(如果是周期性任务)。
概括而言,计划任务采用的优先级队列来实现。周期性任务则是每一次取出任务后,都再次计算截止时间并且再次放入优先级队列。
SingleThreadEventExecutor
单线程实现EventExecutor功能的抽象基类。其核心要点如下:
- 其本身具备未启动、启动、关闭中、关闭,终止 5 个状态。
- 内部使用 Queue 接口存储 Runnable 对象。
- 使用 execute 方法首先将 runnable 对象放入队列中。并且如果 Executor 如果还没有启动,则通过 CAS 方式原子的将状态切换到启动,只有 CAS 成功了才能执行后续的步骤。CAS 成功后通过其持有的
java.util.concurrent.Executor对象执行 execute 方法,执行一个匿名 runnable 对象,该 runnable 对象的内容就是执行SingleThreadEventExecutor类的 run 方法(该方法需要子类实现)。而Executor对象默认情况下是ThreadPerTaskExecutor。也就是该次execute方法的执行时创建了一个线程来执行这个匿名的 runnable 对象。 - 提供了
takeTask方法用于从队列中提取数据。该实现要求 Queue 接口的实现具备BlockingQueue接口。该方***尝试从任务队列和计划队列中同时提取数据,并且计划队列中的任务优先级更高一些。
核心要点基本上阐述了该类的作用和实现的思路。有了思路之后,再继续看代码就显得十分清晰了。来看下最核心的代码execute方法,如下
public void execute(Runnable task) {
//错误检查,忽略相关代码
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
//忽略代码,与EventExecutor关闭状态添加任务处理相关。。。
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
} 这里面首先是将任务添加到该线程独占的待处理队列中。比较核心的代码就一句taskQueue.offer(task)。这个taskQueue就是核心要点 2 提到的 Queue 接口。
当从EventExecutor线程外部添加任务时,也就是判断if (!inEventLoop)为真时,则会尝试启动线程,也就是执行方法startThread。来看代码
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
} 只有在线程未启动的情况下,才能执行真正的启动动作,if (state == ST_NOT_STARTED)就是为了保证这一点。为了避免并发冲突,使用 CAS 的方式进行抢占式更新。只有更新成功,也就是if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED))为真的情况下,才能尝试启动线程。也就是执行方法doStartThread。该方法有点复杂,但是省略掉和启动无关的代码后就很清晰了,如下
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
//省略代码,内容为设置线程对象属性,设置启动时间属性
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//省略代码,内容为在线程关闭时执行的相关清理动作
}
}
});
} 单纯的执行当前对象本身的run方法。考虑到线程并不是一次性资源,可以合理的推测出run方法的方法体,必然是一个循环。再考虑到EventExecutor有一个任务队列。基本上就可以确定,外部或内部线程将任务投递到队列中,然后线程本身死循环从队列中取出任务进行执行。
DefaultEventExecutor
通过类层次图和上面的分析可知,该类需要提供一个 run 方法的实现。代码很简单,如下
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
} 可以看到,思路也很简单,就是在循环中获取任务并且执行。而如果EventExecutor的状态变更为关闭时,则退出循环。
SingleThreadEventExecutor中的 execute 方法只会执行一次Executor的 execute 方法也能看出,这里存在一种暗示,也就是子类的 run 方法都需要是通过循环来不断的执行任务,否则只执行一次就会导致线程退出,后续任务就无法执行了。
看到这里,可以对EventExecutor的实现做一个总结了。简单概括包含几点:
EventExecutorGroup内部管理着一组EventExecutor对象,每一个EventExecutor都实现了runnable接口,也实际上会被分配一个线程用于执行。EventExecutor的抽象实现类持有一个Queue用于存储任务,该对象需要是接口BlockingQueue的实现。EventExecutor的默认实现类对 run 方法的实现就是一个死循环不断的从Queue中获取任务进行执行。- 当调用
EventExecutorGroup方法想要提交任务执行时,EventExecutorGroup内部使用轮训机制选择一个EventExecutor实例,将任务投递到该实例的Queue中。
SingleThreadEventLoop
主要是提供了关于注册 Channel 的一些相关方法的实现。
相对比SingleThreadEventExecutor,SingleThreadEventLoop多实现了接口中关于注册的部分,下面就来看下注册方法io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)。
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
} register方法则是io.netty.channel.AbstractChannel.AbstractUnsafe#register。如下
public final void register(EventLoop eventLoop, final ChannelPromise promise)
{
//入参检查,包括参数非空检查,是否已经注册过检查,以及判断EventLoop是否是合适当前Channel的对象,省略相关代码
//...被省略的代码
//将EventLoop设置到channel的属性上
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop())
{
register0(promise);
}
else
{
try
{
eventLoop.execute(new Runnable()
{
@Override
public void run()
{
register0(promise);
}
});
}
catch (Throwable t)
{
//异常代码,省略
}
}
} 和 EventLoop 相关的调用都有一个固定的套路,如果当前线程是 EventLoop 的线程则直接执行对应的操作;如果不是,则把对应的操作封装为一个匿名的 runnable 对象,投递到 eventLoop 中被执行。上述的代码也是使用了这个套路去执行register0方法。下面看下register0的方法。
private void register0(ChannelPromise promise)
{
try
{
//检查在真正执行注册之前,通道是否仍然保持打开状态。
if (!promise.setUncancellable() || !ensureOpen(promise)) {return; }
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
//先调用handlerAdded方法,再设置promise。避免用户在FutureListener中自己调用导致的冲突可能
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive())
{
//通道可以注册后取消注册再次被注册,只有首次注册才会触发事件
if (firstRegistration)
{
pipeline.fireChannelActive();
}
else if (config().isAutoRead())
{
//如果再次注册,则也需要开始读取数据
beginRead();
}
}
}
catch (Throwable t)
{
//异常处理代码,主要是设置promise失败,省略代码
}
} register0的内容可以概括为:
- 执行真正的注册动作,也就是方法
doRegister。 - 注册成功后,对应的通道就有了其绑定的线程,也就是后续该通道的逻辑处理都在这个线程上执行。如果之前已经有
ChannelHandler添加到这个通道的管道pipeline上,那么首先执行这些ChannelHandler的handlerAdded方法。 - 触发管道的
channelRegistered事件。 - 如果通道是首次注册到线程,并且处于激活状态,则触发
channelActive事件。
其中doRegister方法是被具体的 Channel 类实现的。对于 NIO 来说,则是被io.netty.channel.nio.AbstractNioChannel提供了实现,如下
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
//省略代码,异常处理相关
}
}
} 总结来说就是在NioChannel上注册了一个不关心任何就绪事件的SelectionKey。此时仅仅是完成了注册,但是由于没有关注就绪事件,不会产生效果。对关注的就绪事件集合的更新,在代码的其他地方。
DefaultEventLoop
该实现只是为了提供 run 方法的实现,且实现内容也是不断循环取出任务。和DefaultEventExecutor是相同的。
实际上在 Netty 中这个实现并不会被使用,没有什么关注意义。
NioEventLoop
铺垫了半天,再次说到正主了。一直到DefaultEventLoop,实际上都是面向普通的任务,和 NIO 都没有任何关系。NioEventLoop是专门用于处理 NIO 事件的特殊化EventLoop。其内部实现了对注册通道的读写相关操作。由于内部逻辑较为复杂,直接理解源代码比较麻烦,需要结合场景来进行,因此对于NioEventLoop的源码分析将会根据场景区分为《服务端接入客户端链接》、《客户端链接读取和写出数据》两个场景来分析其中的源码,在这里先不展开。
综述
透过对EventLoop接口和实现类的源码分析,该类的设计意图就十分明显,简而言之就是通过自身持有的任务队列,死循环不断的从队列中获取任务进行执行。其他的代码部分都是围绕在这个核心部分,保障其并发正确性的一些必要防御性编程手段,诸如EventLoop自身的状态设计,队列接口的实现者选择等。
总结与思考
本文我们从线程模型的演变开始,对Netty线程模型涉及到的相关类做了源码解析。线程模型的演变分析中,可以梳理出线程模型设计的思路。而这些思路在Netty相关源码中都能找到合适的映射。一个设计良好的框架,是可以清晰的回溯思路的。
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>
