异步化的Netty:隐藏在其后的线程模型
异步化的Netty
Netty在官网首页有这么一句话介绍自己
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
异步的特性甚至还摆在事件驱动之前,可见其重要性。Netty的异步操作在代码中随处可见,几个比较重要的地方返回都是ChannelFuture接口。先来重温下在什么地方会遇到异步接口。
第一处,也是最为常见,在服务端引导程序绑定监听端口的地方,代码如下
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class); ChannelFuture sync = serverBootstrap.bind(2323).sync();
bind方法返回的ChannelFuture对象有两种使用方式:
- 第一种,在允许阻塞的上下文中,可以直接使用
sync或者await方法等待异步任务完成。 - 第二种,当前上下文不能阻塞的情况,可以调用
ChannelFuture的addListener方法注册一个回调函数。该回调函数会被异步任务被完成后触发。
第二处使用返回异步任务的地方则是紧随监听端口绑定成功之后,为了不让main方法退出,需要去等待服务端程序的关闭,代码如下
ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class); ChannelFuture sync = serverBootstrap.bind(2323).sync(); sync.channel().closeFuture().sync();
通过sync.channel()的调用获得了绑定监听端口成功的服务端通道。而后通过closeFuture方法获得了该服务端通道的关闭异步任务。只有在服务端通道关闭后,该异步任务才会完成。通常而言,服务端通道关闭就意味着整个网络服务应用的下线。因此在这里等待通道的关闭实质就是等待整体应用的结束。
这里的等待是有着实质的重要作用的,一般而言,我们在初始化ServerBootstrap都会传入工作线程池,也就是EventLoopGroup对象。这些线程池在服务端通道关闭后,其内部的任务队列可能还剩余一些任务没有完成。此时为了数据的正确性考虑,不能强制关闭整个程序,否则就可能造成数据不一致或其他异常。因此需要在EventLoopGroup上执行优雅关闭,也就是调用shutdownGracefully方法。该方***首先切换EventLoopGroup到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。
一般而言,在服务端的代码中我们的写法都是
public static void main(String[] args)
{
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
ChannelFuture bind = serverBootstrap.bind(2356);
bind.sync();
Channel serverChannel = bind.channel();
serverChannel.closeFuture().sync();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
如果没有serverChannel.closeFuture().sync();就会直接结束main方法,然后执行finally中的内容,这会导致运行中的应用中断。根据上文的介绍,除了使用sync等待,还可以添加监听器,在监听器中进行线程池的优雅关闭。不过相对来说,sync等待这种写***比较常见和简洁一些。
第三处则是在数据写出的地方,先看实例代码
public static void main(String[] args)
{
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup();
final AtomicInteger count = new AtomicInteger();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch) throws Exception
{
ch.pipeline().addLast(new ChannelInboundHandlerAdapter()
{
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
ChannelFuture future = ctx.write(msg);
future.addListener(new ChannelFutureListener()
{
@Override
public void operationComplete(ChannelFuture future) throws Exception
{
//消息数量统计
count.incrementAndGet();
}
});
}
});
}
});
ChannelFuture bind = serverBootstrap.bind(2356);
bind.sync();
Channel serverChannel = bind.channel();
serverChannel.closeFuture().sync();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
这个例子中我们实现简单的消息发出的总数的功能。可以注意到,我们将计数的增加放在了任务的监听器之中实现。
这是因为执行io.netty.channel.ChannelOutboundInvoker#write(java.lang.Object)方法,该方法是一个异步方法,直接返回了ChannelFuture实例,当方法返回的时候,消息可能还没有写入到Socket发送缓冲区。如果在方法返回的时候就进行累加,累加的结果就和实际情况存在偏差了。
而在异步任务的监听器中进行累加,当方法operationComplete被调用时,数据已经被写入socket发送缓存区。此时进行计数累加的结果就是真正的消息发出的总数了(不考虑TCP通道中断的情况下)。
异步的好处显而易见,不让线程阻塞在IO操作上,可以尽可能的利用CPU资源。不过异步并不是“免费午餐”,支持异步实现需要背后高效合理的线程模式设计。这也是下文要分析的内容。
从《Scalable IO in Java》看线程模型
在操作系统支持IO多路复用能力后,针对这种能力,衍生了专门使用其的编程模型,也就是。网络上的翻译都是反应堆模式,但是觉得一点都不达意,也没有找到好的翻译,因此下文就直接称
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>