前提

最近一直在看Netty相关的内容,也在编写一个轻量级的RPC框架来练手,途中发现了Netty的源码有很多亮点,某些实现甚至可以用苛刻来形容。另外,Netty提供的工具类也是相当优秀,可以开箱即用。这里分析一下个人比较喜欢的领域,并发方面的一个Netty工具模块 - Promise

环境版本:

  • Netty:4.1.44.Final
  • JDK1.8

Promise简介

Promise,中文翻译为承诺或者许诺,含义是人与人之间,一个人对另一个人所说的具有一定憧憬的话,一般是可以实现的。

io.netty.util.concurrent.Promise在注释中只有一句话:特殊的可写的io.netty.util.concurrent.FuturePromise接口是io.netty.util.concurrent.Future的子接口)。而io.netty.util.concurrent.Futurejava.util.concurrent.Future的扩展,表示一个异步操作的结果。我们知道,JDK并发包中的Future是不可写,也没有提供可监听的入口(没有应用观察者模式),而Promise很好地弥补了这两个问题。另一方面从继承关系来看,DefaultPromise是这些接口的最终实现类,所以分析源码的时候需要把重心放在DefaultPromise类。一般一个模块提供的功能都由接口定义,这里分析一下两个接口的功能列表:

  • io.netty.util.concurrent.Promise
  • io.netty.util.concurrent.Future

先看io.netty.util.concurrent.Future接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public interface Future<V> extends java.util.concurrent.Future<V> {

// I/O操作是否执行成功
boolean isSuccess();

// 标记是否可以通过下面的cancel(boolean mayInterruptIfRunning)取消I/O操作
boolean isCancellable();

// 返回I/O操作的异常实例 - 如果I/O操作本身是成功的,此方法返回null
Throwable cause();

// 为当前Future实例添加监听Future操作完成的监听器 - isDone()方法激活之后所有监听器实例会得到回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 为当前Future移除监听Future操作完成的监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

// 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),响应中断
Future<V> sync() throws InterruptedException;

// 同步等待Future完成得到最终结果(成功)或者抛出异常(失败),不响应中断
Future<V> syncUninterruptibly();

// 等待Future完成,响应中断
Future<V> await() throws InterruptedException;

// 等待Future完成,不响应中断
Future<V> awaitUninterruptibly();

// 带超时时限的等待Future完成,响应中断
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;

// 带超时时限的等待Future完成,不响应中断
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);

// 非阻塞马上返回Future的结果,如果Future未完成,此方法一定返回null;有些场景下如果Future成功获取到的结果是null则需要二次检查isDone()方法是否为true
V getNow();

// 取消当前Future实例的执行,如果取消成功会抛出CancellationException异常
@Override
boolean cancel(boolean mayInterruptIfRunning);
}

sync()await()方法类似,只是sync()会检查异常执行的情况,一旦发现执行异常马上把异常实例包装抛出,而await()方法对异常无感知。

接着看io.netty.util.concurrent.Promise接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public interface Promise<V> extends Future<V> {

// 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
Promise<V> setSuccess(V result);

// 标记当前Future成功,设置结果,如果设置成功,则通知所有的监听器并且返回true,否则返回false
boolean trySuccess(V result);

// 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器,如果Future已经成功或者失败,则抛出IllegalStateException
Promise<V> setFailure(Throwable cause);

// 标记当前Future失败,设置结果为异常实例,如果设置成功,则通知所有的监听器并且返回true,否则返回false
boolean tryFailure(Throwable cause);

// 标记当前的Promise实例为不可取消,设置成功返回true,否则返回false
boolean setUncancellable();

// 下面的方法和io.netty.util.concurrent.Future中的方法基本一致,只是修改了返回类型为Promise

@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> await() throws InterruptedException;

@Override
Promise<V> awaitUninterruptibly();

@Override
Promise<V> sync() throws InterruptedException;

@Override
Promise<V> syncUninterruptibly();
}

到此,Promise接口的所有功能都分析完毕,接下来从源码角度详细分析Promise的实现。

Promise源码实现

Promise的实现类为io.netty.util.concurrent.DefaultPromise(其实DefaultPromise还有很多子类,某些实现是为了定制特定的场景做了扩展),而DefaultPromise继承自io.netty.util.concurrent.AbstractFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public abstract class AbstractFuture<V> implements Future<V> {

// 永久阻塞等待获取结果的方法
@Override
public V get() throws InterruptedException, ExecutionException {
// 调用响应中断的永久等待方法进行阻塞
await();
// 从永久阻塞中唤醒后,先判断Future是否执行异常
Throwable cause = cause();
if (cause == null) {
// 异常为空说明执行成功,调用getNow()方法返回结果
return getNow();
}
// 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
throw new ExecutionException(cause);
}

// 带超时阻塞等待获取结果的方法
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 调用响应中断的带超时时限等待方法进行阻塞
if (await(timeout, unit)) {
// 从带超时时限阻塞中唤醒后,先判断Future是否执行异常
Throwable cause = cause();
if (cause == null) {
// 异常为空说明执行成功,调用getNow()方法返回结果
return getNow();
}
// 异常为空不为空,这里区分特定的取消异常则转换为CancellationException抛出
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 在非等待超时的前提下,非取消异常的其他所有异常都被包装为执行异常ExecutionException抛出
throw new ExecutionException(cause);
}
// 方法步入此处说明等待超时,则抛出超时异常TimeoutException
throw new TimeoutException();
}
}

AbstractFuture仅仅对get()get(long timeout, TimeUnit unit)两个方法进行了实现,其实这两处的实现和java.util.concurrent.FutureTask中的实现方式十分相似。

DefaultPromise的源码比较多,这里分开多个部分去阅读,先看它的属性和构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

// 正常日志的日志句柄,InternalLogger是Netty内部封装的日志接口
private static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultPromise.class);

// 任务拒绝执行时候的日志句柄 - Promise需要作为一个任务提交到线程中执行,如果任务拒绝则使用此日志句柄打印日志
private static final InternalLogger rejectedExecutionLogger =
InternalLoggerFactory.getInstance(DefaultPromise.class.getName() + ".rejectedExecution");

// 监听器的最大栈深度,默认值为8,这个值是防止嵌套回调调用的时候栈深度过大导致内存溢出,后面会举个例子说明它的用法
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));

// 结果更新器,用于CAS更新结果result的值
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");

// 用于填充result的值,当设置结果result传入null,Promise执行成功,用这个值去表示成功的结果
private static final Object SUCCESS = new Object();

// 用于填充result的值,表示Promise不能被取消
private static final Object UNCANCELLABLE = new Object();

// CancellationException实例的持有器,用于判断Promise取消状态和抛出CancellationException
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));

// CANCELLATION_CAUSE_HOLDER的异常栈信息元素数组
private static final StackTraceElement[] CANCELLATION_STACK = CANCELLATION_CAUSE_HOLDER.cause.getStackTrace();

// 真正的结果对象,使用Object类型,最终有可能为null、真正的结果实例、SUCCESS、UNCANCELLABLE或者CANCELLATION_CAUSE_HOLDER等等
private volatile Object result;

// 事件执行器,这里暂时不做展开,可以理解为单个调度线程
private final EventExecutor executor;

// 监听器集合,可能是单个GenericFutureListener实例或者DefaultFutureListeners(监听器集合)实例
private Object listeners;

// 等待获取结果的线程数量
private short waiters;

// 标记是否正在回调监听器
private boolean notifyingListeners;

// 构造函数依赖于EventExecutor
public DefaultPromise(EventExecutor executor) {
this.executor = checkNotNull(executor, "executor");
}

protected DefaultPromise() {
// only for subclasses - 这个构造函数预留给子类
executor = null;
}

// ... 省略其他代码 ...

// 私有静态内部类,用于存放Throwable实例,也就是持有异常的原因实例
private static final class CauseHolder {
final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}

// 私有静态内部类,用于覆盖CancellationException的栈信息为前面定义的CANCELLATION_STACK,同时覆盖了toString()返回CancellationException的全类名
private static final class LeanCancellationException extends CancellationException {
private static final long serialVersionUID = 2794674970981187807L;

@Override
public Throwable fillInStackTrace() {
setStackTrace(CANCELLATION_STACK);
return this;
}

@Override
public String toString() {
return CancellationException.class.getName();
}
}
// ... 省略其他代码 ...
}

Promise目前支持两种类型的监听器:

  • GenericFutureListener:支持泛型的Future监听器。
  • GenericProgressiveFutureListener:它是GenericFutureListener的子类,支持进度表示和支持泛型的Future监听器(有些场景需要多个步骤实现,类似于进度条那样)。
1
2
3
4
5
6
7
8
9
10
11
// GenericFutureListener
public interface GenericFutureListener<F extends Future<?>> extends EventListener {

void operationComplete(F future) throws Exception;
}

// GenericProgressiveFutureListener
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {

void operationProgressed(F future, long progress, long total) throws Exception;
}

为了让Promise支持多个监听器,Netty添加了一个默认修饰符修饰的DefaultFutureListeners类用于保存监听器实例数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// DefaultFutureListeners
final class DefaultFutureListeners {

private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners

// 这个构造相对特别,是为了让Promise中的listeners(Object类型)实例由单个GenericFutureListener实例转换为DefaultFutureListeners类型
@SuppressWarnings("unchecked")
DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}

public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
// 注意这里,每次扩容数组长度是原来的2倍
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
// 把当前的GenericFutureListener加入数组中
listeners[size] = l;
// 监听器总数量加1
this.size = size + 1;
// 如果为GenericProgressiveFutureListener,则带进度指示的监听器总数量加1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}

public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
// 计算需要需要移动的监听器的下标
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
// listenersToMove后面的元素全部移动到数组的前端
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
// 当前监听器总量的最后一个位置设置为null,数量减1
listeners[-- size] = null;
this.size = size;
// 如果监听器是GenericProgressiveFutureListener,则带进度指示的监听器总数量减1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}

// 返回监听器实例数组
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}

// 返回监听器总数量
public int size() {
return size;
}

// 返回带进度指示的监听器总数量
public int progressiveSize() {
return progressiveSize;
}
}

接下来看DefaultPromise的剩余方法实现,笔者觉得DefaultPromise方法实现在代码顺序上是有一定的艺术的。先看几个判断Promise执行状态的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

// ... 省略其他代码 ...

@Override
public boolean setUncancellable() {
// 通过结果更新器CAS更新result为UNCANCELLABLE,期望旧值为null,更新值为UNCANCELLABLE属性,如果成功则返回true
if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
return true;
}
Object result = this.result;
// 步入这里说明result当前值不为null,isDone0()和isCancelled0()都是终态,这里如果命中终态就返回false
//(笔者注:其实可以这样认为,这里result不能为null,如果不为终态,它只能是UNCANCELLABLE属性实例)
return !isDone0(result) || !isCancelled0(result);
}

@Override
public boolean isSuccess() {
Object result = this.result;
// 如果执行成功,则结果不为null,同时不为UNCANCELLABLE,同时不为CauseHolder类型
//(笔者注:其实可以这样认为,Promise为成功,则result只能是一个开发者定义的实例或者SUCCESS属性实例)
return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
}

@Override
public boolean isCancellable() {
// 是否可取消的,result为null说明Promise处于初始化状态尚未执行,则认为可以取消
return result == null;
}

@Override
public Throwable cause() {
// 通过当前result获取Throwable实例
return cause0(result);
}

private Throwable cause0(Object result) {
// result非CauseHolder类型,则直接返回null
if (!(result instanceof CauseHolder)) {
return null;
}
// 如果result为CANCELLATION_CAUSE_HOLDER(静态CancellationException的持有)
if (result == CANCELLATION_CAUSE_HOLDER) {
// 则新建一个自定义LeanCancellationException实例
CancellationException ce = new LeanCancellationException();
// 如果CAS更新结果result为LeanCancellationException新实例则返回
if (RESULT_UPDATER.compareAndSet(this, CANCELLATION_CAUSE_HOLDER, new CauseHolder(ce))) {
return ce;
}
// 走到这里说明了result是非CANCELLATION_CAUSE_HOLDER的自定义CauseHolder实例
result = this.result;
}
// 兜底返回CauseHolder持有的cause
return ((CauseHolder) result).cause;
}

// 静态方法,判断Promise是否为取消,依据是result必须是CauseHolder类型,同时CauseHolder中的cause必须为CancellationException类型或者其子类
private static boolean isCancelled0(Object result) {
return result instanceof CauseHolder && ((CauseHolder) result).cause instanceof CancellationException;
}

// 静态方法,判断Promise是否完成,依据是result不为null同时不为UNCANCELLABLE属性实例
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}

// 判断Promise实例是否取消
@Override
public boolean isCancelled() {
return isCancelled0(result);
}

// 判断Promise实例是否完成
@Override
public boolean isDone() {
return isDone0(result);
}
// ... 省略其他代码 ...
}

接着看监听器的添加和移除方法(这其中也包含了通知监听器的逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

// ... 省略其他代码 ...
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
// 入参非空校验
checkNotNull(listener, "listener");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 添加监听器
addListener0(listener);
}
// 如果Promise实例已经执行完毕,则通知监听器进行回调
if (isDone()) {
notifyListeners();
}
return this;
}

@Override
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
// 入参非空校验
checkNotNull(listeners, "listeners");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 遍历入参数组添加监听器,有空元素直接跳出
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
addListener0(listener);
}
}
// 如果Promise实例已经执行完毕,则通知监听器进行回调
if (isDone()) {
notifyListeners();
}

return this;
}

@Override
public Promise<V> removeListener(final GenericFutureListener<? extends Future<? super V>> listener) {
// 入参非空校验
checkNotNull(listener, "listener");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 移除监听器
removeListener0(listener);
}
return this;
}

@Override
public Promise<V> removeListeners(final GenericFutureListener<? extends Future<? super V>>... listeners) {
// 入参非空校验
checkNotNull(listeners, "listeners");
// 加锁,锁定的对象是Promise实例自身
synchronized (this) {
// 遍历入参数组移除监听器,有空元素直接跳出
for (GenericFutureListener<? extends Future<? super V>> listener : listeners) {
if (listener == null) {
break;
}
removeListener0(listener);
}
}
return this;
}

private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 如果Promise实例持有listeners为null,则直接设置为入参listener
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 如果当前Promise实例持有listeners的是DefaultFutureListeners类型,则调用它的add()方法进行添加
((DefaultFutureListeners) listeners).add(listener);
} else {
// 步入这里说明当前Promise实例持有listeners为单个GenericFutureListener实例,需要转换为DefaultFutureListeners实例
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}

private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
// 如果当前Promise实例持有listeners的是DefaultFutureListeners类型,则调用它的remove()方法进行移除
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
// 如果当前Promise实例持有listeners不为DefaultFutureListeners类型,也就是单个GenericFutureListener并且和传入的listener相同,
// 则Promise实例持有listeners置为null
listeners = null;
}
}

private void notifyListeners() {
EventExecutor executor = executor();
// 当前执行线程是事件循环线程,那么直接同步调用,简单来说就是调用notifyListeners()方法的线程和EventExecutor是同一个线程
if (executor.inEventLoop()) {
// 下面的ThreadLocal和listenerStackDepth是调用栈深度保护相关,博文会另起一个章节专门讲解这个问题,这里可以暂时忽略
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 当前执行线程不是事件循环线程,则把notifyListenersNow()包装为Runnable实例放到EventExecutor中执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

// 使用EventExecutor进行任务执行,execute()方法抛出的异常会使用rejectedExecutionLogger句柄打印
private static void safeExecute(EventExecutor executor, Runnable task) {
try {
executor.execute(task);
} catch (Throwable t) {
rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
}
}

// 马上通知所有监听器进行回调
private void notifyListenersNow() {
Object listeners;
// 这里加锁,在锁的保护下设置notifyingListeners的值,如果多个线程调用同一个Promise实例的notifyListenersNow()方法
// 命中notifyingListeners的线程可以直接返回
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
// 临时变量listeners存放瞬时的监听器实例,方便下一步设置Promise实例的listeners为null
listeners = this.listeners;
// 重置当前Promise实例的listeners为null
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
// 多个监听器情况下的通知
notifyListeners0((DefaultFutureListeners) listeners);
} else {
// 单个监听器情况下的通知
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// 这里因为没有异常抛出的可能,不用在finally块中编写,重置notifyingListeners为false并且返回跳出循环
notifyingListeners = false;
return;
}
// 临时变量listeners存放瞬时的监听器实例,回调操作判断是基于临时实例去做 - 这里可能由另一个线程更新了listeners的值
listeners = this.listeners;
// 重置当前Promise实例的listeners为null,确保监听器只会被回调一次,下一次跳出for死循环
this.listeners = null;
}
}
}

// 遍历DefaultFutureListeners中的listeners数组,调用静态方法notifyListener0()
private void notifyListeners0(DefaultFutureListeners listeners) {
GenericFutureListener<?>[] a = listeners.listeners();
int size = listeners.size();
for (int i = 0; i < size; i ++) {
notifyListener0(this, a[i]);
}
}

// 这个静态方法是最终监听器回调的方法,也就是简单调用GenericFutureListener#operationComplete()传入的是当前的Promise实例,捕获一切异常打印warn日志
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
}

然后看wait()sync()方法体系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

// ... 省略其他代码 ...

@Override
public Promise<V> await() throws InterruptedException {
// 如果Promise执行完毕,直接返回
if (isDone()) {
return this;
}
// 如果当前线程中断则直接抛出InterruptedException
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
// 加锁,加锁对象是当前Promise实例
synchronized (this) {
// 这里设置一个死循环,终止条件是isDone()为true
while (!isDone()) {
// 等待线程数加1
incWaiters();
try {
// 这里调用的是Object#wait()方法进行阻塞,如果线程被中断会抛出InterruptedException
wait();
} finally {
// 解除阻塞后等待线程数减1
decWaiters();
}
}
}
return this;
}

@Override
public Promise<V> awaitUninterruptibly() {
// 如果Promise执行完毕,直接返回
if (isDone()) {
return this;
}
// 死锁检测
checkDeadLock();
boolean interrupted = false;
// 加锁,加锁对象是当前Promise实例
synchronized (this) {
// 这里设置一个死循环,终止条件是isDone()为true
while (!isDone()) {
// 等待线程数加1
incWaiters();
try {
// 这里调用的是Object#wait()方法进行阻塞,捕获了InterruptedException异常,如果抛出InterruptedException记录线程的中断状态到interrupted
wait();
} catch (InterruptedException e) {
// Interrupted while waiting.
interrupted = true;
} finally {
// 解除阻塞后等待线程数减1
decWaiters();
}
}
}
// 如果线程被中断跳出等待阻塞,则清除线程的中断标志位
if (interrupted) {
Thread.currentThread().interrupt();
}
return this;
}

// 后面的几个带超时时限的wait()方法都是调用await0()

@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return await0(unit.toNanos(timeout), true);
}

@Override
public boolean await(long timeoutMillis) throws InterruptedException {
return await0(MILLISECONDS.toNanos(timeoutMillis), true);
}

@Override
public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
try {
return await0(unit.toNanos(timeout), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}

@Override
public boolean awaitUninterruptibly(long timeoutMillis) {
try {
return await0(MILLISECONDS.toNanos(timeoutMillis), false);
} catch (InterruptedException e) {
// Should not be raised at all.
throw new InternalError();
}
}

// 检查死锁,这里判断了等待线程是事件循环线程则直接抛出BlockingOperationException异常
// 简单来说就是:Promise的执行线程和等待结果的线程,不能是同一个线程,否则依赖会成环
protected void checkDeadLock() {
EventExecutor e = executor();
if (e != null && e.inEventLoop()) {
throw new BlockingOperationException(toString());
}
}

@Override
public Promise<V> sync() throws InterruptedException {
// 同步永久阻塞等待
await();
// 阻塞等待解除,如果执行存在异常,则直接抛出
rethrowIfFailed();
return this;
}

@Override
public Promise<V> syncUninterruptibly() {
// 同步永久阻塞等待 - 响应中断
awaitUninterruptibly();
// 塞等待解除,如果执行存在异常,则直接抛出
rethrowIfFailed();
return this;
}

// waiters加1,如果超过Short.MAX_VALUE则抛出IllegalStateException
private void incWaiters() {
if (waiters == Short.MAX_VALUE) {
throw new IllegalStateException("too many waiters: " + this);
}
++waiters;
}

// waiters减1
private void decWaiters() {
--waiters;
}

// cause不为null则抛出
private void rethrowIfFailed() {
Throwable cause = cause();
if (cause == null) {
return;
}
PlatformDependent.throwException(cause);
}

private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
// 如果Promise执行完毕,直接返回
if (isDone()) {
return true;
}
// 如果超时时限小于0那么返回isDone()的结果
if (timeoutNanos <= 0) {
return isDone();
}
// 如果允许中断,当前线程的中断标志位为true,则抛出InterruptedException
if (interruptable && Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死锁检测
checkDeadLock();
// 记录当前的纳秒时间戳
long startTime = System.nanoTime();
// 等待时间的长度 - 单位为纳秒
long waitTime = timeoutNanos;
// 记录线程是否被中断
boolean interrupted = false;
try {
// 死循环
for (;;) {
synchronized (this) {
// 如果Promise执行完毕,直接返回true - 这一步是先验判断,命中了就不需要阻塞等待
if (isDone()) {
return true;
}
// 等待线程数加1
incWaiters();
try {
// 这里调用的是带超时时限的Object#wait()方法进行阻塞
wait(waitTime / 1000000, (int) (waitTime % 1000000));
} catch (InterruptedException e) {
// 线程被中断并且外部允许中断,那么直接抛出InterruptedException
if (interruptable) {
throw e;
} else {
// 否则只记录中断过的状态
interrupted = true;
}
} finally {
// 解除阻塞后等待线程数减1
decWaiters();
}
}
// 解除阻塞后,如果Promise执行完毕,直接返回true
if (isDone()) {
return true;
} else {
// 步入这里说明Promise尚未执行完毕,则重新计算等待时间间隔的长度数量(修正),如果大于0则进入下一轮循环
waitTime = timeoutNanos - (System.nanoTime() - startTime);
if (waitTime <= 0) {
return isDone();
}
}
}
} finally {
// 如果线程被中断跳出等待阻塞,则清除线程的中断标志位
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
// ... 省略其他代码 ...
}

最后是几个设置结果和获取结果的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {

// ... 省略其他代码 ...
@Override
public Promise<V> setSuccess(V result) {
// 设置成功结果,如果设置成功则返回当前Promise实例
if (setSuccess0(result)) {
return this;
}
// 设置失败说明了多次设置,Promise已经执行完毕,则抛出异常
throw new IllegalStateException("complete already: " + this);
}

@Override
public boolean trySuccess(V result) {
// 设置成功结果,返回的布尔值表示成功或失败
return setSuccess0(result);
}

@Override
public Promise<V> setFailure(Throwable cause) {
// 设置失败结果,如果设置成功则返回当前Promise实例
if (setFailure0(cause)) {
return this;
}
// 设置失败说明了多次设置,Promise已经执行完毕,则抛出异常
throw new IllegalStateException("complete already: " + this, cause);
}

@Override
public boolean tryFailure(Throwable cause) {
// 设置失败结果,返回的布尔值表示成功或失败
return setFailure0(cause);
}

@SuppressWarnings("unchecked")
@Override
public V getNow() {
// 非阻塞获取结果,如果result是CauseHolder类型、SUCCESS属性实例或者UNCANCELLABLE实行实例则返回null,否则返回转换类型后的result值
// 对异常无感知,如果CauseHolder包裹了异常,此方法依然返回null
Object result = this.result;
if (result instanceof CauseHolder || result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
return (V) result;
}

@SuppressWarnings("unchecked")
@Override
public V get() throws InterruptedException, ExecutionException {
// 永久阻塞获取结果
Object result = this.result;
// 如果Promise未执行完毕则进行永久阻塞等待
if (!isDone0(result)) {
await();
// 更新结果临时变量
result = this.result;
}
// result为SUCCESS属性实例或者UNCANCELLABLE属性实例的时候直接返回null
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
// 如果result为CauseHolder类型,则获取其中持有的cause属性,也有可能为null
Throwable cause = cause0(result);
if (cause == null) {
// 执行成功的前提下转换类型后的result值返回
return (V) result;
}
// 取消的情况,抛出CancellationException
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 剩余的情况一律封装为ExecutionException异常
throw new ExecutionException(cause);
}

@SuppressWarnings("unchecked")
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
// 带超时时限的阻塞获取结果
Object result = this.result;
// 如果Promise未执行完毕则进行带超时时限的阻塞等待
if (!isDone0(result)) {
if (!await(timeout, unit)) {
// 等待超时直接抛出TimeoutException
throw new TimeoutException();
}
// 更新结果临时变量
result = this.result;
}
// result为SUCCESS属性实例或者UNCANCELLABLE属性实例的时候直接返回null
if (result == SUCCESS || result == UNCANCELLABLE) {
return null;
}
// 如果result为CauseHolder类型,则获取其中持有的cause属性,也有可能为null
Throwable cause = cause0(result);
if (cause == null) {
// 执行成功的前提下转换类型后的result值返回
return (V) result;
}
// 取消的情况,抛出CancellationException
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
// 剩余的情况一律封装为ExecutionException异常
throw new ExecutionException(cause);
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// CAS更新result为CANCELLATION_CAUSE_HOLDER,result的期望值必须为null
if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) {
// 判断是否需要进行等待线程的通知
if (checkNotifyWaiters()) {
// 通知监听器进行回调
notifyListeners();
}
return true;
}
return false;
}

private boolean setSuccess0(V result) {
// 设置执行成功的结果,如果入参result为null,则选用SUCCESS属性,否则使用result
return setValue0(result == null ? SUCCESS : result);
}

private boolean setFailure0(Throwable cause) {
// 设置执行失败的结果,入参是Throwable类型,封装为CauseHolder,存放在CauseHolder实例的cause属性
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}

private boolean setValue0(Object objResult) {
// CAS更新result为入参objResult,result的期望值必须为null或者UNCANCELLABLE才能更新成功
if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 判断是否需要进行等待线程的通知
if (checkNotifyWaiters()) {
// 通知监听器进行回调
notifyListeners();
}
return true;
}
return false;
}

// 判断是否需要进行等待线程的通知 - 其实是判断是否需要通知监听器回调
private synchronized boolean checkNotifyWaiters() {
// 如果等待线程数量大于0则调用Object#notifyAll()唤醒所有等待线程
if (waiters > 0) {
notifyAll();
}
// 如果listeners不为空(也就是存在监听器)的时候才返回true
return listeners != null;
}
// ... 省略其他代码 ...
}

Promise的基本使用

要使用NettyPromise模块,并不需要引入Netty的所有依赖,这里只需要引入netty-common

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.44.Final</version>
</dependency>

EventExecutor选取方面,Netty已经准备了一个GlobalEventExecutor用于全局事件处理,这里可以直接选用(当然也可以自行实现EventExecutor或者用EventExecutor的其他实现类):

1
2
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<String> promise = new DefaultPromise<>(executor);

这里设计一个场景:异步下载一个链接的资源到磁盘上,下载完成之后需要异步通知下载完的磁盘文件路径,得到通知之后打印下载结果到控制台中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class PromiseMain {

public static void main(String[] args) throws Exception {
String url = "http://xxx.yyy.zzz";
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<DownloadResult> promise = new DefaultPromise<>(executor);
promise.addListener(new DownloadResultListener());
Thread thread = new Thread(() -> {
try {
System.out.println("开始下载资源,url:" + url);
long start = System.currentTimeMillis();
// 模拟下载耗时
Thread.sleep(2000);
String location = "C:\\xxx\\yyy\\z.md";
long cost = System.currentTimeMillis() - start;
System.out.println(String.format("下载资源成功,url:%s,保存到:%s,耗时:%d ms", url, location, cost));
DownloadResult result = new DownloadResult();
result.setUrl(url);
result.setFileDiskLocation(location);
result.setCost(cost);
// 通知结果
promise.setSuccess(result);
} catch (Exception ignore) {

}
}, "Download-Thread");
thread.start();
Thread.sleep(Long.MAX_VALUE);
}

@Data
private static class DownloadResult {

private String url;

private String fileDiskLocation;

private long cost;
}

private static class DownloadResultListener implements GenericFutureListener<Future<DownloadResult>> {

@Override
public void operationComplete(Future<DownloadResult> future) throws Exception {
if (future.isSuccess()) {
DownloadResult downloadResult = future.getNow();
System.out.println(String.format("下载完成通知,url:%s,文件磁盘路径:%s,耗时:%d ms", downloadResult.getUrl(),
downloadResult.getFileDiskLocation(), downloadResult.getCost()));
}
}
}
}

执行后控制台输出:

1
2
3
开始下载资源,url:http://xxx.yyy.zzz
下载资源成功,url:http://xxx.yyy.zzz,保存到:C:\xxx\yyy\z.md,耗时:2000 ms
下载完成通知,url:http://xxx.yyy.zzz,文件磁盘路径:C:\xxx\yyy\z.md,耗时:2000 ms

Promise适用的场景很多,除了异步通知的场景也能用于同步调用,它在设计上比JUCFuture灵活很多,基于Future扩展出很多新的特性,有需要的可以单独引入此依赖直接使用。

Promise监听器栈深度的问题

有些时候,由于封装或者人为编码异常等原因,监听器的回调可能出现基于多个Promise形成的链(参考Issue-5302a promise listener chain),这样子有可能出现递归调用深度过大而导致栈溢出,因此需要设置一个阈值,限制递归调用的最大栈深度,这个深度阈值暂且称为栈深度保护阈值,默认值是8,可以通过系统参数io.netty.defaultPromise.maxListenerStackDepth覆盖设置。这里贴出前面提到过的代码块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void notifyListeners() {
EventExecutor executor = executor();
// 事件执行器必须是事件循环类型,也就是executor.inEventLoop()为true的时候才启用递归栈深度保护
if (executor.inEventLoop()) {
// 获取当前线程绑定的InternalThreadLocalMap实例,这里类似于ThreadLocal
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
// 获取当前线程的监听器调用栈深度
final int stackDepth = threadLocals.futureListenerStackDepth();
// 监听器调用栈深度如果不超过阈值MAX_LISTENER_STACK_DEPTH
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
// 调用notifyListenersNow()前先设置监听器调用栈深度 + 1
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
// 调用notifyListenersNow()完毕后设置监听器调用栈深度为调用前的数值,也就是恢复线程的监听器调用栈深度
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
// 如果监听器调用栈深度超过阈值MAX_LISTENER_STACK_DEPTH,则直接每次通知监听器当成一个新的异步任务处理
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

如果我们想模拟一个例子触发监听器调用栈深度保护,那么只需要想办法在同一个EventLoop类型的线程中递归调用notifyListeners()方法即可。

最典型的例子就是在上一个Promise监听器回调的方法里面触发下一个Promise的监听器的setSuccess()(简单理解就是套娃),画个图理解一下:

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class PromiseListenerMain {

private static final AtomicInteger COUNTER = new AtomicInteger(0);

public static void main(String[] args) throws Exception {
EventExecutor executor = ImmediateEventExecutor.INSTANCE;
// root
Promise<String> root = new DefaultPromise<>(executor);
Promise<String> p1 = new DefaultPromise<>(executor);
Promise<String> p2 = new DefaultPromise<>(executor);
Promise<String> p3 = new DefaultPromise<>(executor);
Promise<String> p4 = new DefaultPromise<>(executor);
Promise<String> p5 = new DefaultPromise<>(executor);
Promise<String> p6 = new DefaultPromise<>(executor);
Promise<String> p7 = new DefaultPromise<>(executor);
Promise<String> p8 = new DefaultPromise<>(executor);
Promise<String> p9 = new DefaultPromise<>(executor);
Promise<String> p10 = new DefaultPromise<>(executor);
p1.addListener(new Listener(p2));
p2.addListener(new Listener(p3));
p3.addListener(new Listener(p4));
p4.addListener(new Listener(p5));
p5.addListener(new Listener(p6));
p6.addListener(new Listener(p7));
p7.addListener(new Listener(p8));
p8.addListener(new Listener(p9));
p9.addListener(new Listener(p10));
root.addListener(new Listener(p1));
root.setSuccess("success");
Thread.sleep(Long.MAX_VALUE);
}

private static class Listener implements GenericFutureListener<Future<String>> {

private final String name;
private final Promise<String> promise;

public Listener(Promise<String> promise) {
this.name = "listener-" + COUNTER.getAndIncrement();
this.promise = promise;
}

@Override
public void operationComplete(Future<String> future) throws Exception {
System.out.println(String.format("监听器[%s]回调成功...", name));
if (null != promise) {
promise.setSuccess("success");
}
}
}
}

因为有safeExecute()兜底执行,上面的所有Promise都会回调,这里可以采用IDEA的高级断点功能,在步入断点的地方添加额外的日志,输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-9]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-0]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-1]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-2]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-3]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-4]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-5]回调成功...
MAX_LISTENER_STACK_DEPTH(notifyListenersNow)执行---
监听器[listener-6]回调成功...
safeExecute(notifyListenersNow)执行----------
监听器[listener-7]回调成功...
safeExecute(notifyListenersNow)执行----------
监听器[listener-8]回调成功...

这里笔者有点疑惑,如果调用栈深度大于8,超出的部分会包装为Runnable实例提交到事件执行器执行,岂不是把递归栈溢出的隐患变成了内存溢出的隐患(因为异步任务也有可能积压,除非拒绝任务提交,那么具体要看EventExecutor的实现了)?

小结

Netty提供的Promise工具的源码和使用方式都分析完了,设计理念和代码都是十分值得借鉴,同时能够开箱即用,可以在日常编码中直接引入,减少重复造轮子的劳动和风险。

(本文完 e-a-20200123 c-3-d)


 评论