使用Java NIO编写服务端应用

需求分析

本文将实战如何使用Java NIO编写一个趋向于实际的echo应用。首先是明确这个echo服务器的需求,总结来说有以下几条:

  • 服务器原样返回客户端发送的信息。
  • 客户端发送的信息以'\r'作为一个消息的结尾,一个消息的最大长度不超过128。
  • 客户端可能会一次发送多个消息,服务端需要按照收到的消息的顺序依次回复,不能乱序。
  • 客户端可以在任意时刻关闭通道。
  • 服务端不能主动关闭通道。

代码实战

很少有程序是一蹴而就的,一般都是在满足需求,再反复修改细节得到最终的成品。在这里,我们先以《Java的服务端编程进化史:从BIO到NIO,最后走向AIO》一文中NIO的代码作为基础蓝本进行改造。为了方便区分改造区域,我们将基础蓝本中处理客户端的单独剥离,成为一个独立的类,最后得到的基础代码如下

public class MainDemo
{
    static class ClientProcessor implements Runnable
    {
        private Selector selector;

        public ClientProcessor(Selector selector)
        {
            this.selector = selector;
        }

        @Override
        public void run()
        {
            while (true)
            {
                try
                {
                    selector.select();
                    Set<SelectionKey>      selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator      = selectionKeys.iterator();
                    while (iterator.hasNext())
                    {
                        SelectionKey key = iterator.next();
                        if (key.isValid() == false)
                        {
                            continue;
                        }
                        if (key.isReadable())
                        {//代码①
                            ByteBuffer    buffer        = ByteBuffer.wrap(new byte[16]);
                            SocketChannel clientChannel = (SocketChannel) key.channel();
                            int           read          = clientChannel.read(buffer);
                            if (read == -1)//关闭分支
                            {
                                //通道连接关闭,可以取消这个注册键,后续不在触发。
                                key.cancel();
                                clientChannel.close();
                            }
                            else//读写分支
                            {
                                buffer.flip();
                                clientChannel.write(buffer);
                            }
                        }
                        iterator.remove();
                    }
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws IOException, ExecutionException, InterruptedException
    {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
        for (int i = 0; i < selectors.length; i++)
        {
            final Selector selector = Selector.open();
            selectors[i] = selector;
            new Thread(new ClientProcessor(selector)).start();
        }
        AtomicInteger id       = new AtomicInteger();
        Selector      selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true)
        {
            selector.select();
            Set<SelectionKey>      selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator      = selectionKeys.iterator();
            while (iterator.hasNext())
            {
                iterator.next();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ);
                iterator.remove();
            }
        }
    }
}

第一次修改

针对需求2:客户端发送的信息以'\r'作为一个消息的结尾,一个消息的最大长度不超过128。这意味着我们不能以定长的形式处理消息了。而且此时我们需要考虑TCP拆包和粘包的可能。

TCP是面向流的协议,其本身无法知道上层协议一个数据包的边界。因此在接受数据的时候,可能会因为一个消息过大而分次填充到Socket缓存区,此时应用读取到数据,感觉就是数据包被拆开了。

而如果TCP在收到数据时,将多个数据包的数据一起读取了一并填充到socket缓存区,此时应用读取到数据,感觉就是多个数据包粘合了。

结合需求2和需求3,我们需要改造的是数据的读取部分内容。主要变动有:

  • 检查每一个字节,确认是否是消息结束符。
  • 累积聚合字节直到一个完整的消息被拆分出来。

按照上述需求改动之后的代码如下:

public class NioDemo
{
        static class ClientProcessor implements Runnable
        {
            private Selector selector;

            public ClientProcessor(Selector selector)
            {
                this.selector = selector;
            }

            @Override
            public void run()
            {
                while (true)
                {
                    try
                    {
                        selector.select();
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        for (SelectionKey each : selectionKeys)
                        {
                            if (each.isValid() == false)
                            {
                                continue;
                            }
                            if (each.isReadable())
                            {
                                SocketChannel clientChannel = (SocketChannel) each.channel();
                                ByteBuffer    readBuffer    = (ByteBuffer) each.attachment();
                                int           read          = clientChannel.read(readBuffer);
                                if (read == -1)
                                {
                                    //通道连接关闭,可以取消这个注册键,后续不在触发。
                                    each.cancel();
                                    clientChannel.close();
                                }
                                else
                                {
                                    //翻转buffer,从写入状态切换到读取状态
                                    readBuffer.flip();
                                    int              position = readBuffer.position();
                                    int              limit    = readBuffer.limit();
                                    List<ByteBuffer> buffers  = new ArrayList<>();
                                    //新增二:按照协议从流中分割出消息
                                    /**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
                                    for (int i = position; i < limit; i++)
                                    {
                                        //读取到消息结束符
                                        if (readBuffer.get(i) == '\r')
                                        {
                                            ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position()+1);
                                            readBuffer.limit(i+1);
                                            message.put(readBuffer);
                                            readBuffer.limit(limit);
                                            message.flip();
                                            buffers.add(message);
                                            readBuffer.limit(limit);
                                        }
                                    }
                                    /**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
                                    /**将所有得到的消息发送出去**/
                                    for (ByteBuffer buffer : buffers)
                                    {
                                        //新增三
                                        while (buffer.hasRemaining())
                                        {
                                            clientChannel.write(buffer);
                                        }
                                    }
                                    /**将所有得到的消息发送出去**/
                                    //新增四:压缩readBuffer,压缩完毕后进入写入状态。并且由于长度是256,压缩之后必然有足够的空间可以写入一条消息
                                    readBuffer.compact();
                                }
                            }
                        }
                        selectionKeys.clear();
                    }
                    catch (IOException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException, ExecutionException, InterruptedException
        {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(8899));
            final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
            for (int i = 0; i < selectors.length; i++)
            {
                final Selector selector = Selector.open();
   

剩余60%内容,订阅专栏后可继续查看/也可单篇购买

<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>

全部评论

相关推荐

10-30 19:23
已编辑
山东大学(威海) C++
牛至超人:其实简历是不需要事无巨细的写的,让对方知道你有这段经历就行了,最重要的是面试的时候讲细讲明白
点赞 评论 收藏
分享
12-05 18:09
已编辑
广东药科大学 后端工程师
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务