【Netty4核心原理⑫】【异步处理双子星 Future 与 Promise】
文章目录
- 一、前言
- 二、异步结果 Future
- 三、异步执行 Promise
- 1. Promise
- 2. ChannelPromise 接口使用示例
- 四、参考内容
一、前言
本系列虽说本意是作为 《Netty4 核心原理》一书的读书笔记,但在实际阅读记录过程中加入了大量个人阅读的理解和内容,因此对书中内容存在大量删改。
本篇涉及内容 :第十章 异步处理双子星 Future 与 Promise
本系列内容基于 Netty 4.1.73.Final 版本,如下:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.73.Final</version></dependency>
系列文章目录:
【Netty4核心原理】【全系列文章目录】
二、异步结果 Future
java.util.concurrent.Future 是 Java 原生 API 提供的接口,用来记录异步执行的状态,Future#get 方法会判断任务是否执行完成,如果完成立即返回执行结果,否则阻塞线程,直到任务完成再返回。
Netty 扩展了 Java 的 Future,在 Future 的基础上扩展了监听器(Listener)接口,通过监听器可以让异步执行更加有效率,不需要通过调用 Future#get 方法来等待异步执行结束,而是通过监听器回调来精确控制异步执行结束时间。io.netty.util.concurrent.Future 代码如下:
/*** 此接口扩展了 java.util.concurrent.Future 接口,为 Netty 中的异步操作结果提供了更多功能和状态管理。* 它允许用户检查操作状态、添加监听器、等待操作完成以及获取操作结果等。** @param <V> 异步操作结果的类型*/
public interface Future<V> extends java.util.concurrent.Future<V> {/*** 检查 I/O 操作是否已成功完成。** @return 如果 I/O 操作成功完成,返回 true;否则返回 false*/boolean isSuccess();/*** 检查操作是否可以通过 {@link #cancel(boolean)} 方法取消。** @return 如果操作可以被取消,返回 true;否则返回 false*/boolean isCancellable();/*** 若 I/O 操作失败,返回导致失败的原因。** @return 操作失败的原因(异常对象)。如果操作成功或此 future 尚未完成,返回 null*/Throwable cause();/*** 为该 future 添加一个指定的监听器。* 当此 future 完成({@linkplain #isDone() done})时,指定的监听器会收到通知。* 如果此 future 已经完成,指定的监听器会立即收到通知。** @param listener 要添加的监听器* @return 返回此 future 实例,方便链式调用*/Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);/*** 为该 future 添加多个指定的监听器。* 当此 future 完成({@linkplain #isDone() done})时,指定的监听器们会收到通知。* 如果此 future 已经完成,指定的监听器们会立即收到通知。** @param listeners 要添加的监听器数组* @return 返回此 future 实例,方便链式调用*/Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);/*** 从该 future 中移除指定监听器的首次出现。* 当此 future 完成({@linkplain #isDone() done})时,指定的监听器将不再收到通知。* 如果指定的监听器未与该 future 关联,此方法将不做任何操作并静默返回。** @param listener 要移除的监听器* @return 返回此 future 实例,方便链式调用*/Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);/*** 从该 future 中移除每个指定监听器的首次出现。* 当此 future 完成({@linkplain #isDone() done})时,指定的监听器们将不再收到通知。* 如果指定的监听器们未与该 future 关联,此方法将不做任何操作并静默返回。** @param listeners 要移除的监听器数组* @return 返回此 future 实例,方便链式调用*/Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);/*** 等待此 future 完成。如果此 future 操作失败,将重新抛出导致失败的原因。* 此方法会响应线程中断,若当前线程被中断,将抛出 InterruptedException。** @return 返回此 future 实例,方便链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/Future<V> sync() throws InterruptedException;/*** 等待此 future 完成。如果此 future 操作失败,将重新抛出导致失败的原因。* 此方法不会响应线程中断,若线程被中断,会忽略中断并继续等待。** @return 返回此 future 实例,方便链式调用*/Future<V> syncUninterruptibly();/*** 等待此 future 完成。* 此方法会响应线程中断,若当前线程被中断,将抛出 InterruptedException。** @return 返回此 future 实例,方便链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/Future<V> await() throws InterruptedException;/*** 等待此 future 完成,且不响应线程中断。* 若线程在等待过程中被中断,此方法会捕获 InterruptedException 并静默丢弃。** @return 返回此 future 实例,方便链式调用*/Future<V> awaitUninterruptibly();/*** 在指定的时间限制内等待此 future 完成。** @param timeout 等待的时间长度* @param unit 时间长度的单位* @return 当且仅当 future 在指定的时间限制内完成时,返回 true;否则返回 false* @throws InterruptedException 如果当前线程在等待过程中被中断*/boolean await(long timeout, TimeUnit unit) throws InterruptedException;/*** 在指定的毫秒时间限制内等待此 future 完成。** @param timeoutMillis 等待的毫秒数* @return 当且仅当 future 在指定的时间限制内完成时,返回 true;否则返回 false* @throws InterruptedException 如果当前线程在等待过程中被中断*/boolean await(long timeoutMillis) throws InterruptedException;/*** 在指定的时间限制内等待此 future 完成,且不响应线程中断。* 若线程在等待过程中被中断,此方法会捕获 InterruptedException 并静默丢弃。** @param timeout 等待的时间长度* @param unit 时间长度的单位* @return 当且仅当 future 在指定的时间限制内完成时,返回 true;否则返回 false*/boolean awaitUninterruptibly(long timeout, TimeUnit unit);/*** 在指定的毫秒时间限制内等待此 future 完成,且不响应线程中断。* 若线程在等待过程中被中断,此方法会捕获 InterruptedException 并静默丢弃。** @param timeoutMillis 等待的毫秒数* @return 当且仅当 future 在指定的时间限制内完成时,返回 true;否则返回 false*/boolean awaitUninterruptibly(long timeoutMillis);/*** 立即返回结果,不会阻塞当前线程。* 如果此 future 尚未完成,将返回 null。* 由于可能使用 null 值来表示 future 成功,因此还需要使用 {@link #isDone()} 方法检查 future 是否真的完成,* 而不能仅依赖返回的 null 值。** @return 如果 future 已完成,返回操作结果;如果未完成,返回 null*/V getNow();/*** 尝试取消此异步操作。* 此方法继承自 java.util.concurrent.Future 接口。* 如果取消成功,将使用 {@link CancellationException} 标记此 future 失败。** @param mayInterruptIfRunning 如果为 true,尝试中断正在执行的操作;如果为 false,允许正在进行的操作完成* @return 如果操作可以被取消(尚未完成),返回 true;否则返回 false*/@Overrideboolean cancel(boolean mayInterruptIfRunning);
}
io.netty.channel.ChannelFuture 接口又扩展了 Netty 的 Future 接口,表示一种没有返回值的异步调用,同时和一个 Channel 进行绑定。io.netty.channel.ChannelFuture 代码如下:
/*** ChannelFuture 接口扩展自 Future<Void> 接口,用于表示与 Channel 相关的异步操作的结果。* 它提供了对 Channel 相关操作的状态检查、监听器管理以及等待操作完成等功能。*/
public interface ChannelFuture extends Future<Void> {/*** 返回与此 future 关联的 I/O 操作所发生的 Channel。** @return 与此 future 关联的 Channel 实例*/Channel channel();/*** 为该 ChannelFuture 添加一个指定的监听器。* 当此 ChannelFuture 完成时,指定的监听器会收到通知。* 如果此 ChannelFuture 已经完成,指定的监听器会立即收到通知。** @param listener 要添加的监听器* @return 返回此 ChannelFuture 实例,方便链式调用*/@OverrideChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);/*** 为该 ChannelFuture 添加多个指定的监听器。* 当此 ChannelFuture 完成时,指定的监听器们会收到通知。* 如果此 ChannelFuture 已经完成,指定的监听器们会立即收到通知。** @param listeners 要添加的监听器数组* @return 返回此 ChannelFuture 实例,方便链式调用*/@OverrideChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);/*** 从该 ChannelFuture 中移除指定监听器的首次出现。* 当此 ChannelFuture 完成时,指定的监听器将不再收到通知。* 如果指定的监听器未与该 ChannelFuture 关联,此方法将不做任何操作并静默返回。** @param listener 要移除的监听器* @return 返回此 ChannelFuture 实例,方便链式调用*/@OverrideChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);/*** 从该 ChannelFuture 中移除每个指定监听器的首次出现。* 当此 ChannelFuture 完成时,指定的监听器们将不再收到通知。* 如果指定的监听器们未与该 ChannelFuture 关联,此方法将不做任何操作并静默返回。** @param listeners 要移除的监听器数组* @return 返回此 ChannelFuture 实例,方便链式调用*/@OverrideChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);/*** 等待此 ChannelFuture 完成。如果此 ChannelFuture 操作失败,将重新抛出导致失败的原因。* 此方法会响应线程中断,若当前线程被中断,将抛出 InterruptedException。** @return 返回此 ChannelFuture 实例,方便链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/@OverrideChannelFuture sync() throws InterruptedException;/*** 等待此 ChannelFuture 完成。如果此 ChannelFuture 操作失败,将重新抛出导致失败的原因。* 此方法不会响应线程中断,若线程被中断,会忽略中断并继续等待。** @return 返回此 ChannelFuture 实例,方便链式调用*/@OverrideChannelFuture syncUninterruptibly();/*** 等待此 ChannelFuture 完成。* 此方法会响应线程中断,若当前线程被中断,将抛出 InterruptedException。** @return 返回此 ChannelFuture 实例,方便链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/@OverrideChannelFuture await() throws InterruptedException;/*** 等待此 ChannelFuture 完成,且不响应线程中断。* 若线程在等待过程中被中断,此方法会捕获 InterruptedException 并静默丢弃。** @return 返回此 ChannelFuture 实例,方便链式调用*/@OverrideChannelFuture awaitUninterruptibly();/*** 判断此 ChannelFuture 是否为一个空的 future,即不允许调用以下任何方法:* <ul>* <li>{@link #addListener(GenericFutureListener)}</li>* <li>{@link #addListeners(GenericFutureListener[])}</li>* <li>{@link #await()}</li>* <li>{@link #await(long, TimeUnit)} ()}</li>* <li>{@link #await(long)} ()}</li>* <li>{@link #awaitUninterruptibly()}</li>* <li>{@link #sync()}</li>* <li>{@link #syncUninterruptibly()}</li>* </ul>** @return 如果此 ChannelFuture 是一个空的 future,返回 true;否则返回 false*/boolean isVoid();
}
三、异步执行 Promise
1. Promise
Promise 接口也是 Future 的扩展接口,它表示一种可写的 Future ,可以自定义设置异步执行的接口,io.netty.util.concurrent.Promise 代码如下:
/*** Promise 接口扩展自 Future 接口,它表示一个异步操作的结果,并提供了设置操作结果(成功或失败)的方法,* 同时也包含了 Future 接口中关于等待操作完成、添加/移除监听器等功能。** @param <V> 异步操作结果的类型*/
public interface Promise<V> extends Future<V> {/*** 将此 future 标记为成功状态,并通知所有已注册的监听器。* 如果此 future 已经处于成功或失败状态,将会抛出 IllegalStateException 异常。** @param result 异步操作的结果* @return 返回此 Promise 实例,方便链式调用* @throws IllegalStateException 如果此 future 已经处于成功或失败状态*/Promise<V> setSuccess(V result);/*** 尝试将此 future 标记为成功状态,并通知所有已注册的监听器。** @param result 异步操作的结果* @return 当且仅当成功将此 future 标记为成功状态时返回 true;* 否则返回 false,因为此 future 已经被标记为成功或失败状态*/boolean trySuccess(V result);/*** 将此 future 标记为失败状态,并通知所有已注册的监听器。* 如果此 future 已经处于成功或失败状态,将会抛出 IllegalStateException 异常。** @param cause 导致操作失败的原因(异常对象)* @return 返回此 Promise 实例,方便链式调用* @throws IllegalStateException 如果此 future 已经处于成功或失败状态*/Promise<V> setFailure(Throwable cause);/*** 尝试将此 future 标记为失败状态,并通知所有已注册的监听器。** @param cause 导致操作失败的原因(异常对象)* @return 当且仅当成功将此 future 标记为失败状态时返回 true;* 否则返回 false,因为此 future 已经被标记为成功或失败状态*/boolean tryFailure(Throwable cause);/*** 将此 future 设置为不可取消状态。** @return 当且仅当成功将此 future 标记为不可取消状态,或者此 future 已经完成且未被取消时返回 true;* 如果此 future 已经被取消,则返回 false*/boolean setUncancellable();/*** 为该 Promise 添加一个指定的监听器。* 当此 Promise 完成时,指定的监听器会收到通知。* 如果此 Promise 已经完成,指定的监听器会立即收到通知。** @param listener 要添加的监听器* @return 返回此 Promise 实例,方便链式调用*/@OverridePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);/*** 为该 Promise 添加多个指定的监听器。* 当此 Promise 完成时,指定的监听器们会收到通知。* 如果此 Promise 已经完成,指定的监听器们会立即收到通知。** @param listeners 要添加的监听器数组* @return 返回此 Promise 实例,方便链式调用*/@OverridePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);/*** 从该 Promise 中移除指定监听器的首次出现。* 当此 Promise 完成时,指定的监听器将不再收到通知。* 如果指定的监听器未与该 Promise 关联,此方法将不做任何操作并静默返回。** @param listener 要移除的监听器* @return 返回此 Promise 实例,方便链式调用*/@OverridePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);/*** 从该 Promise 中移除每个指定监听器的首次出现。* 当此 Promise 完成时,指定的监听器们将不再收到通知。* 如果指定的监听器们未与该 Promise 关联,此方法将不做任何操作并静默返回。** @param listeners 要移除的监听器数组* @return 返回此 Promise 实例,方便链式调用*/@OverridePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);/*** 等待此 Promise 完成。* 此方法会响应线程中断,若当前线程被中断,将抛出 InterruptedException。** @return 返回此 Promise 实例,方便链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/@OverridePromise<V> await() throws InterruptedException;/*** 等待此 Promise 完成,且不响应线程中断。* 若线程在等待过程中被中断,此方法会捕获 InterruptedException 并静默丢弃。** @return 返回此 Promise 实例,方便链式调用*/@OverridePromise<V> awaitUninterruptibly();/*** 等待此 Promise 完成。如果此 Promise 操作失败,将重新抛出导致失败的原因。* 此方法会响应线程中断,若当前线程被中断,将抛出 InterruptedException。** @return 返回此 Promise 实例,方便链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/@OverridePromise<V> sync() throws InterruptedException;/*** 等待此 Promise 完成。如果此 Promise 操作失败,将重新抛出导致失败的原因。* 此方法不会响应线程中断,若线程被中断,会忽略中断并继续等待。** @return 返回此 Promise 实例,方便链式调用*/@OverridePromise<V> syncUninterruptibly();
}
ChannePromise 接口扩展了 Promise 和 ChanneFuture ,绑定了 Channel,既可以写异步执行结果,又具备监听者的功能,是 Netty 中使用的表示异步执行的接口。io.netty.channel.ChannelPromise 代码如下:
/*** ChannelPromise 接口继承自 ChannelFuture 和 Promise<Void> 接口,* 它代表了一个与 Channel 相关的异步操作的可写结果,既可以获取操作结果,也可以设置操作结果。*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {/*** 返回与此 ChannelPromise 关联的 Channel。* 此方法继承自 ChannelFuture 接口,用于明确异步操作所涉及的 Channel 对象。** @return 与此 ChannelPromise 关联的 Channel 实例*/@OverrideChannel channel();/*** 将此 ChannelPromise 标记为成功状态,并设置操作结果为指定的 Void 类型结果。* 若 ChannelPromise 已处于成功或失败状态,会抛出 IllegalStateException 异常。* 此方法继承自 Promise 接口,因结果类型为 Void,通常传入 null。** @param result 异步操作的结果,这里为 Void 类型,一般传入 null* @return 返回此 ChannelPromise 实例,便于链式调用* @throws IllegalStateException 如果 ChannelPromise 已经处于成功或失败状态*/@OverrideChannelPromise setSuccess(Void result);/*** 将此 ChannelPromise 标记为成功状态,不指定具体的结果值。* 这是一个便捷方法,因为结果类型为 Void,无需显式传入结果。* 若 ChannelPromise 已处于成功或失败状态,会抛出 IllegalStateException 异常。** @return 返回此 ChannelPromise 实例,便于链式调用* @throws IllegalStateException 如果 ChannelPromise 已经处于成功或失败状态*/ChannelPromise setSuccess();/*** 尝试将此 ChannelPromise 标记为成功状态。* 若 ChannelPromise 尚未被标记为成功或失败,则标记为成功并返回 true;* 若已经被标记为成功或失败,则不做任何操作并返回 false。** @return 若成功标记为成功状态,返回 true;否则返回 false*/boolean trySuccess();/*** 将此 ChannelPromise 标记为失败状态,并指定导致失败的原因。* 若 ChannelPromise 已处于成功或失败状态,会抛出 IllegalStateException 异常。* 此方法继承自 Promise 接口,用于在操作失败时记录异常信息。** @param cause 导致操作失败的异常对象* @return 返回此 ChannelPromise 实例,便于链式调用* @throws IllegalStateException 如果 ChannelPromise 已经处于成功或失败状态*/@OverrideChannelPromise setFailure(Throwable cause);/*** 为该 ChannelPromise 添加一个指定的监听器。* 当此 ChannelPromise 完成(成功或失败)时,指定的监听器会收到通知。* 如果 ChannelPromise 已经完成,指定的监听器会立即收到通知。* 此方法继承自 Future 接口,用于异步操作完成时的回调处理。** @param listener 要添加的监听器* @return 返回此 ChannelPromise 实例,便于链式调用*/@OverrideChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);/*** 为该 ChannelPromise 添加多个指定的监听器。* 当此 ChannelPromise 完成(成功或失败)时,指定的监听器们会收到通知。* 如果 ChannelPromise 已经完成,指定的监听器们会立即收到通知。* 此方法继承自 Future 接口,用于批量添加监听器。** @param listeners 要添加的监听器数组* @return 返回此 ChannelPromise 实例,便于链式调用*/@OverrideChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);/*** 从该 ChannelPromise 中移除指定监听器的首次出现。* 当此 ChannelPromise 完成(成功或失败)时,指定的监听器将不再收到通知。* 如果指定的监听器未与该 ChannelPromise 关联,此方法将不做任何操作并静默返回。* 此方法继承自 Future 接口,用于移除已添加的监听器。** @param listener 要移除的监听器* @return 返回此 ChannelPromise 实例,便于链式调用*/@OverrideChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);/*** 从该 ChannelPromise 中移除每个指定监听器的首次出现。* 当此 ChannelPromise 完成(成功或失败)时,指定的监听器们将不再收到通知。* 如果指定的监听器们未与该 ChannelPromise 关联,此方法将不做任何操作并静默返回。* 此方法继承自 Future 接口,用于批量移除监听器。** @param listeners 要移除的监听器数组* @return 返回此 ChannelPromise 实例,便于链式调用*/@OverrideChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);/*** 等待此 ChannelPromise 完成。如果操作失败,会重新抛出导致失败的原因。* 此方法会响应线程中断,若当前线程在等待过程中被中断,会抛出 InterruptedException 异常。* 此方法继承自 Future 接口,用于同步等待操作完成。** @return 返回此 ChannelPromise 实例,便于链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/@OverrideChannelPromise sync() throws InterruptedException;/*** 等待此 ChannelPromise 完成。如果操作失败,会重新抛出导致失败的原因。* 此方法不会响应线程中断,若线程在等待过程中被中断,会忽略中断并继续等待。* 此方法继承自 Future 接口,用于不被中断地同步等待操作完成。** @return 返回此 ChannelPromise 实例,便于链式调用*/@OverrideChannelPromise syncUninterruptibly();/*** 等待此 ChannelPromise 完成。* 此方法会响应线程中断,若当前线程在等待过程中被中断,会抛出 InterruptedException 异常。* 此方法继承自 Future 接口,用于异步等待操作完成。** @return 返回此 ChannelPromise 实例,便于链式调用* @throws InterruptedException 如果当前线程在等待过程中被中断*/@OverrideChannelPromise await() throws InterruptedException;/*** 等待此 ChannelPromise 完成,且不响应线程中断。* 若线程在等待过程中被中断,此方法会捕获 InterruptedException 异常并静默丢弃。* 此方法继承自 Future 接口,用于不被中断地异步等待操作完成。** @return 返回此 ChannelPromise 实例,便于链式调用*/@OverrideChannelPromise awaitUninterruptibly();/*** 如果此 ChannelPromise 是一个 "void" Promise(即调用某些方法会受限),则返回一个新的非 "void" ChannelPromise;* 否则返回其自身。* 此方法用于处理 "void" Promise 的特殊情况,确保可以正常使用相关方法。** @return 如果是 "void" Promise,返回新的非 "void" ChannelPromise;否则返回自身*/ChannelPromise unvoid();
}
DefaultChannelPromise 是 ChannePromise 的实现类,他是实际运行时的 Promise 实例。Netty 中使用 DefaultPromise#addListener() 方法来回调异步执行结果(DefaultChannelPromise#addListener会调用父类方法,也就是 DefaultPromise#addListener),DefaultPromise#addListener 方法如下:
@Overridepublic Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {checkNotNull(listener, "listener");synchronized (this) {// 将传入的监听器添加到当前 Promise 对象的监听器集合中addListener0(listener);}// 2. 检查 Promise 是否已完成并通知监听器if (isDone()) {notifyListeners();}return this;}// 根据 listeners 的不同状态,灵活地处理监听器的添加操作,实现了将单个或多个监听器统一管理的功能。这种设计有助于在 Promise 对象完成操作时,能够方便地通知所有已添加的监听器。private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {if (listeners == null) {listeners = listener;} else if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).add(listener);} else {listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);}}// 判断事件是否已经完成@Overridepublic boolean isDone() {return isDone0(result);}// 如果 result 属性不为空 && 不是 UNCANCELLABLE 状态,则认为异步事件已经执行结束。private static boolean isDone0(Object result) {return result != null && result != UNCANCELLABLE;}// notifyListeners 方法的主要功能是通知所有注册到当前 Promise 对象的监听器,告知它们 Promise 的操作已经完成。为了确保线程安全和避免栈溢出问题,该方法会根据当前线程是否在事件循环中以及调用栈深度进行不同的处理// notifyListeners 方法通过对当前线程状态和调用栈深度的检查,合理地安排通知监听器的操作,避免了在同一线程中因监听器嵌套调用导致的栈溢出问题,同时保证了通知操作的线程安全性和异步执行的特性。private void notifyListeners() {// 调用 executor() 方法获取当前 Promise 对象关联的事件执行器 EventExecutorEventExecutor executor = executor();// 判断当前线程是否在事件循环中if (executor.inEventLoop()) {// 获取线程本地映射和调用栈深度:使用 InternalThreadLocalMap.get() 获取当前线程的本地映射 InternalThreadLocalMap,并从中获取当前的监听器调用栈深度 stackDepth。final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();final int stackDepth = threadLocals.futureListenerStackDepth();// 检查调用栈深度是否超过限制// 如果当前调用栈深度小于最大允许的调用栈深度 MAX_LISTENER_STACK_DEPTH,则将调用栈深度加 1,然后调用 notifyListenersNow() 方法通知所有监听器。在 notifyListenersNow() 方法执行完成后,无论是否发生异常,都将调用栈深度恢复到原来的值。最后,方法返回。if (stackDepth < MAX_LISTENER_STACK_DEPTH) {threadLocals.setFutureListenerStackDepth(stackDepth + 1);try {notifyListenersNow();} finally {threadLocals.setFutureListenerStackDepth(stackDepth);}return;}}// 若不满足上述条件,使用事件执行器异步执行通知操作 : 如果当前线程不在事件循环中,或者调用栈深度已经达到或超过最大允许深度,调用 safeExecute 方法,将通知监听器的任务封装成一个 Runnable 对象,并交给事件执行器 executor 异步执行。safeExecute 方法会确保任务的执行过程是安全的,例如捕获并处理可能出现的异常。safeExecute(executor, new Runnable() {@Overridepublic void run() {notifyListenersNow();}});}
从上述代码中看到, DefaultPromise 会判断异步任务执行状态,如果执行完成就立即通知监听者,否则加入监听者队列。通知监听者就是找一个线程来执行调用监听的回调函数。再来看监听者的接口,其实就是一个方法,即等待异步任务执行完成后,获得Future 结果,执行回调的逻辑,如下:
public interface GenericFutureListener<F extends Future<?>> extends EventListener {void operationComplete(F future) throws Exception;
}
2. ChannelPromise 接口使用示例
public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 ChannelHandlerch.pipeline().addLast(new SimpleChannelInboundHandler<>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// 处理读取到的消息System.out.println("Received: " + msg);// 向客户端发送响应消息ctx.writeAndFlush("Hello from server!").addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()) {System.out.println("Message sent successfully");} else {System.out.println("Failed to send message: " + future.cause());}}});}});}});// 连接到服务器ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);// 等待连接操作完成channelFuture.sync();// 获取 ChannelPromiseChannelPromise promise = channelFuture.channel().newPromise();// 设置成功状态promise.setSuccess();// 检查是否成功if (promise.isSuccess()) {System.out.println("Promise set to success");}// 尝试设置失败状态promise.tryFailure(new RuntimeException("Simulated failure"));if (promise.isFailure()) {System.out.println("Promise set to failure: " + promise.cause());}// 等待 Promise 完成promise.awaitUninterruptibly();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}
在上述示例中:
- 创建了一个 Bootstrap 来连接到服务器,并在 ChannelPipeline 中添加了一个 SimpleChannelInboundHandler 来处理读取到的消息。
- 使用
bootstrap.connect
方法发起连接操作,并通过 sync 方法等待连接完成。 - 通过
channelFuture.channel().newPromise()
获取一个 ChannelPromise 实例。 - 使用
promise.setSuccess()
方法将 Promise 设置为成功状态,并通过 isSuccess 方法检查是否成功。 - 使用
promise.tryFailure
方法尝试将 Promise 设置为失败状态,并通过 isFailure 和 cause 方法检查失败原因。 - 使用
promise.awaitUninterruptibly
方法等待 Promise 完成。
通过上述步骤,展示了如何使用 ChannelPromise 接口来管理与 Channel 相关的异步操作的结果。
四、参考内容
- 《Netty4核心原理》
- 豆包