EventListener与EventBus
EventListener
JDK
JDK1.1开始就提供EventListener,一个标记接口,源码如下:
/*** A tagging interface that all event listener interfaces must extend.*/
public interface EventListener {
}
JDK提供的java.util.EventObject
:
public class EventObject implements Serializable {protected transient Object source;
}
用户需要定义继承自EventObject的事件对象,然后定义具体的Listener接口继承EventListener。事件源通过添加监听器(addXXXListener方法)手动管理事件的订阅和发布,需要用户手动调用监听器的回调方法,事件管理逻辑往往由开发者编码完成。
典型场景:主要用于早期GUI应用开发(如AWT和Swing),用于组件间的事件通信。
优点:轻量级,易于理解;适合简单场景或单一模块。
缺点:
- 需显式注册和管理事件监听器,手动处理事件流转,容易导致代码复杂度增加;
- 缺乏高级特性,如事件异步处理、条件过滤等。
Spring
@EventListener
当在实现某些特定业务逻辑时,通常可通过发送事件的方式实现解耦,这也是观察者模式的一种体现。从Spring 4.2开始提供注解@EventListner,不再需要单独编写监听器类,只需在Spring Bean方法上标记@EventListener即可。
源码如下:
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EventListener {@AliasFor("classes")Class<?>[] value() default {};@AliasFor("value")Class<?>[] classes() default {};String condition() default "";
}
解读:
- value和classes:作用相同,表示监听的一个或一组事件,用于支持方法中同一个父类的事件;
- condition:支持Spring EL表达式,用来做Event中的变量或者方法判断。
Event的整个生命周期,从Publisher发出,经过applicationContext容器通知到EventListener,都是发生在单个Spring容器中。
事件可直接使用ApplicationEvent,或继承它。源码如下:
public abstract class ApplicationEvent extends java.util.EventObject {// 事件发生时间private final long timestamp;public ApplicationEvent(Object source) {super(source);this.timestamp = System.currentTimeMillis();}public ApplicationEvent(Object source, Clock clock) {super(source);this.timestamp = clock.millis();}
}
示例
多个监听器,监听Account创建,完成不同的业务逻辑。
创建Event事件监听
/*** 账号监听,处理账号创建成功的后续逻辑*/
@Component
public class AccountListener {@EventListener@Asyncpublic void processAccountCreatedEvent1(AccountCreatedEvent event) {// 1. 发送邮件、短信}@EventListener@Order(100)public void processAccountCreatedEvent2(AccountCreatedEvent event) {// 2. 添加积分等,@Order(100)用来设定执行顺序}
}
使用ApplicationEventPublisher发送事件:
@Autowired
private ApplicationEventPublisher publisher;public boolean save(Account account) {// 数据库保存成功if (true) {publisher.publishEvent(new AccountCreatedEvent(account));}return false;
}
一个发布者可对应多个监听者:
@EventListener(value = {AccountCreatedEvent.class, AccountUpdatedEvent.class}, condition = "#event.account.age > 10")
public void processAccountCreatedEvent2(AccountEvent event) {// 业务逻辑
}
监听执行顺序:可使用@Order(100)
来标记事件的执行顺序,异步情况下只保证按顺序将监听器丢入进线程池,具体执行顺序得看线程。
监听异步执行:使用@Async标记即可,前提条件:使用@EnableAsync开启Spring异步。
优点:
- 更加简洁,方法注解即声明监听;
- 支持异步处理,条件过滤,功能更强大;
- 与其他Spring特性(如AOP和事务)集成更好。
缺点:
- 不如ApplicationListener明确(可能难以追踪监听器逻辑);
- 依赖注解,可能不适合部分代码风格偏向接口设计的团队。
ApplicationListener
Spring提供两种事件监听机制:
- 基于@EventListener注解:
- 基于ApplicationListener泛型接口:早期提供,实现onApplicationEvent方法
通过上下文来发布一个事件,监听器收到订阅的事件作相应的处理,Spring提供的方便高效的事件驱动模型。
ApplicationListener源码:
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends java.util.EventListener {void onApplicationEvent(E event);default boolean supportsAsyncExecution() {return true;}static <T> ApplicationListener<PayloadApplicationEvent<T>> forPayload(Consumer<T> consumer) {return event -> consumer.accept(event.getPayload());}}
简单使用:
@Component
public class MyEventListener implements ApplicationListener<MyEvent> {@Overridepublic void onApplicationEvent(MyEvent event) {System.out.println("Received event: " + event.getMessage());}
}
优点:
- 明确,适合固定、简单的监听逻辑;
- 提供对所有事件的统一入口,可基于事件类型实现复杂的分发逻辑。
缺点:
- 相对冗长,代码侵入性高(必须实现接口);
- 不支持直接使用条件过滤和异步处理;
- 扩展性和灵活性低。
@EventListener与ApplicationListener
特性 | ApplicationListener | @EventListener |
---|---|---|
定义方式 | 实现ApplicationListener接口 | 使用@EventListener注解 |
写法复杂度 | 必须实现接口 | 注解方式更简洁 |
类型绑定 | 泛型绑定到特定事件类型 | 自动通过方法参数类型推断 |
条件过滤 | 不支持 | 支持(condition属性) |
异步处理 | 需手动实现多线程 | 支持@Async开箱即用 |
事务集成 | 支持,但需手动配置 | 与Spring的事务注解自然集成 |
适用场景 | 简单、固定事件监听逻辑 | 更现代化、复杂事件管理逻辑 |
引入版本 | 早期版本已有 | Spring 4.2+ |
JDK与Spring
特性 | JDK EventListener | Spring @EventListener |
---|---|---|
触发方式 | 显式调用监听器 | 基于事件发布自动触发 |
实现方式 | 通过接口 | 通过注解声明 |
事件管理 | 手动注册、调用 | 自动管理(基于Spring容器) |
适用场景 | 简单GUI或模块内部通信 | 企业级应用、跨模块解耦 |
异步支持 | 不支持 | 支持 |
条件过滤 | 不支持 | 支持 |
事务集成 | 不支持 | 支持 |
复杂性 | 简单,手动管理 | 较高,但更自动化和灵活 |
EventBus
使用异步的方式来发送事件,或触发另外一个动作,即Publish/Subscribe Event。
EventBus有不同的实现框架,一般都指Guava EventBus。
框架
Guava EventBus包路径下:
类之间的关系如:
EventBus组成部分:
- EventBus、AsyncEventBus:事件发送器
- Event:事件承载单元
- SubscriberRegistry:订阅者注册器,将订阅者注册到Event上,即将有注解Subscribe的方法和Event绑定起来
- Dispatcher:事件分发器,将事件的订阅者调用来执行
- Subscriber、SynchronizedSubscriber:订阅者,并发订阅还是同步订阅
原理
EventBus是基于注册监听的方式来运行的,首先需将EventBus实例化,然后才会有事件及监听者:
// 同步
EventBus eventBus = new EventBus();
注册监听者,底层就是将类eventListener中所有注解有Subscribe的方法与其Event对放在一个map中(一个Event可以对应多个Subscribe的方法):
eventBus.register(eventListener);
在高并发的环境下使用AsyncEventBus时,发送事件可能会出现异常,因为它使用的线程池,当线程池的线程不够用时,会拒绝接收任务,就会执行线程池的拒绝策略,如果需要关注是否提交事件成功,就需要将线程池的拒绝策略设为抛出异常,并且try-catch来捕获异常:
try {eventBus.post(new LoginEvent("user", "pass"));
} catch (Exception e) {// log
}
内部的订阅通知模型,无需使用事件+事件listener模型,只有一个事件类。
示例
一个简单的实例,值得一提的是,注册和发布事件,与消费事件不在一个类里:
// 事件类中方法以@Subscribe注解
class EventBusChangeRecorder {@Subscribepublic void recordCustomerChange(ChangeEvent e) {// 业务逻辑System.out.println("事件触发");}
}
// 创建事件总线
EventBus eventBus = new EventBus();
// 注册事件
eventBus.register(new EventBusChangeRecorder());
ChangeEvent event = new ChangeEvent(new EventBusChangeRecorder());
// 发布事件
eventBus.post(event);
// 需要异步执行可使用EventBus的子类AsyncEventBus
另外,EventBus也可作为Spring Bean使用,可被注入:
@Service
public class TestService implements InitializingBean {@Resourceprivate MyListener myListener;@Resourceprivate EventBus eventBus;public void postEvent() {eventBus.post(new LoginEvent("johnny", "success"));}@Overridepublic void afterPropertiesSet() throws Exception {eventBus.register(myListener);}
}
源码
EventBus的register方法:
void register(Object listener) {Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {Class<?> eventType = entry.getKey();Collection<Subscriber> eventMethodsInListener = entry.getValue();CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);if (eventSubscribers == null) {CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);}eventSubscribers.addAll(eventMethodsInListener);}
}
事件发送:执行指定事件类型的订阅者(包含method),从订阅者中获取指定事件的订阅者,然后按照规则(同步、异步)执行指定的方法,如果事件没有监听者,就当作死亡事件来对待。EventBus的post方法:
public void post(Object event) {Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);if (eventSubscribers.hasNext()) {dispatcher.dispatch(event, eventSubscribers);} else if (!(event instanceof DeadEvent)) {// the event had no subscribers and was not itself a DeadEventpost(new DeadEvent(this, event));}
}
EventBus的dispatcher为PerThreadQueuedDispatcher,其dispatch方法如下:
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {checkNotNull(event);checkNotNull(subscribers);Queue<Event> queueForThread = queue.get();queueForThread.offer(new Event(event, subscribers));if (!dispatching.get()) {dispatching.set(true);try {Event nextEvent;while ((nextEvent = queueForThread.poll()) != null) {while (nextEvent.subscribers.hasNext()) {nextEvent.subscribers.next().dispatchEvent(nextEvent.event);}}} finally {dispatching.remove();queue.remove();}}
}
dispatchEvent方法:
// Dispatches event to this subscriber using the proper executor.
final void dispatchEvent(final Object event) {executor.execute((Runnable) () -> {try {invokeSubscriberMethod(event);} catch (InvocationTargetException e) {bus.handleSubscriberException(e.getCause(), context(event));}});
}
execute方法由Executor来执行。EventBus的executor为MoreExecutors.directExecutor()
:
public static Executor directExecutor() {return DirectExecutor.INSTANCE;
}enum DirectExecutor implements Executor {INSTANCE;@Overridepublic void execute(Runnable command) {command.run();}
}
其execute方法直接执行线程的run方法,即同步调用run方法。另外,invokeSubscriberMethod方法如下:
void invokeSubscriberMethod(Object event) throws InvocationTargetException {try {method.invoke(target, checkNotNull(event));} catch (IllegalArgumentException | IllegalAccessException | InvocationTargetException e) {// 省略catch代码}
}
因此,整个执行过程如下:
整个过程都是同步方式执行,EventBus是同步的。
AsyncEventBus
AsyncEventBus是异步EventBus,其dispatcher为LegacyAsyncDispatcher,executor为自己指定的线程池,如
@Configuration
public class ConfigBean {@Beanpublic EventBus executorService() {BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, workQueue);return new AsyncEventBus(executor);}
}
运行流程如下:
AllowConcurrentEvents
Guava提供的注解:
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@ElementTypesAreNonnullByDefault
public @interface AllowConcurrentEvents {
}
注解判断逻辑位于Subscriber.isDeclaredThreadSafe
方法内:
private static boolean isDeclaredThreadSafe(Method method) {return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
此方法被create()
方法调用:
static Subscriber create(EventBus bus, Object listener, Method method) {return isDeclaredThreadSafe(method)? new Subscriber(bus, listener, method): new SynchronizedSubscriber(bus, listener, method);
}
解读:如果订阅者方法上有注解@AllowConcurrentEvents,则返回Subscriber(异步),否则,返回SynchronizedSubscriber(同步)。即没有使用注解AllowConcurrentEvents的订阅者,在并发环境中是串行执行,会影响性能。
SynchronizedSubscriber是同步的,从其类名可知,从其实现的invokeSubscriberMethod方法里也可看出:
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {synchronized (this) {super.invokeSubscriberMethod(event);}
}
拓展
EventListener和EventBus
相同点:都是基于事件驱动模式,都可用于实现解耦的组件通信。
不同点:
- 功能:
- EventListener:订阅事件并执行操作的组件;粒度更细,聚焦于局部业务逻辑;一般是自定义类或方法,绑定到特定事件类型;
- EventBus:管理事件的发布、订阅、流转;侧重于全局管理事件,通常是单例模式或共享对象。
- 实现方式:
- EventListener:Spring的@EventListener是一种更广义的事件监听机制,可配合ApplicationEvent或自定义事件;
- EventBus:Guava EventBus通过注解定义事件处理器,自动绑定到特定事件类型。
联系:EventListener通常依赖EventBus进行注册,接收由EventBus发布的事件。
EventBus对比MQ
特性 | EventBus | MQ |
---|---|---|
使用范围 | 单JVM | 跨进程、分布式 |
通信模式 | 内存中发布-订阅 | 发布-订阅、点对点 |
可靠性 | 无内置持久化或重试机制 | 支持持久化、重试、消息确认 |
延迟 | 低延迟 | 相对稍高(取决于网络和队列机制) |
复杂性 | 简单,适合轻量场景 | 较高,需要额外运维 |
Guava EventBus现状
- 局限性:
基于内存的轻量级工具,仅适用于单JVM内的事件管理;在分布式跨服务场景下,无法满足可靠性、持久性和扩展性要求。 - 依然适用的场景:
- 轻量化需求:在小型项目中,快速实现解耦的优秀选择;
- 单JVM内部事件管理:用于模块间事件流转(如前端事件或服务内异步操作);
- 开发测试:在测试环境中模拟事件驱动的逻辑。
Netflix EventBus
除了Guava EventBus外,Netflix也提供一套框架:
<dependency><groupId>com.netflix.netflix-commons</groupId><artifactId>netflix-eventbus</artifactId><version>0.3.0</version>
</dependency>
参考
- EventBus原理深度解析