Netty流程细讲之数据读取与连接远端
引言
前文我们介绍了Netty如何让NioServerSocketChannel服务端通道接受并且初始化一个客户端链接。对于一个客户端链接而言,主要的处理任务有两类:处理就绪事件和处理队列任务。其中就绪事件又可以再细分为读就绪,写就绪,连接就绪和接受就绪。接受就绪我们在上文已经分析过了,本文主要来分析读就绪和连接就绪事件。
数据读取
当NioServerSocketChannel初始化完成一个链接后,Netty就可以在该链接上执行读取操作。不过在分析读取操作之前,我们先来复习下客户端链接初始化的一些细节。
客户端链接初始化细节
前文我们介绍了客户端链接在方法ServerBootstrapAcceptor#channelRead初始化的步骤,分别是:添加ChannelInitializer对象,设置配置项和属性,注册到EventLoop上。还记得前文提到过,Channel注册到EventLoop上,最终会委托到一个方法AbstractChannel.AbstractUnsafe#register0,该方法在将通道绑定到EventLoop对应的Selector上后,会执行一段触发channelActive事件的代码,如下
if(isActive())
{
if(firstRegistration)
{
pipeline.fireChannelActive();
}
else if(config().isAutoRead())
{
beginRead();
}
}
客户端链接是NioSocketChannel对象,其isActive方法实现在链接仍然打开的状态下返回true,显然firstRegistration在客户端链接刚被服务端通道接受的情况下也是true。因此,这里就会触发channelActive事件。
而对channelActive事件的响应,前文我们曾经介绍过,在pipeline的首节点中会进行,其代码如下
public void channelActive(ChannelHandlerContext ctx)
{
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead()
{
if(channel.config().isAutoRead())
{
channel.read();
}
}
NioServerSocketChannel和NioSocketChannel都会执行到这段代码(不过前者是在绑定端口时触发而后者是在注册到EventLoop就触发了)。这里的read和在NioServerSocketChannel一样,触发了pipeline的read事件,最终委托到方法AbstractChannel.AbstractUnsafe#beginRead。来看下其代码
protected void doBeginRead() throws Exception
{
final SelectionKey selectionKey = this.selectionKey;
if(!selectionKey.isValid())
{
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if((interestOps & readInterestOp) == 0)
{
selectionKey.interestOps(interestOps | readInterestOp);
}
}
对于NioSocketChannel而言,其readInterestOp属性的值是SelectionKey#OP_READ。也就是在这里,NioSocketChannel在Selector上的就绪事件关注增加了对数据可读就绪的关注。
从这个调用中,也可以总结出Channel.read这个调用的思路。该调用触发通道上的read事件,并且最终传递到HeadContext也就是首节点,经过一系列委托调用,最终触发到AbstractNioChannel#doBeginRead。而这个方法的作用就是更新通道注册的就绪事件,增加上readInterestOp属性代表的事件。对于ServerChannel,这个值是Accept;对于SocketChannel,这个值是Read。使用这个方法注册新的读取(可读or接入)事件后,就可以在下次进入Selector.select时就可以执行真正的读取动作了。
读取数据并处理
当NioSocketChannel注册到EventLoop上后,并且就绪事件关注修改为可读就绪事件关注后,EventLoop线程就可以在其run方法中处理数据读取了。在章节五中我们已经分析过run方法的流程,这里不再赘述。我们直接分析处理可读就绪事件的方法processSelectedKeys,前文分析过这个方法的内部细节,这里我们直接关注可读就绪的部分,也即是
if((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
{
unsafe.read();
}
与之前不同,这里的unsafe实例是NioByteUnsafe,下面来看下其read方法的实现,如下
public final void read()
{
final ChannelConfig config = config();
//代码①
if(shouldBreakReadReady(config))
{
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
//代码②
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try
{
do {
//代码③
byteBuf = allocHandle.allocate(allocator);
//代码④
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if(allocHandle.lastBytesRead() <= 0)
{
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if(close)
{
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
//代码⑤
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if(close)
{
closeOnRead(pipeline);
}
}
catch(Throwable t)
{
handleReadException(pipeline, byteBuf, t, close, allocHandle);
}
finally
{
//代码⑥
if(!readPending && !config.isAutoRead())
{
removeReadOp();
}
}
}
代码虽然有点长,但是整体的脉络较为清晰,其流程可以概括如下

首先我们说一下readPending这个属性,这个属性的命令会让人以为这个这个属性用于表达当前通道是否正在读取。但这个属性实际表达的含义是当前通道在注册读取后是否未曾读取到数据。该属性在调用方法Channel.read或者ChannelHandlerContext.read时被设置为true,当这个通道成功读取到一个消息(比如NioServerSocketChannel类型的通道)或成功读取到字节消息亦或EOF(比如NioSocketChannel类型的通道)后就被修改回false。默认配置下,一个链接是被配置为自动读取模式,也就是只要通道上有数据就会触发channelRead事件,所以这个属性大多数时候并不起到什么作用。但是在需要应用逻辑来控制是否读取数据的场合,也就是通道的autoRead模式被关闭的情况下,读取数据是依靠应用程序主动调用Channel.read或ChannelHandlerContext.read的场合,该属性就可以用于决定取消读就绪事件关注的取消时机。这个可以在后文的代码分析中看到。
接着我们来分析下代码,首先是代码①。shouldBreakReadReady方法用于判断是否中断读取,其内容倒也简单,如果底层Socket已经关闭输入流或链接终止(可能是经过Netty关闭,也可能是外部导致的关闭),且通道本身不支持半关闭或Netty之前已经确认过该通道输入流被关闭(通过标识位留存判断);则当前读取中断,方法直接返回。该方法主要是避免后续无谓的读取操作导致的内存分配行为。
接着来看方法②。该方法重置了读取逻辑处理器的统计数据。其重置的统计数据主要是2个:总计读取消息数和总计读取字节数。在这里,RecvByteBufAllocator这个接口的实现是AdaptiveRecvByteBufAllocator。
接着来看代码③。对于读取而言,申请的ByteBuf的大小对于性能表现很重要。申请的ByteBuf太小,要完整读取Socket缓冲区中的数据就需要多次的读取操作,会降低系统运行效能;申请的ByteBuf太大,读取数据是方便了,但是对于同时服务大量客户端的服务端应用而言,每一个客户端申请的ByteBuf越大,总和之后,对于整体系统的内存和GC压力都会增大。因此Netty中制定了不同的策略来实现对ByteBuf的大小申请机制。这个会在后文开单张进行具体的说明,这边大家先了解这个情况。
接着我们来看代码④。通过方法doReadBytes进行实际的字节读取,其方法如下
protected int doReadBytes(ByteBuf byteBuf) throws Exception
{
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.attemptedBytesRead(byteBuf.writableBytes());
return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}
首先通过方法allocHandle.attemptedBytesRead记录本次尝试读取的字节数,也就是byteBuf可以容纳的写入大小。而后通过方法ByteBuf#writeBytes(ScatteringByteChannel, int)完成字节的写入,该方法是ByteBuf写入方法之一,方法实现会尝试从入参的Channel最大读取第二入参长度的字节内容,并且在容量不足时自动扩容。字节读取完毕后便向后触发管道channelRead事件。
接着来看代码⑤,方法RecvByteBufAllocator.Handle#continueReading决定了是否继续从通道中读取数据,这里的实现是DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#continueReading(),其实现如下
public boolean continueReading()
{
return continueReading(defaultMaybeMoreSupplier);
}
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier)
{
return config.isAutoRead() && (!respectMaybeMoreData || maybeMoreDataSupplier.get()) && totalMessages < maxMessagePerRead && totalBytesRead > 0;
}
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>