使用Netty编写一个多人聊天程序(中)-服务端实现
前言
前文中我们进行了需求澄清,协议制订,服务端设计,本文将在这些的基础上实现完整的服务端功能。
编解码器实现
消息的收发基础是编解码器。上文对协议的制订,最外围的结构是报文头加报文体的形式。针对这个结构,实现报文分割,我们可以直接接触Netty提供的内嵌支持LengthFieldBasedFrameDecoder进行报文体的长度确定和分割。
报文体中是具体的消息,我们需要根据消息的不同类型来进行具体的区分,这部分就需要自行实现解码器了,自定义解码器的类名制订为handler.CommandDecoder。解码器的核心思路读取第一个字节的协议类型,而后根据不同的协议类型,按照协议读取出对应的字段数据,将这些字段数据组装Command对象,并且向后续的处理器进行传递。整体的代码设计如下
public class CommandDecoder extends ChannelInboundHandlerAdapter
{
private static final Charset CHARSET = Charset.forName("utf8");
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
ByteBuf buf = (ByteBuf) msg;
byte b = buf.readByte();
CommandType commandType = CommandType.value(b);
Command command;
switch (commandType)
{
case REGISTER:
{
String clientName = readString(buf);
command = new RegisterCommand();
((RegisterCommand) command).setRegisterName(clientName);
break;
}
case LOGIN:
{
//...省略相似逻辑代码
}
case SEND_TO_CLIENT:
{
//...省略相似逻辑代码
}
case CREATE_GROUP:
{
//...省略相似逻辑代码
}
case JOIN_GROUP:
{
//...省略相似逻辑代码
}
case SEND_TO_GROUP:
{
//...省略相似逻辑代码
}
case HEART_BEAT:
{
//...省略相似逻辑代码
}
default:
throw new IllegalStateException("Unexpected value: " + commandType);
}
buf.release();
//将二进制协议解析完毕,组装成具体的Command类,传递给后续的处理器
ctx.fireChannelRead(command);
}
private String readString(ByteBuf buf)
{
int length = buf.readInt();
byte[] content = new byte[length];
buf.readBytes(content);
return new String(content, CHARSET);
}
}
在类的功能设计中,遵循“单一职责”这一设计原则,每一个类仅仅只完成自己负责的部分。通过这种方式,有利于降低类之间的耦合,在出错的时候也方便于定位问题区域。CommandDecoder的设计就遵循了这一原则,其只是关注于完成将二进制数据转换为后续处理器能处理的命令对象。
这里有一个地方需要注意,在将二进制数据转换为Command对象后,ByteBuf对象需要进行手动释放。Netty中采用内存池的方式管理分配的内存,其分配的内存的载体就是ByteBuf对象。而ByteBuf在使用完毕后,必须进行释放,才能将其承载的内存空间归还给缓存池。如果不释放的话,实际上就造成了内存泄漏,最终会导致没有内存可用。因此,CommandDecoder在将command对象传递给下一个handler之前,执行buf.release()进行释放。将命令对象传递给下一个处理器,通过调用方法io.netty.channel.ChannelHandlerContext#fireChannelRead来完成。
命令处理器
命令对象被解析出来后,接着就是对命令对象的处理。通讯协议的设计,第一个字节用于表达消息类型,因此存在着很大的扩展性。对于命令处理器来说,不能使用硬编码的方式进行,否则无论后续是修改已经存在的消息格式,还是添加新的消息格式,都需要反复的修改代码,不方便也带来隐患。
在面向对象中,有一个原则叫做“开闭原则”,简单来说,就是代码需要对扩展开放,对修改封闭。显然,如果命令处理器采用硬编码解析命令的话,就违反了这一原则。
在这里,考虑到所有的命令都可以通过命令类型来进行区分,命令处理器实际上可以设计成为一个分发路由的模式。也就是命令处理器本身提供一个统一的接口,每一个具体的命令解析都继承这个接口,实现为一个针对具体命令的命令解析器。而命令处理器在获取到命令之后,根据命令类型,找到对应的命令解析器,将命令分发给其处理,命令处理器本身不执行具体的业务逻辑。
这种设计的好处在于,后续需要扩展或者修改对应的命令格式时,只需要新增或者修改对应的命令解析器,而命令处理器本身不需要修改。下面来看具体的代码实现
public class CommandHandler extends ChannelInboundHandlerAdapter
{
private volatile EnumMap<CommandType, CommandProcessor> processors;
public CommandHandler(EnumMap<CommandType, CommandProcessor> processors)
{
this.processors = processors;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
{
Command command = (Command) msg;
processors.get(command.type()).process(command, ctx);
}
public void setProcessors(EnumMap<CommandType, CommandProcessor> processors)
{
this.processors = processors;
}
static interface CommandProcessor
{
void process(Command command, ChannelHandlerContext ctx);
}
}
通过允许调用set方法,甚至可以实现在运行期更改命令解析器,从而实现动态消息协议处理变更或者消息新增的功能。考虑到允许运行期修改,解析器映射processors要使用volatile关键字进行修饰,确保修改后可见。
此外,服务端收到的每一个命令都需要发送给客户端其对应的响应,这部分响应的处理,也是在各自命令的命令解析器中完成。
注册命令解析器
服务端对注册命令的解析包含两个方面的作用:
- 持久化客户端信息,包含客户端标识和客户端ID。
- 当前客户端状态变更为在线,修改对应的路由表信息,由于此时是新建的客户端,因此只需要更新单聊中的路由信息。
登录命令解析器
服务端对登录命令的解析包含有:
- 该客户端的状态变更为在线,更新对应的路由表信息,包含单聊和群聊路由信息。
单聊消息命令解析器
服务端对单聊消息命令的解析包含有:
- 通过路由表寻找到目的客户端的
SocketChanel对象,按照消息格式,发送数据给对应的客户端
创建群聊命令解析器
服务端对创建群聊命令的解析有:
- 持久化群聊信息,包含群聊标识和群聊ID。
- 将当前的客户端加入到该群聊中。
- 新增一个群聊路由,并且其中包含当前客户端的
SocketChannel对象。
加入群聊命令解析器
服务端对加入群聊命令的解析有:
- 将当前客户端加入到群聊中
- 获取该群聊路由,在其中加入当前客户端的
SocketChannel对象。
发送群聊消息命令解析器
服务端对发送群聊消息命令的解析有:
- 通过路由表寻找目的群聊,组装发送的群聊消息对象,遍历群聊路由,对除了发送者外的客户端的
SocketChannel发送群聊消息。
心跳命令解析器
服务端对心跳命令的解析有:
- 更新该客户端的在线状态超时计时器,确认客户端当前的在线状态。
客户端在线状态维持-心跳检测
消息的接收发送都依赖于路由表,因此对路由表的及时更新就显得很重要。而路由表的维持中,重要的一点就在于客户端的上线状态变更。上线自不用说,客户端发送注册命令或者登录命令就说明了客户端上线。而客户端可以随时关闭链接下线,链接的关闭可能是通过关闭发送tcp消息来关闭,也可能是因为网络中断。
通过判断一定时间内是否有收到客户端消息来判断客户端在线是一个简单有效的方法。对于服务端来说,在内部为每一个客户端维持一个计时器,在超时时间内如果没有收到客户端发送的消息,则判定为客户端离线,在服务端这一侧关闭客户端的SocketChannel对象。对于服务端来说,除了业务需要发送的登录,消息等,没有功能需求需要发送数据时,则可以通过发送心跳消息来通知服务端自己仍然在线。为此,客户端也需要自己维持一个超时计时器,在超时时间内,如果没有发送过消息,则主动发送一个心跳消息。
需要注意的是,为了避免服务端误判,服务端的超时时间需要比客户端的超时时间要长。
心跳检测,也称之为空闲检测,Netty已经为我们提供了内置支持,也就是类io.netty.handler.timeout.IdleStateHandler。该处理器支持对链路上的读操作进行超时跟踪,也支持对写操作进行超时跟踪,也支持同时对两者进行超时跟踪。来看下其构造方法
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit){ //...省略构造方法
}
readerIdleTime、writerIdleTime,allIdleTime分表代表读取空闲超时,写出空闲超时,读或写空闲超时。对于allIdleTime而言,无论读取还是写出,只要超时了都会触发这个方法。
这个类的使用也很简单,将其放在的任意位置均可。当对应的超时事件触发时,则会沿
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>
