资讯专栏INFORMATION COLUMN

EventBus源码分析

Tecode / 1174人阅读

摘要:第二种是通过方法来检查,源码如下通过以及来生成一个,来存储方法所在的类。先看下的方法继承了,调用方法后会向事件队列中插入一个事件,然后将标记位设置为表示正在处理事件,然后调用发送消息通知处理事件。

首先从订阅开始

EventBus.getDefault().register(this);

register方法会获取传入的object对象的class,通过findSubscriberMethods方法来查找这个class中订阅的方法,如下

    public void register(Object subscriber) {
        Class<");synchronized (this) {
            for (SubscriberMethod subscriberMethod : subscriberMethods) {
                subscribe(subscriber, subscriberMethod);
            }
        }
    }

findSubscriberMethods方法实现如下,其中有一个ConcurrentHashMap类型的静态对象METHOD_CACHE,是用来缓存对应类的订阅方法的,以便后续再次订阅时不用重新去findMethods,可以直接从缓存中读取。

    List findSubscriberMethods(Class<"); {
        List subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

查找订阅方法通过ignoreGeneratedIndex字段分为两种方式

第一种findUsingReflection是通过反射来查找,找到被@Subscribe注解修饰的方法,并且根据具体的注解以及方法参数生成一个SubscriberMethod对象:

findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));

第二种findUsingInfo是通过apt的方式,提前找到订阅的方法,可以避免通过反射查找方法带来的耗时。

具体使用方法:在gradle配置apt,rebuild项目,会生成一个注解方法索引类,在EventBusBuilder中通过addIndex方法新建一个该类的对象传入即可。

这边还有一个问题,对于子类重写父类的订阅方法如何处理。在上面的两种方式中在查找完子类的订阅方法后都会继续去查找父类的订阅方法,都通过一个叫做checkAdd的方法进行支撑,该方法返回true表示可以添加到订阅方法的集合中去。

boolean checkAdd(Method method, Class<"); {
    // 2 level check: 1st level with event type only (fast), 2ndlevelwith complete signature when required.
    // Usually a subscriber doesn"t have methods listening to thesameevent type.
    Object existing = anyMethodByEventType.put(eventType, method);
    if (existing == null) {
        return true;
    } else {
        if (existing instanceof Method) {
            if (!checkAddWithMethodSignature((Method)existing,eventType)) {
                // Paranoia check
                throw new IllegalStateException();
            }
            // Put any non-Method object to "consume" theexistingMethod
            anyMethodByEventType.put(eventType, this);
        }
        return checkAddWithMethodSignature(method, eventType);
    }
}

checkAdd中设置了两种检查方式,第一种是通过eventType也就是订阅方法的入参来检查,这种方式比较快,只需要看下之前有没有这种入参的方法就可以了。注释中也指出了通常一个类不会有多个入参相同的订阅方法。

第二种是通过checkAddWithMethodSignature方法来检查,源码如下:

private boolean checkAddWithMethodSignature(Method method,Class<"); {
    methodKeyBuilder.setLength(0);
    methodKeyBuilder.append(method.getName());
    methodKeyBuilder.append(">").append(eventType.getName());
    String methodKey = methodKeyBuilder.toString();
    Class<");if (methodClassOld == null||methodClassOld.isAssignableFrom(methodClass)) {
        // Only add if not already found in a sub class
        return true;
    } else {
        // Revert the put, old class is further down theclasshierarchy
        subscriberClassByMethodKey.put(methodKey, methodClassOld);
        return false;
    }
}

通过method以及eventType来生成一个key,来存储方法所在的类。其中methodClassOld == null ||methodClassOld.isAssignableFrom(methodClass)这个判断条件对应着两种情况,methodClassOld == null说明是入参相同但是方法名不同的方法正在被添加,直接返回true就可以了methodClassOld.isAssignableFrom(methodClass)这个条件是为了过滤掉父类被子类重写的方法,前面说过了查找订阅方法是从子类开始遍历的,此时如果子类重写了父类的订阅方法,那么methodClassOld对应的是子类,methodClass对应的是父类,显然这个判断就会为false,之后进入下面的else分支return false,也就是忽略掉父类被子类重写的方法。

查找订阅方法基本就这么点,查找完毕之后需要执行订阅操作,也就是register方法中的subscribe(subscriber, subscriberMethod);操作,直接看下该方法的实现:

private void subscribe(Object subscriber, SubscriberMethodsubscriberMethod) {
    Class<");new Subscription(subscriber,subscriberMethod);
    CopyOnWriteArrayList subscriptions =subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " +subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    Listif (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventTypehave to be considered.
            // Note: Iterating over all events may be inefficientwith lots of sticky events,
            // thus data structure should be changed to allow a moreefficient lookup
            // (e.g. an additional map storing sub classes of superclasses: Class -> List).
            Setfor (Map.Entryif (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscriptin, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription,stickyEvent);
        }
    }
}

subscriptionsByEventType这个Map是将eventType作为key保存其所对应的订阅方法的集合。该方法将刚查找到的方法添加到对应的集合中去,添加时有这样一层判断i == size || subscriberMethod.priority >subscriptions.get(i).subscriberMethod.priority这表示这个集合里的方法会按照所设定的优先级进行排序。紧接着又出现了个MaptypesBySubscriber将订阅者作为key保存一个Class的集合,暂时看不出有啥用,就先不管,最后再检查下是不是粘性事件,如果是粘性事件就根据所保存的粘性事件来执行该方法。eventInheritance也是在bulider中设置的,如果为true则会考虑事件的继承性,如果现在有eventType为正在订阅的方法的eventType的子类的粘性事件存在,那么这个粘性事件也会被正在订阅的方法接收到,直接说可能比较绕,举个栗子,现在我有两个事件,其中一个是另一个的子类,并且有两个粘性订阅方法,如下:

    class EventMessage {
  
    }

    class SubEventMessage extends EventMessage {

    }
    
    @Subscribe(sticky =  true)
    public void onEvent(EventMessage message) {
        // do something
    }

    @Subscribe(sticky =  true)
    public void onSubEvent(SubEventMessage message) {
        // do something
    }

当执行register时,如果内存中存在着一个类型为SubEventMessage的事件,那么订阅的时候onEvent方法会被执行,入参是内存中类型为SubEventMessage的事件。

现在register大致就分析完了,再来看下unregister方法:

    public synchronized void unregister(Object subscriber) {
        Listif (subscribedTypes != null) {
            for (Class<");else {
            logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
        }
    }

unregister方法十分简单,typesBySubscriber是刚才在进行订阅的时候不知道用来干什么的Map,现在知道是在取消订阅时用到的,这个Map将订阅者作为key,将其所有的订阅方法的eventType存入到对应的List中去,取消订阅时将这个List取出来,遍历去移除对应的订阅方法,具体实现在unsubscribeByEventType中,也十分简单,就不赘述了。

订阅和取消订阅都看过了,还差个发送事件,发送事件分为postpostSticky两种,先看post

    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

currentPostingThreadState是个ThreadLocal,然后从中取出当前线程的postingState,也就是说每个线程都会维护一个自己的posting状态,之后会有个循环将事件队列清空,通过postSingleEvent方法来进一步处理:

private void postSingleEvent(Object event, PostingThreadStatepostingState) throws Error {
    Class<");boolean subscriptionFound = false;
    if (eventInheritance) {
        Listint countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<");else {
        subscriptionFound = postSingleEventForEventType(event,postingState, eventClass);
    }
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered forevent " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass !=NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

同样是通过eventInheritance来判断是否要涉及eventType的父类,之后再通过postSingleEventForEventType方法的返回值来得到该事件是否被处理,如果没有被处理,那么会返回false进入下一个分支,logNoSubscriberMessagessendNoSubscriberEvents都是在builder中传入的,前者用于没有订阅者处理事件时打印日志,后者用于没有订阅者处理事件时发送一个NoSubscriberEvent类型的事件,所以具体是怎么处理事件的还要继续看postSingleEventForEventType方法:

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<"); {
        CopyOnWriteArrayList subscriptions;
        synchronized (this) {
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

postSingleEventForEventType方法从subscriptionsByEventType中去获取对应事件类型的所有订阅者,如果没有订阅者就返回false表示事件没有被处理,否则就遍历所有的订阅者,通过postToSubscription方法来处理事件,接着往里看:

private void postToSubscription(Subscription subscription, Objectevent, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster notdecoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " +subscription.subscriberMethod.threadMode);
    }
}

在这个方法内终于看到通过区分注解中的threadMode来区分不同的处理方式了,先来看下这几种threadMode分别代表什么意思:

Mode 含义
POSTING 在当前线程执行
MAIN 在主线程执行
MAIN_ORDERED 在主线程有序执行
BACKGROUND 在后台线程执行
ASYNC 在新的线程执行

可以看到有几个差不多,那具体有什么区别呢?直接从代码里看,先说明几个东西,invokeSubscriber就是直接调用订阅方法,还有几个后缀为poster的变量暂时先理解为调用了enqueue方法后,订阅方法就会在某个时间被执行,后面再详细讲。

现在可以看代码了,POSTING没什么好说的,直接调用invokeSubscriber,也就是说在调用eventBus.post的线程执行。

MAINMAIN_ORDERED都是在主线程执行,后者的ORDERED体现在什么地方呢,先看下MAIN的分支,其中通过mainThreadPoster.enqueue插入的事件会在主线程执行,判断当前线程是否是主线程来决定直接调用订阅方法还是通过mainThreadPoster来发布,这里应该没什么疑惑的,主要是MAIN_ORDERED

 if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster notdecoupled from subscriber
                invokeSubscriber(subscription, event);
            }

mainThreadPoster不为空时,通过mainThreadPoster来发布事件,为空时直接调用订阅方法,说好的在主线程调用呢?这里注释也说明了是不正确的,实际上mainThreadPoster为空本身就是种异常情况,具体可以看下它的初始化过程,这里就不细说了。所以下面的else分支就先不管了,那么为什么说通过mainThreadPoster发布的事件就是“有序”的呢,实际上mainThreadPoster内部实现是个handler,可以将事件post到主线程中去执行,所以说是有序的,这里简单说明下原因:

主线程维护着一个消息队列,循环从里面取出消息来处理,我们知道可以通过viewpost方法来获取它绘制完成之后的宽高,原因是post方法里的事件会被插入到消息队列的尾部,而viewmeasure,layout,draw都在新插入的消息的前面,所以当post的方法执行时,view肯定已经绘制好了。

handler通过sendMessage发送的消息也会被插入到主线程消息队列的尾部,这就是“有序”,比如现在有一个ImageView,在它的onMeasure中去发布一个事件,如果订阅方法的模式是MAIN那么会在onMeasure中调用订阅方法,而如果模式是MAIN_ORDERED那么会在ImageView绘制完成后调用订阅方法。

再来看下BACKGROUNDASYNC的区别:

case BACKGROUND:
    if (isMainThread) {
        backgroundPoster.enqueue(subscription, event);
    } else {
        invokeSubscriber(subscription, event);
    }
    break;
case ASYNC:
    asyncPoster.enqueue(subscription, event);
    break;

其中backgroundPosterasyncPoster都会开启一个新线程来执行订阅方法,暂时当成是一样的就行,那么区别就是BACKGROUND模式如果在子线程post一个事件,那么会直接在该线程调用订阅方法,只有在主线程post事件才会开启一个新线程。而ASYNC模式,不管是在哪post事件,都会开启一个新线程来调用订阅方法。

最后再看下几个poster基本上就看完了,几个poster都实现了同一个接口Poster

interface Poster {

    /**
     * Enqueue an event to be posted for a particular subscription.
     *
     * @param subscription Subscription which will receive the event.
     * @param event        Event that will be posted to subscribers.
     */
    void enqueue(Subscription subscription, Object event);
}

可以看到里面只有一个需要实现的方法enqueue,是用来插入事件的,这个接口被三个类实现,分别是HandlerPosterBackgroundPosterAsyncPoster,上面的mainThreadPoster对应的就是HandlerPoster,这三个类中都有个类型为PendingPostQueue的成员变量,这是个事件队列,具体实现就不看了,这个队列提供了入队和出队的方法。

先看下HandlerPosterenqueue方法:

public class HandlerPoster extends Handler implements Poster {
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
}

HandlerPoster继承了Handler,调用enqueue方法后会向事件队列中插入一个事件,然后将标记位handlerActive设置为true表示正在处理事件,然后调用sendMessage发送消息通知处理事件。PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);这行是用来获取一个消息队列的Node用来插入到队列中去,EventBus维护着一个pool用来保存闲置的Node当有需要时从中取出一个给事件使用,pool不够用时才会new新的Node出来,具体可以看下PendingPost,这样做的好处是可以避免频繁创建对象带来的开销。

再看下HandlerPosterhandleMessage方法:

    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

首先会记录下开始处理事件的时间,然后从事件队列中取出事件,如果为空就将handlerActive设置为false直接return了,如果不为空,就调用eventBus.invokeSubscriber(pendingPost);来调用订阅方法,执行完后,再看下时间,如果超出了规定的时间那么重新发送一条消息,本次消息处理结束,等下次轮到自己的时候再处理事件,毕竟不能一直处理队列里的事件而阻塞了主线程,如果没有超出规定事件,那么说明还可以有事件可以处理下一个事件,就会再次进入循环。

BackgroundPosterAsyncPoster其实和HandlerPoster差不多,只是没有用Handler而是用了线程池去处理事件,具体就不看了。

对了,还有个发送粘性事件:

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriberwants to remove immediately
    post(event);
}

就是在stickyEvents这个map里存一下。

好了,完了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/7397.html

相关文章

  • EventBus源码分析

    摘要:进入源码分析我们从的注册开始入手。关闭此功能将改善事件的发布。创建线程池我们将此段代码逐步分析这步主要是进行初始化话一下必要的参数,如代码注解所示。必须在线程同步中进行。必须保持线程安全的,所以这里使用了。 简介 前面我学习了如何使用EventBus,还有了解了EventBus的特性,那么接下来我们一起来学习EventBus的源码,查看EventBus的源码,看看EventBus给我们...

    ChristmasBoy 评论0 收藏0
  • Android开源架构

    摘要:音乐团队分享数据绑定运行机制分析一个项目搞定所有主流架构单元测试一个项目搞定所有主流架构系列的第二个项目。代码开源,展示了的用法,以及如何使用进行测试,还有用框架对的进行单元测试。 Android 常用三方框架的学习 Android 常用三方框架的学习 likfe/eventbus3-intellij-plugin AS 最新可用 eventbus3 插件,欢迎品尝 简单的 MVP 模...

    sutaking 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<