Netty流程细讲之数据写出
引言
在前文中我们介绍了连接就绪,读就绪,接受就绪三种事件流程的处理代码。这篇文章我们来介绍下最后一种流程,数据写出。当用户业务需要将数据写出时一般会调用channel.write或者是channel.writeAndFlush。相关的API在入门篇的例子中也有过演示。本文,我们将来分析这两个API背后的工作原理。
write和flush的区别
在Netty的设计中,写出侧存在一个写出缓存。当我们调用channel.write或者channelHandlerContext.write时,其实只是将需要写出的内容放入到写出缓存的队列中。而只有调用channel.flush或者channelHandlerContext.flush才会让Netty将写出缓存中的数据真正写出到Socket的缓冲区从而通过tcp来发送。当然,Netty也提供了方便的整合方法writeAndFlush用于将数据写出并且刷出到socket缓冲区。下面我们来分别分析下两个不同的方法。
数据写出write
在NioSocketChannel的write方法实际上委托给pipeline来处理,是一个出站事件。经过管道中层层处理器对数据的转换,最终到达管道的首节点的write方法。而首节点的write方法再次将写出动作委托给unsafe.write。在NioSocketChannel中,unsafe实例是NioSocketChannelUnsafe,其write方法继承自AbstractUnsafe,其代码如下
public final void write(Object msg, ChannelPromise promise)
{
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
//代码①
if(outboundBuffer == null)
{
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
ReferenceCountUtil.release(msg);
return;
}
int size;
try
{
//代码②
msg = filterOutboundMessage(msg);
//代码③
size = pipeline.estimatorHandle().size(msg);
if(size < 0)
{
size = 0;
}
}
catch(Throwable t)
{
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//代码④
outboundBuffer.addMessage(msg, size, promise);
}
整个写出基本分为三个步骤:1)检查;2)检查与转换;3)放入写出缓存。
先看代码①。首先确认写出缓存outboundBuffer是否为空。outboundBuffer只有在通道关闭或者关闭写出方向连接时才会被设置为null。因此后续的写入就直接返回失败。这里需要注意的是,如果放弃写出,需要通过方法ReferenceCountUtil.release来执行资源的释放,因为msg可能是一个ByteBuf或者其他可以释放的资源,不执行释放的话,会造成内存泄漏问题。
接着看代码②。filterOutboundMessage方法用于被子类重写,用于完成对msg对象的转换,以及检查要写出的对象是否是当前通道支持的类型。比如NioServerSocketChannel对该方法的重写就是直接抛出异常,因为NioServerSocketChannel只用于接受客户端接入请求,无法写出数据。对于NioSocketChannel而言,对这个方法的重写如下
protected final Object filterOutboundMessage(Object msg)
{
if(msg instanceof ByteBuf)
{
ByteBuf buf = (ByteBuf) msg;
if(buf.isDirect())
{
return msg;
}
return newDirectBuffer(buf);
}
if(msg instanceof FileRegion)
{
return msg;
}
throw new UnsupportedOperationException("unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
如果ByteBuf不是堆外的,则尝试转换为堆外内存,也就是newDirectBuffer方法。如果这个方法是尽最大努力转换,如果当前使用的ByteBuf分配器没有使用内存池,而当前线程也没有可以使用的DirectByteBuf,则不进行转换。因为直接分配一个堆外的ByteBuf消耗较大。
如果msg不是ByteBuf或者FileRegion则抛出不支持异常。
这边来细说下为何要将ByteBuf转化为堆外的模式。ByteBuf本质上就是一个byte[]数组对象,如果一个堆内的byte[]要通过socket写出时,就需要先将数据拷贝到堆外,之后才能用堆外的数据执行写出。这是因为,通过socket的api写出byte[]内容,本质上是将byte[]在内存中的地址传递给了系统的socket接口。但是由于JVM的GC是会挪动堆内数据的位置的,这就可能导致系统在读取数据时因为JVM的GC导致原先给定的地址读取到非法数据。而堆外内存则不受GC控制,因此通过socket执行写出的场合,如果传递的是堆内的byte[]都需要拷贝堆外执行。而JVM执行的拷贝是自行创建一个DirectByteBuffer实例来进行数据拷贝,而创建DirectByteBuffer是消耗较大的操作,如果Netty层面可以直接转化,则可以提升效率。因此在filterOutboundMessage方法内会尝试执行转换。
接着来看代码③。Netty中使用MessageSizeEstimator.Handle#size接口来估计需要写出的对象的大小。该接口的实现有两个:
- 默认实现:
DefaultMessageSizeEstimator.HandleImpl,如果对象是ByteBuf或者ByteBufHolder则返回对象大小,如果是FileRegion返回0(FileRegion是在写出时直接从磁盘读取数据,对象内部没有持有byte[]等内容),其余情况返回默认的估计值。 - 专用于Http2协议的帧大小估计的实现:
FlowControlledFrameSizeEstimator$1,FlowControlledFrameSizeEstimator提供的匿名内部类实现。
代码③计算得到的消息大小,会在将对象放入写出缓存时用于对当前写出缓存消耗总内存的计算。
接着来看代码④。将消息添加到写出缓存中。当这个消息代表的数据被真正写出到socket缓存区后,与该消息关联的promise对象会被设置写出结果,用于通知在其上的监听器。
到这里,写出任务就完成了,实际的数据写出则依托于写出缓存内部的实现逻辑。
数据刷出flush
数据的刷出依靠方法channel.flush,该方法触发了管道的flush事件,这个事件最终会传递到首节点,首节点的flush方法再次委托给unsafe的flush方式,其具体实现如下
public final void flush()
{
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if(outboundBuffer == null)
{
return;
}
outboundBuffer.addFlush();
flush0();
}
protected final void flush0()
{
if(!isFlushPending())
{
super.flush0();
}
}
private boolean isFlushPending()
{
SelectionKey selectionKey = selectionKey();
return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}
方法outboundBuffer.addFlush()用于添加一个刷出标记到写出缓存中,具体的作用后文在分析。方法isFlushPending用于检查当前通道上是否已经有写出就绪等待了。如果当前通道上已经注册了可写就绪事件关注,则意味着之前已经有数据在等待刷出(后文在写出缓存流程中详细分析)。
如果isFlushPending返回false,意味着当前没有数据正在刷出,则触发父类的flush0方法进行数据刷出,flush0方法内部也是再次委托了NioSocketChannel#doWrite,但是flush0本身也做了一些模板方法处理,比如在开始前检查通道是否仍然激活,如果不的话则作废写出缓存中的所有数据;比如在dowrite出现异常时关闭通道并且作废写出缓存中的数据。由此可以看出,调用dowrite方法时,Netty真正的启动了数据的刷出。为了更好的理解的刷出的过程,这里我们先暂停下,来看下写出缓存ChannelOutboundBuffer的数据结构。
ChannelOutboundBuffer数据结构
ChannelOutboundBuffer是一个由其元素对象ChannelOutboundBuffer.Entry构成的单向链表。该链表有三个指针:
tailEntry:指向链表中最后一个元素。unflushedEntry:指向链表中第一个未标记为刷出的元素。flushedEntry:指向链表中第一个标记为刷出的元素。
一开始,队列是空的,三个指针也都没有任何指向。可以通过调用方法ChannelOutboundBuffer#addMessage来添加元素。
添加元素
插入第一个元素,队列就成了

每次插入元素,都会改变tailEntry的值。多次插入后队列就成了

有了
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>

曼迪匹艾公司福利 121人发布