android消息机制

1 序

基于Android9.0分析更新

2 概述

什么是消息机制呢?简单来说,Android消息机制是一套以“消息”为中介来实现线程之间的任务切换或同一线程中任务的按需执行的机制,其中涉及到消息的发送、存储消息、消息循环以及消息的分发和处理。本文将通过分析源码来进一步了解消息机制的内部实现方式。

Android消息机制涉所及到的类: 类

3 消息机制分析

3.1 我们先创建一个简单的Handler使用代码片段用于发送和处理消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    private Handler mHandler = new Handler() {

        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            textView.setText(msg.obj.toString());
        }
    };

    private void sendMessage(){
        Message msg = Message.obtain();
        msg.obj = Thread.currentThread().getName();
        mHandler.sendMessage(msg);
    }

上述代码片段发生了什么:

  1. 创建了一个Handler并复写了handleMessage方法来处理消息
  2. 调用了Message.obtain()来创建一个消息
  3. 调用了mHandler.sendMessage(msg)发送消息
  4. handleMessage(Message msg)收到发送的消息并进行处理

我们会按照这个执行流程一步一步分析消息机制

3.2 创建Handler

构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    public Handler() {
        this(null, false);
    }
    /*
     * @hide 该构造方法是隐藏的,只允许内部调用
     */
    public Handler(Callback callback, boolean async) {
        if (FIND_POTENTIAL_LEAKS) {
            final Class<? extends Handler> klass = getClass();
            if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                    (klass.getModifiers() & Modifier.STATIC) == 0) {
                Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                    klass.getCanonicalName());
            }
        }

        //返回当前线程关联的Looper对象
        mLooper = Looper.myLooper();
        if (mLooper == null) {
            throw new RuntimeException(
                "Can't create handler inside thread " + Thread.currentThread()
                        + " that has not called Looper.prepare()");
        }
        //保存当前Looper关联的消息队列
        mQueue = mLooper.mQueue;
        mCallback = callback;
        mAsynchronous = async;
    }

Handler 的构造方法需要传入两个参数,第一个参数是 Handler.Callback 接口的实现,第二个参数是标志传递的 Message 是否是异步。第一个参数的作用我们后面会说到,第二个参数我们使用时一般使用的是同步消息,看默认构造方法也能知道,异步消息系统会在某些时候使用。

3.3 创建Message

此时Handler对象已经创建,按照上面的步骤我们看看Message.obtain()方法中发生了什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    /**
     * Return a new Message instance from the global pool. Allows us to
     * avoid allocating new objects in many cases.
     */
    public static Message obtain() {
        synchronized (sPoolSync) {
            if (sPool != null) {
                Message m = sPool;
                sPool = m.next;
                m.next = null;
                m.flags = 0; // clear in-use flag
                sPoolSize--;
                return m;
            }
        }
        return new Message();
    }

通过上面代码可以看到,Message内部维护了一个链表方式实现的缓存池用于降低创建Message的开销,默认最大的缓存数量MAX_POOL_SIZE = 50为50个,这就是为什么我们使用Handler发送消息时建议使用Message.obtain()方法创建消息的原因了。

3.4 sendMessage发送消息

接着我们来看看sendMessage(Message msg)方法发生了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
    public final boolean sendMessage(Message msg)
    {
        return sendMessageDelayed(msg, 0);
    }

    public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }

    private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
        //将handler绑定到message对象
        msg.target = this;
        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

sendMessage(Message msg)方法内部调用的实际上是sendMessageDelayed,最终会走到sendMessageAtTime然后调用enqueueMessage方法将消息放入消息队列MessageQueue并将handler和发送出去的message对象绑定。

通过上面我们已经知道Handler是如何将消息发送到MessageQueue中了,那么MessageQueue中的消息是如何分发出去的呢? 答案就是Looper。

3.5 Looper

用于为线程运行消息循环的类。默认情况下,线程没有与之关联的消息循环;创建一个,调用 prepare()运行循环的线程,然后 loop()让它处理消息,直到循环停止。

大多数与消息循环的交互是通过 Handler类进行的。 这是一个Looper线程实现的典型例子,使用分离prepare()和loop()创建初始Handler与Looper进行通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 class LooperThread extends Thread {
      public Handler mHandler;

      public void run() {
          Looper.prepare();

          mHandler = new Handler(Looper.myLooper()) {
              public void handleMessage(Message msg) {
                  // process incoming messages here
              }
          };

          Looper.loop();
      }
  }

上述是官方文档给予Looper的描述,通过代码片段我们可以看到创建Handler前需要调用Looper.prepare()实例化Looper,之后需要调用Looper.loop() 开启Looper循环分发消息。但是我们平常使用Handler过程中是不是都没有做这些操作呀,这是为什么呢? 我们来看下ActivityThread的Main方法,这是应用程序的入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
        
        ...
        //初始化Looper
        Looper.prepareMainLooper();

        ...

        //创建ActivityThread对象
        ActivityThread thread = new ActivityThread();
        //建立Binder通道 (创建新线程)
        thread.attach(false, startSeq);

        ...
        
        //开启循环
        Looper.loop();

        throw new RuntimeException("Main thread loop unexpectedly exited");
    }

通过上面我们知道,系统早已经给我们准备好了主线程的Looper,所以我们平常在主线程使用Handler时不需要调用Looper.prepare()Looper.loop()方法,但是如果在子线程创建Handler并使用,上面步骤是必不可少的,我们需要自己调用Looper.prepare()Looper.loop()方法。

那我们来看看Looper.prepare()和Looper.loop()中做了些什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
    /** Initialize the current thread as a looper.
      * This gives you a chance to create handlers that then reference
      * this looper, before actually starting the loop. Be sure to call
      * {@link #loop()} after calling this method, and end it by calling
      * {@link #quit()}.
      */
    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        //将Looper保存在当前线程的ThreadLocal中
        sThreadLocal.set(new Looper(quitAllowed));
    }

    private Looper(boolean quitAllowed) {
        //实例化MessageQueue
        mQueue = new MessageQueue(quitAllowed);
        mThread = Thread.currentThread();
    }


    /**
     * Run the message queue in this thread. Be sure to call
     * {@link #quit()} to end the loop.
     */
    public static void loop() {
        //获取当前线程的Looper
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        // 清空远程调用端进程的身份,用本地进程的身份代替,确保此线程的身份是本地进程的身份,并跟踪该身份令牌
        // 这里主要用于保证消息处理是发生在当前 Looper 所在的线程
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        // Allow overriding a threshold with a system prop. e.g.
        // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
        final int thresholdOverride =
                SystemProperties.getInt("log.looper."
                        + Process.myUid() + "."
                        + Thread.currentThread().getName()
                        + ".slow", 0);

        boolean slowDeliveryDetected = false;
        //死循环 有消息时分发消息,无消息时阻塞
        for (;;) {
            //获取消息队列的消息
            Message msg = queue.next(); // might block
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }

            final long traceTag = me.mTraceTag;
            long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
            long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
            if (thresholdOverride > 0) {
                slowDispatchThresholdMs = thresholdOverride;
                slowDeliveryThresholdMs = thresholdOverride;
            }
            final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
            final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);

            final boolean needStartTime = logSlowDelivery || logSlowDispatch;
            final boolean needEndTime = logSlowDispatch;

            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }

            final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
            final long dispatchEnd;
            try {
                //msg.target 就是之前发送消息时绑定的Handler对象,这儿回调到了
                //handler.dispatchMessage(msg)方法来分发消息
                msg.target.dispatchMessage(msg);
                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
            } finally {
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
            if (logSlowDelivery) {
                if (slowDeliveryDetected) {
                    if ((dispatchStart - msg.when) <= 10) {
                        Slog.w(TAG, "Drained");
                        slowDeliveryDetected = false;
                    }
                } else {
                    if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
                            msg)) {
                        // Once we write a slow delivery log, suppress until the queue drains.
                        slowDeliveryDetected = true;
                    }
                }
            }
            if (logSlowDispatch) {
                showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
            }

            if (logging != null) {
                logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
            }

            // Make sure that during the course of dispatching the
            // identity of the thread wasn't corrupted.
            final long newIdent = Binder.clearCallingIdentity();
            if (ident != newIdent) {
                Log.wtf(TAG, "Thread identity changed from 0x"
                        + Long.toHexString(ident) + " to 0x"
                        + Long.toHexString(newIdent) + " while dispatching to "
                        + msg.target.getClass().getName() + " "
                        + msg.callback + " what=" + msg.what);
            }
            //回收消息将Message放入消息池
            msg.recycleUnchecked();
        }
    }

  1. Looper.prepare()内部做了3件事情,创建Looper,创建MessageQueue,将Looper对象保存在当前线程的ThreadLocal中
  2. Looper.loop()内部获取当前线程对应的Looper对象,然后开启个死循环来调用queue.next()获取当前Looper的MessageQueue对象的消息,有新消息则调用handler.dispatchMessage(msg)方法来分发消息,没有就阻塞。

3.6 MessageQueue

Looper.loop()方法内部获取消息时调用的是message.nex()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
//消息出队
Message next() {
        // Return here if the message loop has already quit and been disposed.
        // This can happen if the application tries to restart a looper after quit
        // which is not supported.
        // 如果消息队列退出,则直接返回
        // 正常运行的应用程序主线程的消息队列是不会退出的,一旦退出则应用程序就会崩溃
        final long ptr = mPtr;
        if (ptr == 0) {
            return null;
        }

        // 记录空闲时处理的 IdlerHandler 数量,可先忽略
        int pendingIdleHandlerCount = -1; // -1 only during first iteration
        // native 层使用的变量,设置的阻塞超时时长
        int nextPollTimeoutMillis = 0;
        // 开始循环获取消息
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }

            // 调用 native 方法阻塞,当等待nextPollTimeoutMillis时长,或者消息队列被唤醒,都会停止阻塞
            nativePollOnce(ptr, nextPollTimeoutMillis);

            synchronized (this) {
                // 尝试获取下一条消息,获取到则返回该消息
                final long now = SystemClock.uptimeMillis();
                // 获取消息队列中的第一条消息
                Message prevMsg = null;
                Message msg = mMessages;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    // 如果 msg 为 Barrier 类型的消息,则拦截所有同步消息,获取第一个异步消息
                    // 循环获取第一个异步消息
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        // 如果 msg 的触发时间还没有到,设置阻塞超时时长
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        // 获取消息并返回
                        mBlocked = false;
                        if (prevMsg != null) {
                            // 如果 msg 不是消息队列的第一条消息,上一条消息的 next 指向 msg 的 next。
                            prevMsg.next = msg.next;
                        } else {
                            // 如果 msg 是消息队列的第一条消息,则 msg 的 next 作为消息队列的第一条消息 // msg 的 next 置空,表示从消息队列中取出了 msg。
                            mMessages = msg.next;
                        }
                        // msg 的 next 置空,表示从消息队列中取出了 msg
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        // 标记 msg 为正在使用
                        msg.markInUse();
                        // 返回该消息,退出循环
                        return msg;
                    }
                } else {
                    // No more messages.
                    // 如果没有消息,则设置阻塞时长为无限,直到被唤醒
                    nextPollTimeoutMillis = -1;
                }

                // Process the quit message now that all pending messages have been handled.
                // 如果消息正在退出,则返回 null
                // 正常运行的应用程序主线程的消息队列是不会退出的,一旦退出则应用程序就会崩溃
                if (mQuitting) {
                    dispose();
                    return null;
                }

                // If first time idle, then get the number of idlers to run.
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                // 第一次循环 且 (消息队列为空 或 消息队列的第一个消息的触发时间还没有到)时,表示处于空闲状态
                // 获取到 IdleHandler 数量
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    // No idle handlers to run.  Loop and wait some more.
                    // 没有 IdleHandler 需要运行,循环并等待
                    // 设置阻塞状态为 true
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            // Run the idle handlers.
            // We only ever reach this code block during the first iteration.
            // 运行 IdleHandler,只有第一次循环时才会运行
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; // release the reference to the handler

                boolean keep = false;
                try {
                    keep = idler.queueIdle();
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

            // Reset the idle handler count to 0 so we do not run them again.
            // 重置 IdleHandler 的数量为 0,确保不会重复运行
            // pendingIdleHandlerCount 置为 0 后,上面可以通过 pendingIdleHandlerCount < 0 判断是否是第一次循环,不是第一次循环则 pendingIdleHandlerCount 的值不会变,始终为 0。
            pendingIdleHandlerCount = 0;

            // While calling an idle handler, a new message could have been delivered
            // so go back and look again for a pending message without waiting.
            // 在执行 IdleHandler 后,可能有新的消息插入或消息队列中的消息到了触发时间,所以将 nextPollTimeoutMillis 置为 0,表示不需要阻塞,重新检查消息队列。
            nextPollTimeoutMillis = 0;
        }
    }

    //消息入队 见上面handler.enqueueMessage方法
    boolean enqueueMessage(Message msg, long when) {
    // 消息对象的目标是 null 时直接抛出异常,因为这意味这个消息无法进行分发处理,
    // 是不合法的消息对象。
    if (msg.target == null) {
        throw new IllegalArgumentException("Message must have a target.");
    }
    // 消息正在使用时抛出异常,消息不能并发使用。
    if (msg.isInUse()) {
        throw new IllegalStateException(msg + " This message is already in use.");
    }

    synchronized (this) {
        // 正在退出消息循环时,回收消息对象。
        if (mQuitting) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
            msg.recycle();
            return false;
        }

        msg.markInUse();
        msg.when = when;
        Message p = mMessages;
        boolean needWake;
        
        // 把消息对象添加到消息队列的合适位置
        if (p == null || when == 0 || when < p.when) {
            // New head, wake up the event queue if blocked.
            // 消息队列为空或者当前消息对象的时间最近,直接放在链表首部。
            msg.next = p;
            // 更新链表指针
            mMessages = msg;
            // 如果原来消息循环处于阻塞状态就重新唤醒
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake
            // up the event queue unless there is a barrier at the head of the queue
            // and the message is the earliest asynchronous message in the queue.
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            // 根据消息对象中的时间信息寻找合适的插入位置
            for (;;) {
                prev = p;
                p = p.next;
                if (p == null || when < p.when) {
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            // 找到合适位置后插入链表
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }

        // We can assume mPtr != 0 because mQuitting is false.
        // 唤醒等待,这时消息循环可以继续获取消息了,之前有可能处于阻塞等待状态。
        if (needWake) {
            nativeWake(mPtr);
        }
    }
    return true;
}

nativePollOnce(ptr, nextPollTimeoutMillis)是调用 native 层的方法执行阻塞操作,其中 nextPollTimeoutMillis 表示阻塞超时时长: nextPollTimeoutMillis = 0 则不阻塞 nextPollTimeoutMillis = -1 则一直阻塞,除非消息队列被唤醒

延时消息发送也是基于这个机制。

这儿说下为什么在主线程中死循环没有造成线程卡死呢?这就涉及到Linux pipe/epoll机制,简单说就是在主线程的MessageQueue没有消息时,便阻塞在loop的queue.next()中的nativePollOnce()方法里,此时主线程会释放CPU资源进入休眠状态,直到下个消息到达或者有事务发生,通过往pipe管道写端写入数据来唤醒主线程工作。这里采用的epoll机制,是一种IO多路复用机制,可以同时监控多个描述符,当某个描述符就绪(读或写就绪),则立刻通知相应程序进行读或写操作,本质同步I/O,即读写是阻塞的。 所以说,主线程大多数时候都是处于休眠状态,并不会消耗大量CPU资源。

3.7 Handler处理消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    /**
     * Handle system messages here.
     */
    public void dispatchMessage(Message msg) {
        if (msg.callback != null) {
            handleCallback(msg);
        } else {
            if (mCallback != null) {
                if (mCallback.handleMessage(msg)) {
                    return;
                }
            }
            handleMessage(msg);
        }
    }

经过前面分析,最终是走到Looper.loop()方法中,调用handler.dispatchMessage(msg)方法来分发消息,这儿就能看到我们创建Handler对象时Handler(Callback callback, boolean async)第一个参数的作用了,dispatchMessage方法内部会判断,有msg的callback直接run,这儿的msg.callback实际上是一个Runnable,没有的话,判断实例化时是否设置了Handler的callback,有则先回调处理这个消息,没有设置或消息未处理过会继续调用默认的handleMessage(msg)方法处理。

4 总结

  1. 主线程调用 Lopper.prepareMainLooper() 方法创建当前线程的 Looper 对象(主线程中这一步由 Android 系统在应用启动时完成),子线程则需要自己调用Looper.prepare()创建对象和Looper.loop()方法开启循环。
  2. 创建 Looper 对象时会内部会创建一个消息队列 MessageQueue
  3. Looper 通过 loop() 方法获取到当前线程的 Looper 并启动循环,从 MessageQueue 不断提取 Message,若 MessageQueue 没有消息,处于阻塞状态。
  4. 使用当前线程创建的 Handler 在其它线程通过 sendMessage() 发送 MessageMessageQueue
  5. MessageQueue 插入新 Message 并唤醒阻塞,重新检查 MessageQueue 获取新插入的 Message
  6. Looper 获取到 Message 后,通过 Messagetarget 即 Handler 调用 dispatchMessage(Message msg) 方法分发提取到的 Message,然后回收 Message 并继续循环获取下一个 Message
  7. Handler 使用 handlerMessage(Message msg) 方法处理 Message
  8. MessageQueue 没有 Message 时,重新进入阻塞状态