EventBus3.0源码学习(2) post

时间:2022-04-15 00:17:43

发送事件(post)

当需要发送事件时,调用EventBus.getDefault().post(event)即可,EventBus会将事件发送给所有已经注册了监听该类事件的订阅者。post的实现如下:

 1 public void post(Object event) {
 2     PostingThreadState postingState = currentPostingThreadState.get();
 3     List<Object> eventQueue = postingState.eventQueue;
 4     eventQueue.add(event);
 5 
 6     if (!postingState.isPosting) {
 7         postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
 8         postingState.isPosting = true;
 9         if (postingState.canceled) {
10             throw new EventBusException("Internal error. Abort state was not reset");
11         }
12         try {
13             while (!eventQueue.isEmpty()) {
14                 postSingleEvent(eventQueue.remove(0), postingState);
15             }
16         } finally {
17             postingState.isPosting = false;
18             postingState.isMainThread = false;
19         }
20     }
21 }

第2-4行,currentPostingThreadState是ThreadLocal<PostingThreadState>范型类的实例,记录了每个线程的发送事件的状态,event被放入postingState.eventQueue中。

第6-20行,如果postingState.isPosting为false,表明线程没有在发送消息,则调用postSingleEvent函数发送postingState.eventQueue队首的消息,直至队列为空。finally块用于保证无论postSingleEvent是否发生异常,当前线程的postingState保存的状态都能被正确复位。 postSingleEvent的实现如下:

 1 private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
 2     Class<?> eventClass = event.getClass();
 3     boolean subscriptionFound = false;
 4     if (eventInheritance) {
 5         List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
 6         int countTypes = eventTypes.size();
 7         for (int h = 0; h < countTypes; h++) {
 8             Class<?> clazz = eventTypes.get(h);
 9             subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
10         }
11     } else {
12         subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
13     }
14     if (!subscriptionFound) {
15         if (logNoSubscriberMessages) {
16             Log.d(TAG, "No subscribers registered for event " + eventClass);
17         }
18         if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
19                 eventClass != SubscriberExceptionEvent.class) {
20             post(new NoSubscriberEvent(this, event));
21         }
22     }
23 }

第4-10行,若eventInheritance为true,则调用lookAllEventTypes将所有类和接口T满足<T super eventClass>找到。根据面向对象的里氏替换原则:一个对象可以被当作其父类对象、其实现的任意接口对象。因此,考虑类继承的情况下,当发送一个事件时,监听其超类体系的任意父类事件和任意接口事件的订阅者,也应该收到相应消息。lookAllEventTypes的实现如下:

 1 private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
 2     synchronized (eventTypesCache) {
 3         List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
 4         if (eventTypes == null) {
 5             eventTypes = new ArrayList<>();
 6             Class<?> clazz = eventClass;
 7             while (clazz != null) {
 8                 eventTypes.add(clazz);
 9                 addInterfaces(eventTypes, clazz.getInterfaces());
10                 clazz = clazz.getSuperclass();
11             }
12             eventTypesCache.put(eventClass, eventTypes);
13         }
14         return eventTypes;
15     }
16 }
17 
18 static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
19     for (Class<?> interfaceClass : interfaces) {
20         if (!eventTypes.contains(interfaceClass)) {
21             eventTypes.add(interfaceClass);
22             addInterfaces(eventTypes, interfaceClass.getInterfaces());
23         }
24     }
25 }

第7-11行,递归查找eventClass超类体系的所有类及其实现的接口。当clazz为Object、interface、void 或 基本数据类型(byte, short, int, long, float, double, boolean, char)时,clazz.getSuperClass()返回null。

第20行,由于父类和子类可能同时声明实现同一接口,因此需要判断interfaceClass是否已经被添加过了。

eventTypeCache存储了<eventClass, eventTypes>的映射关系,这是一种优化手段,避免每次发送事件时都去查找eventClass的超类体系。

postSingleEvent最终调用postSingleEventForEventType发送事件,postSingleEventForEventType的实现如下:

 1 private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
 2     CopyOnWriteArrayList<Subscription> subscriptions;
 3     synchronized (this) {
 4         subscriptions = subscriptionsByEventType.get(eventClass);
 5     }
 6     if (subscriptions != null && !subscriptions.isEmpty()) {
 7         for (Subscription subscription : subscriptions) {
 8             postingState.event = event;
 9             postingState.subscription = subscription;
10             boolean aborted = false;
11             try {
12                 postToSubscription(subscription, event, postingState.isMainThread);
13                 aborted = postingState.canceled;
14             } finally {
15                 postingState.event = null;
16                 postingState.subscription = null;
17                 postingState.canceled = false;
18             }
19             if (aborted) {
20                 break;
21             }
22         }
23         return true;
24     }
25     return false;
26 }

第3-5行,从subscriptionsByEventType中查找订阅了eventClass事件的subscriptions。该操作是在synchronized块中,因为register和unregister操作会从其他线程修改subscriptionsByEventType。

第6-22行,依次调用postToSubscription函数,将event发送给subscriptions中的每个subscription。postToSubscription调用之后,postingState.canceled被赋值给aborted,而在finally块中,postingState.canceled又被复位。显然,若postToSubscription调用过程中postingState.canceled若被置位,则subscriptions中后续的所有subscription将无法接收到本次发送的事件。由于subscriptions中的subscription是以subscrberMethod.priority排序的,因此,高优先级的订阅者可以阻止低优先级的订阅者接收到本次事件。由于postingState是线程本地变量,因此高优先级的订阅者如果想这么做,则必须以POSTING模式接收事件,调用cancelEventDelivery(event)即可。

postToSubscription是真正分发事件的函数,其实现如下:

 1 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
 2     switch (subscription.subscriberMethod.threadMode) {
 3         case POSTING:
 4             invokeSubscriber(subscription, event);
 5             break;
 6         case MAIN:
 7             if (isMainThread) {
 8                 invokeSubscriber(subscription, event);
 9             } else {
10                 mainThreadPoster.enqueue(subscription, event);
11             }
12             break;
13         case BACKGROUND:
14             if (isMainThread) {
15                 backgroundPoster.enqueue(subscription, event);
16             } else {
17                 invokeSubscriber(subscription, event);
18             }
19             break;
20         case ASYNC:
21             asyncPoster.enqueue(subscription, event);
22             break;
23         default:
24             throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
25     }
26 }
27 
28 void invokeSubscriber(Subscription subscription, Object event) {
29     try {
30         subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
31     } catch (InvocationTargetException e) {
32         handleSubscriberException(subscription, event, e.getCause());
33     } catch (IllegalAccessException e) {
34         throw new IllegalStateException("Unexpected exception", e);
35     }
36 }

postToSubscription最终调用invokeSubscriber,以event为参数调用subscriber对象处理event事件的函数(即subscriberMethod)。显然如果subscriberMethod处理event时发生异常,是可能会导致整个程序崩溃的。

第2-25行,switch块实现了EventBus中threadMode的定义:

  • POSTING:该模式不需要线程切换,为默认模式。subscriberMethod直接在post线程执行。
  • MAIN:若当前线程是主线程,则与POSTING一样;否则,由mainThreadPoster进行处理。当我们需要更新UI时,会采用这种模式。
  • BACKGROUND:若当前线程不是主线程,则与POSTING一样;否则,由backgroundPoster进行处理。backgroundPoster中的任务是串行执行的。
  • ASYNC:该模式总是需要进行线程切换,由asyncPoster进行处理。