EventBus

上一篇文章,我们讲了观察者模式,又称为发布-订阅模式。我们发现不管是同步阻塞的实现方式,还是异步非阻塞的实现方式,都要求 Observer 实现某个接口,这样可能会导致在重构过程中代码修改的范围过大;而且 Observable 都会持有 Observer 的集合,以便在发生某个事件的时候通知所有注册的 Observer,这样会导致 Observable 和 Observer 的耦合性过大。

那有没有一种实现方式可以像使用消息队列那样: Observable 不需要感知 Observer 的存在,当某个事件发生的时候,只需要发布该事件即可;Observer 也无需感知 Observable 的存在,它只需要订阅某个事件即可,当该事件发生的时候,它就会执行相应的动作。这种实现方式是存在的,也就是我们今天要讲的 EventBus。

1. EventBus 的使用

EventBus 类似于使用消息队列实现的发布-订阅模式。当事件源发生某事件的时候,我们可以调用 EventBus.post() 方法将该事件发布;那么所有监听这种事件类型的 EventListener 就会执行相对应的动作。

Google Guava EventBus 就是一个比较著名的 EventBus 框架。它的优点是:事件和事件监听者可以是任意的数据类型;它不仅仅支持同步阻塞模式,还支持异步非阻塞模式;易于使用。下面我们就通过一个具体的例子,来看看如何使用 Guava EventBus。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class UserController {
private UserService userService;
private EventBus eventBus;

public UserController() {
eventBus = new AsyncEventBus(Executors.newFixedThreadPool(20));
}

public void setObservers(List<Object> observers) {
for (Object observer : observers) {
eventBus.register(observer); // register observer
}
}

public Long register(String telephone, String password) {
// authentication...
long userId = userService.register(telephone, password);
eventBus.post(userId); // post event
return userId;
}
}

public class PromotionObserver {
private PromotionService promotionService;

@Subscribe
public void afterRegister(Long userId) {
promotionService.issueExperienceCash(userId);
}
}

public class NotificationObserver {
private NotificationService notificationService;

@Subscribe
public void notificate(Long userId) {
String message = ...;
notificationService.sendInboxMessage(userId, message);
}
}

在上述代码中,我们通过 EventBus.register() 来注册事件监听者,注意事件监听者可以是任意类型。然后,当事件发生的时候,通过 EventBus.post() 方法来发布事件,事件也可以是任意的类型。这样事件监听者中带有 @Subscribe 注解的方法,并且方法的参数类型与事件的类型匹配,那么该方法就会执行。

2. EventBus 的原理

接下来,我们就来看看 EventBus 框架的实现原理。首先,我们简单介绍下 Guava EventBus 框架的核心成员~

  • EventBus, AsyncEventBus

    EventBus 是同步阻塞模式,AsyncEventBus 是异步非阻塞模式,AsyncEventBus 继承了 EventBus。

    1
    2
    EventBus eventBus = new EvetnBus();
    EventBus eventBus = new AsyncEventBus(executor);
  • EventBus.register()

    EventBus.register() 方法是用来注册事件监听者的,事件监听者可以是任意类型的对象。

    1
    public void register(Object object);
  • EventBus.unregister()

    顾名思义,unregister() 就是用来删除事件监听者的,方法签名如下:

    1
    public void unregister(Object object);
  • EventBus.post()

    EventBus.post() 方法是用来发布事件的,事件也可以是任意类型的对象。

    1
    public void post(Object object);

    需要注意的是:事件不是发送给所有的事件监听者,而是发送给了那些可以监听该类型事件的监听者 (具体如何实现的,请继续往下看…)。

  • @Subscribe 注解

    EventBus 通过 @Subscribe 注解来标明哪些函数是订阅消息 (事件) 的函数。如果该类的对象注册到了 EventBus,并且 EventBus 发布了与该方法的参数类型相匹配的消息,那么该方法就会得到执行。具体使用方式如下:

    1
    2
    3
    4
    5
    6
    public class EventListener {
    // Omit other code...

    @Subscribe
    public void method(T event);
    }

既然我们已经清楚了如何使用 Guava EventBus,那么接下来我们就来说说它的实现原理。 EventBus 框架的核心逻辑都在 register() 和 post() 两个函数当中,搞明白了它们,基本就搞明白了整个框架。

首先我们来看看 register() 函数:当调用 register() 注册对象时,register() 会扫描对象所属类中所有带有 @Subscibe 注解的方法,并把带有 @Subscribe 注解的方法和该对象封装成一个 ObserverAction,添加到注册表 (ObserverRegistry) 中。注册表其实就是一个 Map,键是事件的类型 (也就是方法参数的类型);值是 ObserverAction 集合,也就是当该事件发生的时候,所有要执行的动作。

接下来我们来看看 post() 函数:当调用 post() 函数发布一个事件的时候,post() 函数会根据事件的类型去注册表中找到所有要执行的动作,然后通过反射执行相应的动作~

总体来说 EventBus 框架的原理还是非常简单的,清楚了原理之后,接下来我们就自己动手山寨一个 EventBus 框架。

3. 自己动手实现一个 EventBus 框架

首先,清点一下我们需要实现哪些 artifact:@Subscribe 注解,ObserverAction,ObserverRegistry,EventBus 和它的子类 AsyncEventBus。

3.1 @Subscribe

@Subscribe 是一个注解,标明该对象的哪些方法应该添加到注册表。

1
2
3
4
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
}

3.2 ObserverAction

ObserverAction 封装了注册的对象以及这个对象中带有 @Subscribe 注解的方法,这样做的目的是方便以后通过反射去执行方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ObserverAction {
private Object target;
private Method method;

public ObserverAction(Object target, Method method) {
if (target == null)
throw new NullPointerException();
this.target = target;
this.method = method;
this.method.setAccessible(true);
}

public void execute(Object event) {
try {
method.invoke(target, event);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
}

3.3 ObserverRegistry

ObserverRegistry 是 EventBus 框架中最复杂的一个类,几乎所有的核心逻辑都在这个类中。这个类中大量使用了反射的机制,不过总体来说代码并不难懂…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ObserverRegistry {
private ConcurrentMap<Class<?>, CopyOnWriteArraySet<ObserverAction>> registry = new ConcurrentHashMap<>();

public void register(Object observer) {
Map<Class<?>, Collection<ObserverAction>> observerActions = findAllObserverActions(observer);
for (Map.Entry<Class<?>, Collection<ObserverAction>> entry : observerActions.entrySet()) {
Class<?> eventType = entry.getKey();
Collection<ObserverAction> eventActions = entry.getValue();
CopyOnWriteArraySet<ObserverAction> registeredEventActions = registry.get(eventType);
if (registeredEventActions == null) {
registry.putIfAbsent(eventType, new CopyOnWriteArraySet<>());
registeredEventActions = registry.get(eventType);
}
registeredEventActions.addAll(eventActions);
}
}

public List<ObserverAction> getMatchedObserverActions(Object event) {
List<ObserverAction> matchedObservers = new ArrayList<>();
Class<?> postedEventType = event.getClass();
for (Map.Entry<Class<?>, CopyOnWriteArraySet<ObserverAction>> entry : registry.entrySet()) {
Class<?> eventType = entry.getKey();
Collection<ObserverAction> eventActions = entry.getValue();
if (eventType.isAssignableFrom(postedEventType)) { // if matches then add all actions
matchedObservers.addAll(eventActions);
}
}
return matchedObservers;
}

private Map<Class<?>, Collection<ObserverAction>> findAllObserverActions(Object observer) {
Map<Class<?>, Collection<ObserverAction>> observerActions = new HashMap<>();
Class<?> clazz = observer.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
if (!observerActions.containsKey(eventType)) {
observerActions.put(eventType, new ArrayList<>());
}
observerActions.get(eventType).add(new ObserverAction(observer, method));
}
return observerActions;
}

private List<Method> getAnnotatedMethods(Class<?> clazz) {
List<Method> annotatedMethods = new ArrayList<>();
for (Method method : clazz.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class)) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length != 1) {
throw new IllegalArgumentException("Subscriber methods must have exactly 1 parameter.");
}
annotatedMethods.add(method);
}
}
return annotatedMethods;
}
}

值得一提的是,我们使用了 CopyOnWriteArraySet 和 ConcurrentHashMap 来提升并发环境下的读写性能。

3.4 EventBus & AsyncEventBus

EventBus 是同步阻塞的,AsyncEventBus 是异步非阻塞的,AsyncEventBus 继承了 EventBus。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class EventBus {
private ObserverRegistry registry = new ObserverRegistry();
private Executor executor;

public EventBus() {
this(Executors.newFixedThreadPool(1));
}

protected EventBus(Executor executor) {
this.executor = executor;
}

public void register(Object object) {
registry.register(object);
}

public void post (Object event) {
List<ObserverAction> matchedObserverActions = registry.getMatchedObserverActions(event);
for (ObserverAction action : matchedObserverActions) {
executor.execute(() -> action.execute(event));
}
}
}

看了代码可能你会有这样的疑惑,为什么同步阻塞式的 EventBus 需要持有 Executor 对象呢?之所以这样,是为了和异步非阻塞的 AsyncEventBus 统一代码逻辑,做到代码复用。在构建 EventBus 对象的时候,传入的 Executor 参数是 Executors.newFixedThreadPool(1),也就是说 EventBus 其实还是单线程的。

AsyncEventBus 只需要继承 EventBus 即可,它的实现非常简单。

1
2
3
4
5
public class AsyncEventBus extends EventBus{
public AsyncEventBus(Executor executor) {
super(executor);
}
}

小结

在这篇文章中,我们仿照 Google Guava EventBus,用了不到 200 行代码就山寨了一个 EventBus 框架。在功能上它和 Google Guava EventBus 几乎没什么区别;但是在性能上,它们还是有很大的差距的,Google Guava EventBus 在实现细节方面做了很多的优化。感兴趣的同学可以去看看 Google Guava EventBus 的源码

0%