资讯专栏INFORMATION COLUMN

android源码分析-深入MessageQueue

LeexMuller / 1733人阅读

摘要:相当于层的初始化。注意,这里是层层自己的消息,与层的没关系。好吧,这个过程基本上分析完毕了,其实就是通过不断的处理消息,并且调用消息的回调。

承接上文在looper中会在一开始就创建一个MessageQueue,并且在loop中每次都会从其中取出一个message处理。那么我们就来看看这个MessageQueue:

</>复制代码

  1. MessageQueue(boolean quitAllowed) {
  2. mQuitAllowed = quitAllowed;
  3. mPtr = nativeInit();
  4. }

nativeInit,无可避免的又要进入c层进行分析。对应的文件是/frameworks/base/core/jni/android_os_MessageQueue.cpp:

</>复制代码

  1. static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
  2. NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
  3. if (!nativeMessageQueue) {
  4. jniThrowRuntimeException(env, "Unable to allocate native queue");
  5. return 0;
  6. }
  7. nativeMessageQueue->incStrong(env);
  8. return reinterpret_cast(nativeMessageQueue);
  9. }

这里创建了一个新的NativeMessageQueue并返回他的指针。这个类的定义也在此文件中,看看他的构造做了什么:

</>复制代码

  1. NativeMessageQueue::NativeMessageQueue() :
  2. mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
  3. mLooper = Looper::getForThread();
  4. if (mLooper == NULL) {
  5. mLooper = new Looper(false);
  6. Looper::setForThread(mLooper);
  7. }
  8. }

新建了一个Looper对象,这个肯定不是java层的那个了,但是前后都有getForThread和setForThread。那么他们分别在干什么呢?我的理解是在做tls线程本地变量的处理,确保本线程只有一个looper。具体的内容在这里不再论述,后续有机会可以剖析下。

我们下面来看看这个Looper是什么吧,他的构造函数如下:

</>复制代码

  1. Looper::Looper(bool allowNonCallbacks) :
  2. mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
  3. mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false),
  4. mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
  5. mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  6. LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s",
  7. strerror(errno));
  8. AutoMutex _l(mLock);
  9. rebuildEpollLocked();
  10. }

除了状态的值得设置外,就是rebuildEpollLocked:

</>复制代码

  1. void Looper::rebuildEpollLocked() {
  2. // Close old epoll instance if we have one.
  3. if (mEpollFd >= 0) {
  4. #if DEBUG_CALLBACKS
  5. ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
  6. #endif
  7. close(mEpollFd);
  8. }
  9. // Allocate the new epoll instance and register the wake pipe.
  10. mEpollFd = epoll_create(EPOLL_SIZE_HINT);
  11. LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
  12. struct epoll_event eventItem;
  13. memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
  14. eventItem.events = EPOLLIN;
  15. eventItem.data.fd = mWakeEventFd;
  16. int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);
  17. LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
  18. strerror(errno));
  19. for (size_t i = 0; i < mRequests.size(); i++) {
  20. const Request& request = mRequests.valueAt(i);
  21. struct epoll_event eventItem;
  22. request.initEventItem(&eventItem);
  23. int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem);
  24. if (epollResult < 0) {
  25. ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
  26. request.fd, strerror(errno));
  27. }
  28. }
  29. }

我们看到了什么?epoll。这不是linux中的epoll吗?就是这个玩意,为了控制多个fd(文件描述符)的读写等事件而诞生的,一般多用于网络开发,类似win上的完成端口。然后新建了一个eventItem用于监听mWakeEventFd,就是将唤醒的eventfd放到epoll的监听队列中,用于唤醒机制。然后呢,进行了一个循环,取出所有的request,并且都放到了epoll监听,首次调用这个for循环不会被执行,因为mRequests的size是0。这些request都是什么呢?看定义:

</>复制代码

  1. struct Request {
  2. int fd;
  3. int ident;
  4. int events;
  5. int seq;
  6. sp callback;
  7. void* data;
  8. void initEventItem(struct epoll_event* eventItem) const;
  9. };

那么他们对应的具体内容又是什么呢?先放一放,往下看。

回到java层的loop函数中,每次调用next方法获取message,那么看看这个MessageQueue的next方法:

</>复制代码

  1. Message next() {
  2. // Return here if the message loop has already quit and been disposed.
  3. // This can happen if the application tries to restart a looper after quit
  4. // which is not supported.
  5. final long ptr = mPtr;
  6. if (ptr == 0) {
  7. return null;
  8. }
  9. int pendingIdleHandlerCount = -1; // -1 only during first iteration
  10. int nextPollTimeoutMillis = 0;
  11. for (;;) {
  12. if (nextPollTimeoutMillis != 0) {
  13. Binder.flushPendingCommands();
  14. }
  15. nativePollOnce(ptr, nextPollTimeoutMillis);
  16. synchronized (this) {
  17. // Try to retrieve the next message. Return if found.
  18. final long now = SystemClock.uptimeMillis();
  19. Message prevMsg = null;
  20. Message msg = mMessages;
  21. if (msg != null && msg.target == null) {
  22. // Stalled by a barrier. Find the next asynchronous message in the queue.
  23. do {
  24. prevMsg = msg;
  25. msg = msg.next;
  26. } while (msg != null && !msg.isAsynchronous());
  27. }
  28. if (msg != null) {
  29. if (now < msg.when) {
  30. // Next message is not ready. Set a timeout to wake up when it is ready.
  31. nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
  32. } else {
  33. // Got a message.
  34. mBlocked = false;
  35. if (prevMsg != null) {
  36. prevMsg.next = msg.next;
  37. } else {
  38. mMessages = msg.next;
  39. }
  40. msg.next = null;
  41. if (DEBUG) Log.v(TAG, "Returning message: " + msg);
  42. msg.markInUse();
  43. return msg;
  44. }
  45. } else {
  46. // No more messages.
  47. nextPollTimeoutMillis = -1;
  48. }
  49. // Process the quit message now that all pending messages have been handled.
  50. if (mQuitting) {
  51. dispose();
  52. return null;
  53. }
  54. // If first time idle, then get the number of idlers to run.
  55. // Idle handles only run if the queue is empty or if the first message
  56. // in the queue (possibly a barrier) is due to be handled in the future.
  57. if (pendingIdleHandlerCount < 0
  58. && (mMessages == null || now < mMessages.when)) {
  59. pendingIdleHandlerCount = mIdleHandlers.size();
  60. }
  61. if (pendingIdleHandlerCount <= 0) {
  62. // No idle handlers to run. Loop and wait some more.
  63. mBlocked = true;
  64. continue;
  65. }
  66. if (mPendingIdleHandlers == null) {
  67. mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
  68. }
  69. mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
  70. }
  71. // Run the idle handlers.
  72. // We only ever reach this code block during the first iteration.
  73. for (int i = 0; i < pendingIdleHandlerCount; i++) {
  74. final IdleHandler idler = mPendingIdleHandlers[i];
  75. mPendingIdleHandlers[i] = null; // release the reference to the handler
  76. boolean keep = false;
  77. try {
  78. keep = idler.queueIdle();
  79. } catch (Throwable t) {
  80. Log.wtf(TAG, "IdleHandler threw exception", t);
  81. }
  82. if (!keep) {
  83. synchronized (this) {
  84. mIdleHandlers.remove(idler);
  85. }
  86. }
  87. }
  88. // Reset the idle handler count to 0 so we do not run them again.
  89. pendingIdleHandlerCount = 0;
  90. // While calling an idle handler, a new message could have been delivered
  91. // so go back and look again for a pending message without waiting.
  92. nextPollTimeoutMillis = 0;
  93. }
  94. }

首先看到获取了mPtr,这个ptr就是c层的nativeMessageQueue的地址。然后进入了一个死循环,率先走了一个nativePollOnce(ptr, nextPollTimeoutMillis);内部调用了android_os_MessageQueue_nativePollOnce:

</>复制代码

  1. static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
  2. jlong ptr, jint timeoutMillis) {
  3. NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr);
  4. nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
  5. }

这里实际上还原了地址为NativeMessageQueue对象,并调用了pollOnce方法:

</>复制代码

  1. void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
  2. mPollEnv = env;
  3. mPollObj = pollObj;
  4. mLooper->pollOnce(timeoutMillis);
  5. mPollObj = NULL;
  6. mPollEnv = NULL;
  7. if (mExceptionObj) {
  8. env->Throw(mExceptionObj);
  9. env->DeleteLocalRef(mExceptionObj);
  10. mExceptionObj = NULL;
  11. }
  12. }

保留了pollObj对象,并且调用了Looper的pollOnce。相当于c层Looper的初始化。那么来看看pollOnce:

</>复制代码

  1. int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
  2. int result = 0;
  3. for (;;) {
  4. while (mResponseIndex < mResponses.size()) {
  5. const Response& response = mResponses.itemAt(mResponseIndex++);
  6. int ident = response.request.ident;
  7. if (ident >= 0) {
  8. int fd = response.request.fd;
  9. int events = response.events;
  10. void* data = response.request.data;
  11. #if DEBUG_POLL_AND_WAKE
  12. ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
  13. "fd=%d, events=0x%x, data=%p",
  14. this, ident, fd, events, data);
  15. #endif
  16. if (outFd != NULL) *outFd = fd;
  17. if (outEvents != NULL) *outEvents = events;
  18. if (outData != NULL) *outData = data;
  19. return ident;
  20. }
  21. }
  22. if (result != 0) {
  23. #if DEBUG_POLL_AND_WAKE
  24. ALOGD("%p ~ pollOnce - returning result %d", this, result);
  25. #endif
  26. if (outFd != NULL) *outFd = 0;
  27. if (outEvents != NULL) *outEvents = 0;
  28. if (outData != NULL) *outData = NULL;
  29. return result;
  30. }
  31. result = pollInner(timeoutMillis);
  32. }
  33. }

一个死循环,里面先是一个while,优先处理应答response(一个request对应一个response),并返回。如果没有response需要处理的时候,走pollInner。这个pollInner是个关键,代码比较多,我们节选看:

</>复制代码

  1. ......
  2. struct epoll_event eventItems[EPOLL_MAX_EVENTS];
  3. int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
  4. ......
  5. for (int i = 0; i < eventCount; i++) {
  6. int fd = eventItems[i].data.fd;
  7. uint32_t epollEvents = eventItems[i].events;
  8. if (fd == mWakeEventFd) {
  9. if (epollEvents & EPOLLIN) {
  10. awoken();
  11. } else {
  12. ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
  13. }
  14. } else {
  15. ssize_t requestIndex = mRequests.indexOfKey(fd);
  16. if (requestIndex >= 0) {
  17. int events = 0;
  18. if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
  19. if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
  20. if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
  21. if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
  22. pushResponse(events, mRequests.valueAt(requestIndex));
  23. } else {
  24. ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
  25. "no longer registered.", epollEvents, fd);
  26. }
  27. }
  28. }
  29. ......

epoll_wait在mEpollFd上阻塞等待,直到有事件发生。如果等到了就执行下面的for循环,枚举每一个epoll_event,如果等待到的消息是唤醒消息(fd==mWakeEventFd),则执行awoken唤醒,否则判断epollEvents是否含有相关事件,如果有填写生成好的events,这个应该是转换一下事件为了上层使用。然后进行了pushResponse的动作,这里终于有个response生成的过程了,继续看下去:

</>复制代码

  1. void Looper::pushResponse(int events, const Request& request) {
  2. Response response;
  3. response.events = events;
  4. response.request = request;
  5. mResponses.push(response);
  6. }

看到了吧,就是个填充response的过程,并将其push到mResponses中。再回到pollInner中往下看:

</>复制代码

  1. ......
  2. Done: ;
  3. // Invoke pending message callbacks.
  4. mNextMessageUptime = LLONG_MAX;
  5. while (mMessageEnvelopes.size() != 0) {
  6. nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
  7. const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
  8. if (messageEnvelope.uptime <= now) {
  9. // Remove the envelope from the list.
  10. // We keep a strong reference to the handler until the call to handleMessage
  11. // finishes. Then we drop it so that the handler can be deleted *before*
  12. // we reacquire our lock.
  13. { // obtain handler
  14. sp handler = messageEnvelope.handler;
  15. Message message = messageEnvelope.message;
  16. mMessageEnvelopes.removeAt(0);
  17. mSendingMessage = true;
  18. mLock.unlock();
  19. #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
  20. ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
  21. this, handler.get(), message.what);
  22. #endif
  23. handler->handleMessage(message);
  24. } // release handler
  25. mLock.lock();
  26. mSendingMessage = false;
  27. result = POLL_CALLBACK;
  28. } else {
  29. // The last message left at the head of the queue determines the next wakeup time.
  30. mNextMessageUptime = messageEnvelope.uptime;
  31. break;
  32. }
  33. }
  34. ......

一上来就是一个while循环,处理一下之前堆积的事件。注意,这里是c层(native层)自己的消息,与java层的没关系。这里有个时间的对比,如果每个messageEnvelope的uptime<=now,也即是小于等于当前时间,那么这个uptime是个什么呢?我的理解是一个唤醒时间,也就是message的执行时间,因为message是允许被后置一段时间执行的。如果需要被执行的时间比当前时间晚,就调用这个message的handler的handleMessage。看起来很合理,就是为了清除一下之前堆积还未执行的事件的handle的回调。
之后又是一个for循环:

</>复制代码

  1. ......
  2. for (size_t i = 0; i < mResponses.size(); i++) {
  3. Response& response = mResponses.editItemAt(i);
  4. if (response.request.ident == POLL_CALLBACK) {
  5. int fd = response.request.fd;
  6. int events = response.events;
  7. void* data = response.request.data;
  8. #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
  9. ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
  10. this, response.request.callback.get(), fd, events, data);
  11. #endif
  12. // Invoke the callback. Note that the file descriptor may be closed by
  13. // the callback (and potentially even reused) before the function returns so
  14. // we need to be a little careful when removing the file descriptor afterwards.
  15. int callbackResult = response.request.callback->handleEvent(fd, events, data);
  16. if (callbackResult == 0) {
  17. removeFd(fd, response.request.seq);
  18. }
  19. // Clear the callback reference in the response structure promptly because we
  20. // will not clear the response vector itself until the next poll.
  21. response.request.callback.clear();
  22. result = POLL_CALLBACK;
  23. }
  24. }
  25. ......

这里就是处理response了,就是走一个response.request.callback->handleEvent。
我们现在继续找线索下,在Looper的构造中出现了mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);,这个eventfd就是用来支持进程或者线程间通讯的通道,类似管道。

好吧,这个过程基本上分析完毕了,其实就是通过epoll不断的处理消息,并且调用消息的回调。但是其实整个过程还有很多不是很明确的地方,例如:1.这个epoll绑定的fd到底是个什么东西?是管道吗?网上的文章基本上都是说管道,这里我没有找到线索,不好确定。2.这个c层的looper中的sendmessage已经很明确是根据传递进来的参数来设定messageEnvelope的handler。但是调用他的是哪个东西呢?怎么和java层结合起来呢?有不少问题。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66783.html

相关文章

  • android源码分析-深入消息机制

    摘要:辅助功能类,提供接口向消息池中发送各类消息事件,并且提供响应消息的机制。进入消息泵循环体,以阻塞的方式获取待处理消息。执行消息的派发。并且将返回值保存在了。我们深入下去看看层的部分,在这里明显生成了一个新的,并且将地址作为返回值返回了。 概述 android里的消息机制是非常重要的部分,这次我希望能够系统的剖析这个部分,作为一个总结。首先这里涉及到几个部分,从层次上看,分为java层和...

    superw 评论0 收藏0
  • android源码分析-深入looper handler message

    摘要:在这里表示的是不允许退出。这之后会调用到进入实质性的工作函数中。看到了吧,由于函数运行在主线程中,因此以上这些都是在主线程中运行的代码。注意,这个是排他性的,如果前面的可以执行就不会走后面的。现在比较清楚了吧,整个消息循环是如何运转的。 本来是不想写这篇文章的,但是很早以前看过的东西容易遗忘,希望还是给自己一个记录吧,另外此篇希望能够写的深入一些。looper是什么就不介绍了吧,一个线...

    DobbyKim 评论0 收藏0
  • Android异步消息机制

    摘要:在子线程中发送消息,主线程接受到消息并且处理逻辑。也称之为消息队列,特点是先进先出,底层实现是单链表数据结构得出结论方法初始话了一个对象并关联在一个对象,并且一个线程中只有一个对象,只有一个对象。 目录介绍 1.Handler的常见的使用方式 2.如何在子线程中定义Handler 3.主线程如何自动调用Looper.prepare() 4.Looper.prepare()方法源码分析...

    王晗 评论0 收藏0
  • Android异步消息机制

    摘要:在子线程中发送消息,主线程接受到消息并且处理逻辑。子线程往消息队列发送消息,并且往管道文件写数据,主线程即被唤醒,从管道文件读取数据,主线程被唤醒只是为了读取消息,当消息读取完毕,再次睡眠。 目录介绍 1.Handler的常见的使用方式 2.如何在子线程中定义Handler 3.主线程如何自动调用Looper.prepare() 4.Looper.prepare()方法源码分析 5....

    blair 评论0 收藏0

发表评论

0条评论

LeexMuller

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<