异步化的Netty:隐藏在其后的线程模型

异步化的Netty

Netty在官网首页有这么一句话介绍自己

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

异步的特性甚至还摆在事件驱动之前,可见其重要性。Netty的异步操作在代码中随处可见,几个比较重要的地方返回都是ChannelFuture接口。先来重温下在什么地方会遇到异步接口。

第一处,也是最为常见,在服务端引导程序绑定监听端口的地方,代码如下

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class);
ChannelFuture sync = serverBootstrap.bind(2323).sync();

bind方法返回的ChannelFuture对象有两种使用方式:

  • 第一种,在允许阻塞的上下文中,可以直接使用sync或者await方法等待异步任务完成。
  • 第二种,当前上下文不能阻塞的情况,可以调用ChannelFutureaddListener方法注册一个回调函数。该回调函数会被异步任务被完成后触发。

第二处使用返回异步任务的地方则是紧随监听端口绑定成功之后,为了不让main方法退出,需要去等待服务端程序的关闭,代码如下

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker).channel(NioServerSocketChannel.class);
ChannelFuture sync = serverBootstrap.bind(2323).sync();
sync.channel().closeFuture().sync();

通过sync.channel()的调用获得了绑定监听端口成功的服务端通道。而后通过closeFuture方法获得了该服务端通道的关闭异步任务。只有在服务端通道关闭后,该异步任务才会完成。通常而言,服务端通道关闭就意味着整个网络服务应用的下线。因此在这里等待通道的关闭实质就是等待整体应用的结束。

这里的等待是有着实质的重要作用的,一般而言,我们在初始化ServerBootstrap都会传入工作线程池,也就是EventLoopGroup对象。这些线程池在服务端通道关闭后,其内部的任务队列可能还剩余一些任务没有完成。此时为了数据的正确性考虑,不能强制关闭整个程序,否则就可能造成数据不一致或其他异常。因此需要在EventLoopGroup上执行优雅关闭,也就是调用shutdownGracefully方法。该方***首先切换EventLoopGroup到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

一般而言,在服务端的代码中我们的写法都是

public static void main(String[] args)
    {
     EventLoopGroup  boss            = new NioEventLoopGroup(1);
        EventLoopGroup  worker          = new NioEventLoopGroup();
        try
        {

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);
            serverBootstrap.channel(NioServerSocketChannel.class);
            ChannelFuture bind = serverBootstrap.bind(2356);
            bind.sync();
            Channel serverChannel = bind.channel();
            serverChannel.closeFuture().sync();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

如果没有serverChannel.closeFuture().sync();就会直接结束main方法,然后执行finally中的内容,这会导致运行中的应用中断。根据上文的介绍,除了使用sync等待,还可以添加监听器,在监听器中进行线程池的优雅关闭。不过相对来说,sync等待这种写***比较常见和简洁一些。

第三处则是在数据写出的地方,先看实例代码

public static void main(String[] args)
    {
        EventLoopGroup      boss   = new NioEventLoopGroup(1);
        EventLoopGroup      worker = new NioEventLoopGroup();
        final AtomicInteger count  = new AtomicInteger();
        try
        {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
            {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception
                {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter()
                    {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
                        {
                            ChannelFuture future = ctx.write(msg);
                            future.addListener(new ChannelFutureListener()
                            {
                                @Override
                                public void operationComplete(ChannelFuture future) throws Exception
                                {
                                    //消息数量统计
                                    count.incrementAndGet();
                                }
                            });
                        }
                    });
                }
            });
            ChannelFuture bind = serverBootstrap.bind(2356);
            bind.sync();
            Channel serverChannel = bind.channel();
            serverChannel.closeFuture().sync();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        finally
        {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

这个例子中我们实现简单的消息发出的总数的功能。可以注意到,我们将计数的增加放在了任务的监听器之中实现。

这是因为执行io.netty.channel.ChannelOutboundInvoker#write(java.lang.Object)方法,该方法是一个异步方法,直接返回了ChannelFuture实例,当方法返回的时候,消息可能还没有写入到Socket发送缓冲区。如果在方法返回的时候就进行累加,累加的结果就和实际情况存在偏差了。

而在异步任务的监听器中进行累加,当方法operationComplete被调用时,数据已经被写入socket发送缓存区。此时进行计数累加的结果就是真正的消息发出的总数了(不考虑TCP通道中断的情况下)。

异步的好处显而易见,不让线程阻塞在IO操作上,可以尽可能的利用CPU资源。不过异步并不是“免费午餐”,支持异步实现需要背后高效合理的线程模式设计。这也是下文要分析的内容。

从《Scalable IO in Java》看线程模型

在操作系统支持IO多路复用能力后,针对这种能力,衍生了专门使用其的编程模型,也就是。网络上的翻译都是反应堆模式,但是觉得一点都不达意,也没有找到好的翻译,因此下文就直接称

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>

全部评论

相关推荐

评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务