Future接口的线程安全保证:Netty如何扩展实现Future和Promise接口
前言
Netty的大部分用户接口都是异步化的,返回的都是一个ChannelFuture对象。该接口是Netty对JDK中的Future接口扩展而来。和开发者相关比较大的变化是允许添加一个GenericFutureListener监听器,以便在异步任务完成时触发回调任务。
接口的定义比较简单,不过如何保证并发的安全性则是一个值得思考的问题。假定在任务完成的瞬间,addListener方法被调用,回调方法是否一定被触发?下面带着问题来看源码
类层次
首先让我们来看下类层次图。

虽然大部分用户接口代码返回都是ChannelFuture,但是实际上真正生效的是接口ChannelPromise。从Promise接口继承的能力,使得该接口允许设置成功或者失败标识。下面在源码走读中具体来分析。
源码走读
Future
Netty自定义的Future接口,继承自JDK的Future接口,不过实际当中使用到的都是自己定义的方法。方法大致上分为两类:
- 获取结果和进行结果等待的,诸如
sync,await等 - 添加任务回调方法的,诸如
addListener、addListeners
AbstractFuture
该抽象实现主要是提供了2个方法的实现,分别是AbstractFuture#get()和AbstractFuture#get(long, java.util.concurrent.TimeUnit)。以get的源码为例进行分析
public V get() throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
可以看到,get方法只需要组合其他方法的逻辑即可得到结果,其中最为重要的就是await方法。这个方法则由子类去实现。
这也是许多工具类方法的经典设计技巧。通过对部分原子方法的组合,来获得新的功能。而这部分新的,公共的功能,则可以在抽象类中实现,而具体的原子功能,则推迟到子类中完成。这样不同的子类有不同的实现策略,就可以组合出不同的上层功能特性。
Promise
Future接口对于结果内容本身是不可写的。为此,而Promise接口则提供了对于结果对象的写方法,提供了诸如setSuccess,setFailure,trySuccess等方法。从方法名就可以看出,这个接口关注的点就是在于对结果的写入。
DefaultPromise
层次较高的一个默认实现,该实现完整提供了Future和Promise的接口功能。首先来看下该类的几个重要属性。
private volatile Object result;//异步任务结果 private final EventExecutor executor;//产生该Promise对象的EventExecutor。 private Object listeners;//异步任务监听器对象或者对象列表的存储属性 private short waiters;//该异步任务上的等待线程数 private boolean notifyingListeners;//EventExecutor是否正在唤醒任务监听器
这些属性的具体作用和生效机制需要结合后文的方法分析来分析。
sync方法
sync方法是很常用的了,用于等待任务的完成。其内部实现,实际上是委托了另外一个等待方法。代码如下
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
可以看到,具体的等待,是委托给了await方法。
await方法
这个方法,顾名思义,就是执行任务等待,或者说让线程等待任务直到完成。方法体的实现遵循两个大步骤:
- 死锁可能检查
- 使用
synchronized关键字执行线程等待
下面来从代码的角度看下
public Promise<V> await() throws InterruptedException {
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
return this;
}
死锁检查checkDeadLock的实现如下
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}
思想也很简单。因为Promise实例是在EventExecutor被生成出来,而具体的任务也是在这个线程上执行。前文有说过,在Netty的线程模型中,一个EventExecutor是死循环执行自己队列中的任务的。因此此时在这个线程上执行等待,那么其对应的任务永远没有机会获得线程执行了。
死锁检查通过后,就开始使用synchronized关键字进行线程等待。等待区域的代码逻辑很简单。
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
wait();
} finally {
decWaiters();
}
}
}
使用synchronized对当前对象加锁,而后在一个while循环中,检查任务是否完成,如果没有完成,则依靠Object的wait方法让线程进入等待。在等待的前后会分别对等待计数器进行加减。对等待计数有两方面的作用,一方面避免过多的线程在这个对象上执行等待,否则后期唤醒的效率就比较低。另外一个作用,后面任务完成后,执行唤醒时,通过检查等待计数,如果计数为0,就不需要执行notifyAll。
addListener方法
还有一个高频率的用户方法就是对Future,添加任务监听器。代码如下
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//使用本对象为锁,确保对listeners属性操作的并发正确性
synchronized (this) {
addListener0(listener);
}
//如果结果对象已经被设置,则直接触发监听
if (isDone()) {
n
剩余60%内容,订阅专栏后可继续查看/也可单篇购买
<p> 通过本专刊的学习,对网络开发所需掌握的基础理论知识会更加牢固,对网络应用涉及的线程模型,设计模式,高性能架构等更加明确。通过对Netty的源码深入讲解,使得读者对Netty达到“知其然更之所以然”的程度。在遇到一些线上的问题时,具备了扎实理论功底的情况,可以有的放矢而不会显得盲目。 本专刊购买后即可解锁所有章节,故不可以退换哦~ </p> <p> <br /> </p>
