使用Java NIO编写服务端应用
需求分析
本文将实战如何使用Java NIO编写一个趋向于实际的echo应用。首先是明确这个echo服务器的需求,总结来说有以下几条:
- 服务器原样返回客户端发送的信息。
- 客户端发送的信息以'\r'作为一个消息的结尾,一个消息的最大长度不超过128。
- 客户端可能会一次发送多个消息,服务端需要按照收到的消息的顺序依次回复,不能乱序。
- 客户端可以在任意时刻关闭通道。
- 服务端不能主动关闭通道。
代码实战
很少有程序是一蹴而就的,一般都是在满足需求,再反复修改细节得到最终的成品。在这里,我们先以《Java的服务端编程进化史:从BIO到NIO,最后走向AIO》一文中NIO的代码作为基础蓝本进行改造。为了方便区分改造区域,我们将基础蓝本中处理客户端的单独剥离,成为一个独立的类,最后得到的基础代码如下
public class MainDemo
{
static class ClientProcessor implements Runnable
{
private Selector selector;
public ClientProcessor(Selector selector)
{
this.selector = selector;
}
@Override
public void run()
{
while (true)
{
try
{
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext())
{
SelectionKey key = iterator.next();
if (key.isValid() == false)
{
continue;
}
if (key.isReadable())
{//代码①
ByteBuffer buffer = ByteBuffer.wrap(new byte[16]);
SocketChannel clientChannel = (SocketChannel) key.channel();
int read = clientChannel.read(buffer);
if (read == -1)//关闭分支
{
//通道连接关闭,可以取消这个注册键,后续不在触发。
key.cancel();
clientChannel.close();
}
else//读写分支
{
buffer.flip();
clientChannel.write(buffer);
}
}
iterator.remove();
}
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException
{
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < selectors.length; i++)
{
final Selector selector = Selector.open();
selectors[i] = selector;
new Thread(new ClientProcessor(selector)).start();
}
AtomicInteger id = new AtomicInteger();
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true)
{
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext())
{
iterator.next();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.register(selectors[id.getAndIncrement() % selectors.length], SelectionKey.OP_READ);
iterator.remove();
}
}
}
}
第一次修改
针对需求2:客户端发送的信息以'\r'作为一个消息的结尾,一个消息的最大长度不超过128。这意味着我们不能以定长的形式处理消息了。而且此时我们需要考虑TCP拆包和粘包的可能。
TCP是面向流的协议,其本身无法知道上层协议一个数据包的边界。因此在接受数据的时候,可能会因为一个消息过大而分次填充到Socket缓存区,此时应用读取到数据,感觉就是数据包被拆开了。
而如果TCP在收到数据时,将多个数据包的数据一起读取了一并填充到socket缓存区,此时应用读取到数据,感觉就是多个数据包粘合了。
结合需求2和需求3,我们需要改造的是数据的读取部分内容。主要变动有:
- 检查每一个字节,确认是否是消息结束符。
- 累积聚合字节直到一个完整的消息被拆分出来。
按照上述需求改动之后的代码如下:
public class NioDemo
{
static class ClientProcessor implements Runnable
{
private Selector selector;
public ClientProcessor(Selector selector)
{
this.selector = selector;
}
@Override
public void run()
{
while (true)
{
try
{
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
for (SelectionKey each : selectionKeys)
{
if (each.isValid() == false)
{
continue;
}
if (each.isReadable())
{
SocketChannel clientChannel = (SocketChannel) each.channel();
ByteBuffer readBuffer = (ByteBuffer) each.attachment();
int read = clientChannel.read(readBuffer);
if (read == -1)
{
//通道连接关闭,可以取消这个注册键,后续不在触发。
each.cancel();
clientChannel.close();
}
else
{
//翻转buffer,从写入状态切换到读取状态
readBuffer.flip();
int position = readBuffer.position();
int limit = readBuffer.limit();
List<ByteBuffer> buffers = new ArrayList<>();
//新增二:按照协议从流中分割出消息
/**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
for (int i = position; i < limit; i++)
{
//读取到消息结束符
if (readBuffer.get(i) == '\r')
{
ByteBuffer message = ByteBuffer.allocate(i - readBuffer.position()+1);
readBuffer.limit(i+1);
message.put(readBuffer);
readBuffer.limit(limit);
message.flip();
buffers.add(message);
readBuffer.limit(limit);
}
}
/**从readBuffer确认每一个字节,发现分割符则切分出一个消息**/
/**将所有得到的消息发送出去**/
for (ByteBuffer buffer : buffers)
{
//新增三
while (buffer.hasRemaining())
{
clientChannel.write(buffer);
}
}
/**将所有得到的消息发送出去**/
//新增四:压缩readBuffer,压缩完毕后进入写入状态。并且由于长度是256,压缩之后必然有足够的空间可以写入一条消息
readBuffer.compact();
}
}
}
selectionKeys.clear();
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException
{
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8899));
final Selector[] selectors = new Selector[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < selectors.length; i++)
{
final Selector selector = Selector.open();
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
Java岗高薪必备:Netty从入门到实战30讲 文章被收录于专栏
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>


