Netty流程细讲之服务端接受并创建客户端链接
引言
在上篇文章中我们分析了服务端的启动流程。在服务端启动成功后,就可以开始监听端口,并且处理客户端的接入请求了。
服务端的ServerChannel是注册在EventLoop线程上的,而对客户端的接入处理也是从这个地方开始。因此,在这里我们需要首先分析下在NioEventLoop
NioEventLoop的run方法
源码篇第一讲,我们曾分析过NioEventLoop,它是一个特殊化的SingThreadEventExecutor,因为其run方法的实现并不是单纯的从队列中取出任务执行,还包含了在Selector对象上执行等待的流程。对于在端口上监听客户端接入请求的服务端程序而言,显然其监听等待也是依靠了在Selector上的等待。那下面我们就看下其run方法的实现,具体如下
protected void run()
{
for(;;)
{
try
{
try
{
switch(selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()))
{
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if(wakenUp.get())
{
selector.wakeup();
}
default:
}
}
catch(IOException e)
{
rebuildSelector0();
handleLoopException(e);
continue;
}
{
//代码①:处理就绪Key和队列任务
}
}
catch(Throwable t)
{
handleLoopException(t);
}
{
//代码②:处理EventLoop关闭情况
}
}
}
等待策略选择
首先来关注下其选择等待部分的代码。首先是通过代码selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())来确定接下来的选择等待策略。SelectStrategy是一个接口,其唯一实现就是DefaultSelectStrategy。而方法calculateStrategy的内容体也很简单,如下
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception
{
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
如果队列中没有任务要执行,则后续需要在Selector上执行等待,也就是使用SELECT策略。否则的话,就是执行selectSupplier.get方法来获取后续策略,其方法体如下
int selectNow() throws IOException
{
try
{
return selector.selectNow();
}
finally
{
if(wakenUp.get())
{
selector.wakeup();
}
}
}
SelectStrategy接口定义的策略属性均为负数值,非负数的返回就可以让后续的处理进入代码②的部分。结合代码,这个选择策略可以简单归纳为:在没有队列任务的情况下,执行阻塞等待就绪事件;在有队列任务的情况下,执行无阻塞的就绪选择,并且返回就绪事件数,以便后续代码进入任务和就绪事件处理环节。
阻塞等待
如果等待策略选择的结果是阻塞等待,则进入代码select(wakenUp.getAndSet(false))中。在进入方法体之前,我们先解释下属性wakenUp。
这个属性是一个AtomicBoolean对象,让各个线程通过CAS操作来争夺唤醒EventLoop持有的Selector的权利,因为Selector.wakeUp是一个昂贵的操作。由于NioEventLoop在外部添加任务时其EventLoop线程可能仍然阻塞在Selector.select操作上,所以需要通过Selector.wakeUp进行唤醒,而为了避免频繁的执行这个操作,所以通过wakenUp属性进行CAS保护。而该属性在EventLoop准备执行阻塞select操作之前,都会被设置为false的基本状态。
说明完wakenUp属性,我们再来看看select方法的具体内容,如下
private void select(boolean oldWakenUp) throws IOException
{
Selector selector = this.selector;
try
{
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
//代码①
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for(;;)
{
//代码②
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000 L) / 1000000 L;
if(timeoutMillis <= 0)
{
if(selectCnt == 0)
{
selector.selectNow();
selectCnt = 1;
}
break;
}
//代码③
if(hasTasks() && wakenUp.compareAndSet(false, true))
{
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
//代码④
if(selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks())
{
break;
}
//代码⑤
if(Thread.interrupted())
{
if(logger.isDebugEnabled())
{
logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
//代码⑥
if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)
{
selectCnt = 1;
}
//代码⑦
else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD)
{
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if(selectCnt > MIN_PREMATURE_SELECTOR_RETURNS)
{
if(logger.isDebugEnabled())
{
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector);
}
}
}
catch(CancelledKeyException e)
{
//。。。省略代码,日志记录异常
}
}
整体代码比较长,我们分段来分析,首先来看代码①。通过delayNanos方法计算后面Selector.select可以阻塞等待的时间。其计算逻辑也很简单,如果存在定时任务,则取定时任务的触发时间与当前时间的间隔;如果不存在,则返回默认间隔,NIOEventLoop这边是1s。
接着来看代码②,其逻辑也很简单,如果随着时间流逝,当前时间已经超出了代码①中计算的截止时间,则退出当前循环,也就是意味着离开select方法。
接着来看代码③,马上就要执行Selector.select(timeout)进行阻塞等待,在这之前最后判断一次任务队列是否有了新添加的任务。如果有的话,在属性wakenUp执行CAS竞争,竞争成功的话,执行非阻塞Selector.selectNow获取可能的就绪事件并返回。代码③的作用是减少不必要的阻塞等待,在有任务的情况下。而如果竞争失败,则按照正常流程走向阻塞等待,只不过另外的线程会执行唤醒,实际上也不会阻塞多少时间。
接着来看代码④,此时已经从选择器的阻塞等待中返回,需要确认返回原因。可能的原因包括:
- 有选择Key进入就绪状态。
- 用户线程唤醒了选择器,这种情况意味着外部添加了直接任务或者定时任务。
虽然代码④的判断条件比较多,但是实际应对的情况也只有这两种,而将wakenUp.get()、hasTasks()、hasScheduledTasks()只是考虑了任务添加的不同步骤。
接着来看代码⑤,Netty自身的EventLoop不会中断线程,这种调用必然是用户代码完成的,对于这种情况,Netty也视为离开Select方法的条件。
接着来看代码⑥,此时已经超过当初计算的阻塞截止时间,等待下一次循环通过代码②离开Select方法。
接着来看代码⑦,这里的代码意味着Selector已经触发了JDK的底层Bug,这个问题后续我们开个单章来具体说明。
总结的来说,阻塞等待的流程就是先计算等待的截止时间,并且在这个时间通过for循环执行Selector.select(timeout)阻塞等待。而在等待前,等待返回后都检查是否存在退出条件:存在就绪Key,用户添加任务等。满足退出条件就退出循环。
处理就绪Key
NioEventLoop的run方法在从select方法返回后,就进入处理就绪Key和队列任务的环节,也就是在run方法中的代码①部分,其代码如下
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if(ioRatio == 100)
{
try
{
processSelectedKeys();
}
finally
{
runAllTasks();
}
}
else
{
final long ioStartTime = System.nanoTime();
try
{
processSelectedKeys();
}
finally
{
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - io
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>
