Promise 和 Future

Promise 和 Future

netty中的I/O操作大部分为异步,netty在Java的Future的基础上封装了Future和Promise

Future

继承java.util.concurrent.Future接口,并且在Future基础上做了如下的强化。
future是read-only的。我们没有办法在Future中改变状态,只能获取状态。进行后续操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public interface Future<V> extends java.util.concurrent.Future<V>{

/**
* 判断I/O是否成功,和Future的isDone()相比,能得到是否真正完成的结果(有可能成功、失败、取消)
*/
boolean isSuccess();

/**
* 是否已经中断
*/
@Override
boolean cancel(boolean mayInterruptIfRunning);

/**
* 方法表示如果I/O操作失败,返回异常信息
* Returns the cause of the failed I/O operation if the I/O operation has
* failed.
*
* @return the cause of the failure.
* {@code null} if succeeded or this future is not
* completed yet.
*/
Throwable cause();

/**
* 用观察者模式对future操作进行更精准的管理调用,如果get(),需要在代码中显示的调用在完成后续操作,如果用Listerner可以通过noify完成后续操作。
* Adds the specified listener to this future. The
* specified listener is notified when this future is
* {@linkplain #isDone() done}. If this future is already
* completed, the specified listener is notified immediately.
*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}

ChannelFuture

ChannelFuture可以通过添加ChannelFutureListener监听器,当I/O操作完成的时候来通知调用。相比于wait()方式也更推荐这种方式来获取结果状态或者执行后续操作。
此外,不建议在ChannelHandler中调用await(),因为ChannelHandler中事件驱动的方法被一个I/O线程调用,可能一直不回完成,那么await()也可能被I/O线程调用,同样会一直block,因此会产生死锁。
另外在,在Future的基础上增加了获取channle的方法

Promise

对比Future,Promise是writeable的,可以修改状态,调用noify执行Future(Promis)的listerner方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public interface Promise<V> extends Future<V> {

//一下是定义了标记了Future状态的方法,有且只能标记一次,
Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();

@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

@Override
Promise<V> await() throws InterruptedException;

@Override
Promise<V> awaitUninterruptibly();

@Override
Promise<V> sync() throws InterruptedException;

@Override
Promise<V> syncUninterruptibly();
}

DefaultPromise

在DefaultChannelPromise中会,改变状态会通知listerner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

@Override
public Promise<V> setSuccess(V result) {
if (setSuccess0(result)) {
//todo notify
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}

private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
...
notifyListenersNow();
return;
...
}
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}

private void notifyListenersNow() {
...
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
...
}
}

eg:在register过程中
AbstactServerBootstrap.initAndRegister
–>register()
–>SingleThreadEventLoop.register(channel)
–>SingleThreadEventLoop.register(new DefaultChannelPromise(channel, this))
–>AbstractUnsafe.register(promise,evenlop)–>AbstractUnsafe.resiter0(promise)–>safeSetSuccess(promise)

在safeSetSuccess调用promise.traSuccess()。notifyPromise中的listerner