OkHttp源码分析

1 序

基于OkHttp 3.14.9版本

作为一个Android开发者,不知道OkHttp网络库的几乎是没有了吧。很多时候我们在使用三方库的时候因为时间关系不会去深究其原理,只要会用就行。但是闲下来还是有必要阅读下这些优秀的三方库源码,探究其实现方式和设计模式。阅读优秀的源码,学以致用,是我们快速成长的方式。

2 OkHttp概述

HTTP 是现代应用程序网络的方式。这就是我们交换数据和媒体的方式。有效地执行 HTTP 可以使您的内容加载速度更快并节省带宽。 OkHttp 是一个默认高效的 HTTP 客户端:

  • HTTP/2 支持 允许对同一主机的所有请求共享一个套接字。
  • 连接池减少了请求延迟(如果 HTTP/2 不可用)。
  • transparent GZIP 可缩小下载大小。
  • Response缓存完全避免网络重复请求。
  • OkHttp在网络故障时会自动重试:它会从常见的连接问题中默默恢复。如果您的服务有多个 IP 地址,则 OkHttp 将在第一次连接失败时尝试备用地址。这对于在冗余数据中心托管的 IPv4+IPv6 和服务是必需的。OkHttp 还支持现代 TLS 功能(TLS 1.3,ALPN,certificate pinning)。It can be configured to fall back for broad connectivity.

以上是官方对于OkHttp的描述,本文会从OkHttp的使用方法出发,通过使用来探究其内部是如何实现的。

3 OkHttp源码分析

3.1 基本用例

使用OkHttp实现GET请求非常简单,例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private final OkHttpClient client = new OkHttpClient();

  //同步请求方式
  public void run() throws Exception {
    Request request = new Request.Builder()
        .url("https://publicobject.com/helloworld.txt")
        .build();
    //使用同步的方式请求
    try (Response response = client.newCall(request).execute()) {
        if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
        //打印结果
        System.out.println(response.body().string());
    }
}

//异步请求方式
public void runAsync() throws Exception {
    Request request = new Request.Builder()
        .url("http://publicobject.com/helloworld.txt")
        .build();

    client.newCall(request).enqueue(new Callback() {

      @Override public void onFailure(Call call, IOException e) {
        e.printStackTrace();
      }

      @Override public void onResponse(Call call, Response response) throws IOException {
        try (ResponseBody responseBody = response.body()) {
          if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);

          //打印结果
          System.out.println(responseBody.string());
        }
      }
    });
}

这个简单的例子,展示了OkHttp的使用过程:

  1. 创建OkHttpClient对象
  2. 构造请求Request
  3. 调用OkHttpClient发送Request
  4. 解析请求结果

我们将以这个例子为切入点,来分析OkHttp的内部实现:

3.2 创建OkHttpClient对象

OkHttpClient的配置参数太多,使用了建造者模式来创建对象,我们来看下OkHttpClient构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
...

    public OkHttpClient() {
        this(new Builder());
    }

    OkHttpClient(Builder builder) {
        this.dispatcher = builder.dispatcher;
        this.proxy = builder.proxy;
        this.protocols = builder.protocols;
        this.connectionSpecs = builder.connectionSpecs;
        this.interceptors = Util.immutableList(builder.interceptors);
        this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
        this.eventListenerFactory = builder.eventListenerFactory;
        this.proxySelector = builder.proxySelector;
        this.cookieJar = builder.cookieJar;
        this.cache = builder.cache;
        this.internalCache = builder.internalCache;
        this.socketFactory = builder.socketFactory;
        ...
    }
  ...

可以看到,OkHttpClient实例化实际上是调用了OkHttpClient(Builder builder),传入了一个OkHttpClient.Builder建造者对象,然后将参数赋值给了自己。

看一看OkHttpClient.Builder的构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    public Builder() {
      dispatcher = new Dispatcher();// 分发器,另有一个带线程池参数的构造器
      protocols = DEFAULT_PROTOCOLS;// 支持的协议,默认为HTTP_2、HTTP_1_1
      connectionSpecs = DEFAULT_CONNECTION_SPECS; // 传输层版本、连接协议
      eventListenerFactory = EventListener.factory(EventListener.NONE);/ 事件监听器
      proxySelector = ProxySelector.getDefault();// 代理选择器
      if (proxySelector == null) {
        proxySelector = new NullProxySelector();
      }
      cookieJar = CookieJar.NO_COOKIES;// 读写Cookie的容器
      socketFactory = SocketFactory.getDefault();// Socket工厂
      hostnameVerifier = OkHostnameVerifier.INSTANCE;// 主机名验证器
      certificatePinner = CertificatePinner.DEFAULT;//证书固定
      proxyAuthenticator = Authenticator.NONE;// 代理认证器
      authenticator = Authenticator.NONE;// 本地认证器
      connectionPool = new ConnectionPool();// 连接池
      dns = Dns.SYSTEM;// 域名解析
      followSslRedirects = true;// SSL重定向
      followRedirects = true;// 普通重定向
      retryOnConnectionFailure = true;// 连接失败重试
      callTimeout = 0;//默认调用超时
      connectTimeout = 10_000;// 连接超时时间
      readTimeout = 10_000;// 读超时时间
      writeTimeout = 10_000;// 写超时时间
      pingInterval = 0;// ping间隔
    }

内部对很多属性都赋予了默认值,我们根据需要可以通过Builder中各种对应的方法传入自定义的值来替换它们。

3.3 发起请求

构建Request

1
2
3
4
5
6
7
Request(Builder builder) {
    this.url = builder.url;// 请求的url
    this.method = builder.method;// 请求方式
    this.headers = builder.headers.build();// 请求头
    this.body = builder.body;// 请求体
    this.tags = Util.immutableMap(builder.tags); // 请求的tag
  }

Request类很简单,如上。封装的就是我们发起http请求的各种要素,请求地址,请求方式(GET、POST…),请求头,请求体等。不清楚的可以去看下http协议。

3.3.1 同步请求execute()

如上面示例所示:

1
2
3
4
5
6
 client.newCall(request).execute()

//OkHttpClient
@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}

newCall创建了一个RealCall对象,而RealCall实现了Call接口。Call接口声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public interface Call extends Cloneable {
    /** 获得原始请求 */
    Request request();

    /** 同步执行请求 */
    Response execute() throws IOException;

    /** 异步执行请求 */
    void enqueue(Callback responseCallback);

    /** 取消请求。已经完成了的请求不能被取消 */
    void cancel();

    /**
    * 调用了execute()或者enqueue(Callback)后都是true
    */
    boolean isExecuted();

    boolean isCanceled();

    /**
    * 返回跨越整个调用的超时时间
    */
    Timeout timeout();

    /** 创建一个新的、完全一样的Call对象,即使原对象状态为enqueued或者executed */
    Call clone();

    interface Factory {
        Call newCall(Request request);
    }
}

可以看到Call接口主要是对请求控制操作,例如调用已准备好执行的请求,取消请求等。Call的实现就是RealCall,我们来看看newCall内部实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    //RealCall

    final OkHttpClient client;
    //发送器,OkHttp的应用程序和网络层之间的桥梁
    private Transmitter transmitter;
    final Request originalRequest;
    final boolean forWebSocket;
    // Guarded by this.
    private boolean executed;

    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        call.transmitter = new Transmitter(client, call);
        return call;
    }

接着来看重点执行同步请求的RealCall.execute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    @Override 
    public Response execute() throws IOException {
        synchronized (this) {
        //避免多次执行
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
        }
        //如果配置了callTimeout 内部会启动一个Watchdog线程检测超时时间
        transmitter.timeoutEnter();
        //回调callStart事件
        transmitter.callStart();
        try {
        //将Call放入Dispatcher中的runningSyncCalls队列
        client.dispatcher().executed(this);
        //进行网络请求并获取Response
        return getResponseWithInterceptorChain();
        } finally {
        //Call执行完毕后将其从Dispatcher.runningSyncCalls队列中移除
        client.dispatcher().finished(this);
        }
    }

我们先来看client.dispatcher().executed(this)方法:

1
2
3
4
5
6
7
8
9
//Dispatcher

  //同步调用队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

/** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

方法很简单,就是将call加入到Dispatcher.runningSyncCalls队列中。

接着调用getResponseWithInterceptorChain进行网络请求并获取Response,该方法是OkHttp中的最重要的点,稍后我们在介绍RealCall.enqueue方法时再一起说。

紧接着就是finally代码块里面的client.dispatcher().finished(this):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//Dispatcher

 /** Used by {@code Call#execute} to signal completion. */
    void finished(RealCall call) {
        finished(runningSyncCalls, call);
    }

    private <T> void finished(Deque<T> calls, T call) {
        Runnable idleCallback;
        synchronized (this) {
            if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
            idleCallback = this.idleCallback;
        }
        //返回当前是否有任务执行
        boolean isRunning = promoteAndExecute();

        if (!isRunning && idleCallback != null) {
            idleCallback.run();
        }
    }

该方法也很简单,就是Call执行完毕后将其从Dispatcher.runningSyncCalls队列中移除,同时还会执行promoteAndExecute方法,此方法是给异步调用准备的,具体代码后面会谈到;最后如果runningAsyncCallsrunningSyncCalls这俩正在执行的同步、异步队列之和为0,说明dispatcher处理空闲状态,那么调用idleCallback.run通知外界dispatcher已经空闲了.

3.3.2 异步请求enqueue()

1
2
3
4
5
6
7
8
9
10
11
12
    //RealCall
    @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
        //避免多次执行
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
        }
        //回调callStart事件
        transmitter.callStart();
        //将AsyncCall放入Dispatcher中的readyAsyncCalls队列
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }

AsyncCallRealCall中的一个内部类,实现了Runnable接口。先来看下AsyncCall相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
  final class AsyncCall extends NamedRunnable {
    private final Callback responseCallback;
    private volatile AtomicInteger callsPerHost = new AtomicInteger(0);

    AsyncCall(Callback responseCallback) {
      super("OkHttp %s", redactedUrl());
      this.responseCallback = responseCallback;
    }
    ...
    //传入一个线程池执行自己 会调用Runnable的run方法
    void executeOn(ExecutorService executorService) {
      assert (!Thread.holdsLock(client.dispatcher()));
      boolean success = false;
      try {
        executorService.execute(this);
        success = true;
      } catch (RejectedExecutionException e) {
        InterruptedIOException ioException = new InterruptedIOException("executor rejected");
        ioException.initCause(e);
        transmitter.noMoreExchanges(ioException);
        responseCallback.onFailure(RealCall.this, ioException);
      } finally {
        if (!success) {
          client.dispatcher().finished(this); // This call is no longer running!
        }
      }
    }

    //在父类的run方法中调用
    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

可以看到AsyncCall实际上是一个Runnable,执行自己是实际上执行的是内部的execute()方法。

了解了AsyncCall的大致结构后,我们返回Dispatcher.enqueue方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//Dispatcher
    //准备好执行的异步任务队列
    private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    //正在执行的异步任务队列
    private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    ...
    void enqueue(AsyncCall call) {
        synchronized (this) {
            //加入readyAsyncCalls队列
            readyAsyncCalls.add(call);
            //不是webSocket,主机相同的话共享现有运行调用的AtomicInteger
            if (!call.get().forWebSocket) {
                AsyncCall existingCall = findExistingCallWithHost(call.host());
                if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
            }
        }
        //进行异步调用
        promoteAndExecute();
    }

我们来看下promoteAndExecute()方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    //标记当前是否还有请求在执行
    boolean isRunning;
    synchronized (this) {
      //遍历所有准备执行的异步任务
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();
        //如果正在执行的异步任务队列超过64个 直接跳出
        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
        //最多5个host
        if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
        //从readyAsyncCalls中移除
        i.remove();
        //callsPerHost数量+1
        asyncCall.callsPerHost().incrementAndGet();
        //加入可执行列表
        executableCalls.add(asyncCall);
        //加入runningAsyncCalls队列
        runningAsyncCalls.add(asyncCall);
      }
      //当前正在执行的call个数(同步+异步)是否大于0
      isRunning = runningCallsCount() > 0;
    }
    //便利可执行列表
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      //执行异步请求
      asyncCall.executeOn(executorService());
    }

    return isRunning;
}

我们来看核心的asyncCall.executeOn(executorService())方法,先看executorService()

1
2
3
4
5
6
7
8
9
10
11
12
//Dispatcher
    ...
    private @Nullable ExecutorService executorService;
    ...
    public synchronized ExecutorService executorService() {
        if (executorService == null) {
        executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
            new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    }
    ...

这里我们可以看出来,这是一个典型的CachedThreadPool。是一个线程数量不定的线程池,他只有非核心线程,并且其最大线程数为Integer.MAX_VALUE。线程池中的空闲线程都有超时机制,这个超时时长为60s,超过这个时间的闲置线程就会被回收。SynchronousQueue可以简单的理解为一个无法存储元素的队列,因此这将导致任何任务都会立刻执行。

从其特性来看,这类线程池适合执行大量耗时较少的任务。当整个线程池处于闲置状态时,线程池中的线程都会因为超时而被停止,这个时候CachedThreadPool之中实际上是没有线程的,它几乎不占用任何系统资源。

我们之前在讲AsyncCall结构时说过executeOn方法实际是把自己放入一个线程池执行,最终执行的是AsyncCall.execute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    //AsyncCall
    @Override protected void execute() {
      boolean signalledCallback = false;
      transmitter.timeoutEnter();
      try {
        //进行网络请求并获取response
        Response response = getResponseWithInterceptorChain();
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      } catch (IOException e) {
        if (signalledCallback) {
          // Do not signal the callback twice!
          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
        } else {
          responseCallback.onFailure(RealCall.this, e);
        }
      } catch (Throwable t) {
        cancel();
        if (!signalledCallback) {
          IOException canceledException = new IOException("canceled due to " + t);
          canceledException.addSuppressed(t);
          responseCallback.onFailure(RealCall.this, canceledException);
        }
        throw t;
      } finally {
        //Call执行完毕后将其从Dispatcher.runningAsyncCalls队列中移除
        client.dispatcher().finished(this);
      }
    }

可以看到不管是同步请求RealCall.execute还是异步请求RealCall.enqueue最终都会通过调用getResponseWithInterceptorChain()方法去请求并获取结果。我们来看下核心的getResponseWithInterceptorChain方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    //RealCall
    Response getResponseWithInterceptorChain() throws IOException {
        //创建了一个拦截器集合
        List<Interceptor> interceptors = new ArrayList<>();
        //用户自定义拦截器
        interceptors.addAll(client.interceptors());
        //添加重试、重定向拦截器RetryAndFollowUpInterceptor
        interceptors.add(new RetryAndFollowUpInterceptor(client));
        //添加桥接拦截器BridgeInterceptor
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        //添加缓存拦截器CacheInterceptor
        interceptors.add(new CacheInterceptor(client.internalCache()));
        //添加ConnectInterceptor拦截器
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
            //用户自定义networkInterceptors
            interceptors.addAll(client.networkInterceptors());
        }
        //链中的最后一个拦截器,它对服务器进行网络调用
        interceptors.add(new CallServerInterceptor(forWebSocket));
        //创建一个拦截器责任链来执行
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
            originalRequest, this, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());

        boolean calledNoMoreExchanges = false;
        try {
            //链式执行,最终返回Response
            Response response = chain.proceed(originalRequest);
            if (transmitter.isCanceled()) {
                closeQuietly(response);
                throw new IOException("Canceled");
            }
            return response;
        } catch (IOException e) {
            calledNoMoreExchanges = true;
            throw transmitter.noMoreExchanges(e);
        } finally {
            if (!calledNoMoreExchanges) {
                transmitter.noMoreExchanges(null);
            }
        }
    }

我们在这儿说一下系统创建的几个拦截器的作用:

  • RetryAndFollowUpInterceptor:用于失败重试或者根据需要进行重定向。
  • BridgeInterceptor:从应用程序代码到网络代码的桥梁。首先它根据用户请求构建网络请求。然后它继续调用网络。最后它根据网络响应构建用户响应。
  • CacheInterceptor:为来自缓存的请求提供服务,并将响应写入缓存
  • ConnectInterceptor:与目标服务器的连接
  • CallServerInterceptor:这是链中的最后一个拦截器,它对服务器进行网络调用。

用户自定义的interceptor将最先被执行,自定义的networkInterceptor将在与目标服务器建立连接后执行。

可以看到getResponseWithInterceptorChain方法是使用了责任链模式,将所有拦截器依次加入到List中,并创建了一个拦截器责任链RealInterceptorChain链式调用其proceed方法来处理网络请求。这样的好处是这条链中所有的对象都能处理请求,每个类只需要处理自己该处理的工作,不该处理的传递给下一个对象完成,明确各类的责任范围,符合类的单一职责原则,也避免了请求的发送者和接受者之间的耦合关系。

3.4 责任链模式和拦截器

3.4.1 责任链模式的实现

我们来看一下RealInterceptorChain内部的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final class RealInterceptorChain implements Interceptor.Chain {
    //拦截器集合
    private final List<Interceptor> interceptors;
    ...
    //当前执行到拦截器的下标
    private final int index;
    private final Request request;
    private final Call call;

    ...

    public RealInterceptorChain(List<Interceptor> interceptors, Transmitter transmitter,
        @Nullable Exchange exchange, int index, Request request, Call call,
        int connectTimeout, int readTimeout, int writeTimeout) {
        this.interceptors = interceptors;
        ...
        this.index = index;
        this.request = request;
        this.call = call;
        ...
    }

    ...

    @Override public Response proceed(Request request) throws IOException {
        return proceed(request, transmitter, exchange);
    }

    public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
        throws IOException {
        //当前下标大于interceptors长度 抛出异常
        if (index >= interceptors.size()) throw new AssertionError();

        ...

        // 创建一个新的RealInterceptorChain,index+1拦截器下标后移
        RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
        //获取当前拦截器
        Interceptor interceptor = interceptors.get(index);
        //将RealInterceptorChain传入并执行拦截器的方法
        //interceptor.intercept方法内部会继续递归调用RealInterceptorChain.proceed方法,重新走到这儿,此时的index角标已经指向下一个interceptor
        Response response = interceptor.intercept(next);

        ...

        return response;
    }
}

通过上面分析我们知道RealInterceptorChain.proceedinterceptor.intercept会递归调用,执行完所有拦截器的intercept方法,直到遇见最后一个拦截器CallServerInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    ...

    return response;
  }
}

可以看到CallServerInterceptor最终返回了相应结果response,而并没有继续调用RealInterceptorChain.proceed方法。这儿就是递归的出口,最终跟服务器交互拿到了response并返回。

通过拦截链的调用我们可以知道: Request时拦截器是从第一个开始往后执行,Response返回时,拦截器是从最后一个往前执行。如下图: OkHttp拦截器执行过程

3.4.2 拦截器

接下来我们来看看通过责任链调用的一个个拦截器中都实现了什么吧:

  1. RetryAndFollowUpInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public final class RetryAndFollowUpInterceptor implements Interceptor {
  //重试次数上限
  private static final int MAX_FOLLOW_UPS = 20;

  private final OkHttpClient client;

  public RetryAndFollowUpInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    //请求的Request对象
    Request request = chain.request();
    //下一个RealInterceptorChain
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Transmitter transmitter = realChain.transmitter();

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {//循环执行 直到返回或者抛出异常
        //准备连接
        transmitter.prepareToConnect(request);
        //如果请求已经取消则抛出异常
        if (transmitter.isCanceled()) {
            throw new IOException("Canceled");
        }

        Response response;
        boolean success = false;
        try {
            //执行realChain.proceed,内部将调用下一个拦截器的intercept方法直到返回response
            response = realChain.proceed(request, transmitter, null);
            success = true;
        } catch (RouteException e) {
            //通过路由连接的尝试失败。请求将不会被发送
            if (!recover(e.getLastConnectException(), transmitter, false, request)) {
            throw e.getFirstConnectException();
            }
            //进行下一次重试
            continue;
        } catch (IOException e) {
            //试图与服务器通信失败
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            //尝试从与服务器通信失败中恢复 如果失败抛出异常
            if (!recover(e, transmitter, requestSendStarted, request)) throw e;
            //进行下一次重试
            continue;
        } finally {
            //如果请求未成功,释放资源
            if (!success) {
            transmitter.exchangeDoneDueToException();
            }
        }

        // 如果请求是重试后的请求,那么将重试前请求的body设置为null并加到当前响应体priorResponse字段中
        if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
        }

        Exchange exchange = Internal.instance.exchange(response);
        Route route = exchange != null ? exchange.connection().route() : null;
        //通过Http状态码处理 不需要后续操作返回null
        Request followUp = followUpRequest(response, route);

        if (followUp == null) {
            if (exchange != null && exchange.isDuplex()) {
            //停止应用超时
            transmitter.timeoutEarlyExit();
            }
            //返回响应体
            return response;
        }

        RequestBody followUpBody = followUp.body();
        //如果followUpBody为空并且该body只能调用一次 直接返回response
        if (followUpBody != null && followUpBody.isOneShot()) {
            return response;
        }
        //关闭响应体
        closeQuietly(response.body());
        if (transmitter.hasExchange()) {
            //释放请求和响应的相关资源
            exchange.detachWithViolence();
        }
        //超出重上限抛出异常
        if (++followUpCount > MAX_FOLLOW_UPS) {
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
        }
        //更换下一次重试的request
        request = followUp;
        //保存当前的response
        priorResponse = response;
    }
  }

  ...
}
  1. BridgeInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public final class BridgeInterceptor implements Interceptor {
    private final CookieJar cookieJar;

    public BridgeInterceptor(CookieJar cookieJar) {
        this.cookieJar = cookieJar;
    }

    @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();

        RequestBody body = userRequest.body();
        if (body != null) {
            MediaType contentType = body.contentType();
            if (contentType != null) {
                requestBuilder.header("Content-Type", contentType.toString());
            }

            long contentLength = body.contentLength();
            if (contentLength != -1) {
                requestBuilder.header("Content-Length", Long.toString(contentLength));
                requestBuilder.removeHeader("Transfer-Encoding");
            } else {
                requestBuilder.header("Transfer-Encoding", "chunked");
                requestBuilder.removeHeader("Content-Length");
            }
        }

        if (userRequest.header("Host") == null) {
            requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }

        if (userRequest.header("Connection") == null) {
            requestBuilder.header("Connection", "Keep-Alive");
        }

        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
            transparentGzip = true;
            requestBuilder.header("Accept-Encoding", "gzip");
        }

        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
            requestBuilder.header("Cookie", cookieHeader(cookies));
        }

        if (userRequest.header("User-Agent") == null) {
            requestBuilder.header("User-Agent", Version.userAgent());
        }
        //执行chain.proceed,内部将调用下一个拦截器的intercept方法直到返回response
        Response networkResponse = chain.proceed(requestBuilder.build());

        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);

        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
            GzipSource responseBody = new GzipSource(networkResponse.body().source());
            Headers strippedHeaders = networkResponse.headers().newBuilder()
                .removeAll("Content-Encoding")
                .removeAll("Content-Length")
                .build();
            responseBuilder.headers(strippedHeaders);
            String contentType = networkResponse.header("Content-Type");
            responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }

        return responseBuilder.build();
    }

}

上面代码总体来说干了两件事:

  • 对原始的Request进行检查,设置Content-TypeContent-LengthTransfer-EncodingHostConnectionAccept-EncodingCookieUser-Agent等header
  • 若是gzip编码,则对响应进行Gzip处理,否则直接返回。
  1. CacheInterceptor: 首先需要注意的是,OkHttp中的Cache策略采用的是DiskLruCache,关于DiskLruCache可以参考DiskLruCache。key的计算为: ByteString.encodeUtf8(url.toString()).md5().hex() 下面是CacheInterceptor的主要代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public final class CacheInterceptor implements Interceptor {
  final @Nullable InternalCache cache;

  public CacheInterceptor(@Nullable InternalCache cache) {
    this.cache = cache;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    // 根据是否设置了缓存以及网络请求,得到一个候选缓存
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();

    // 根据当前时间、请求对象以及候选缓存,获取缓存策略
    // 在缓存策略中,有两个重要的对象
    // networkRequest: 网络请求,若为null表示不使用网络
    // cacheResponse: 响应缓存,若为null表示不使用缓存
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }
    // 如果有候选缓存但是没有响应缓存,说明候选缓存不可用
    // 关闭它,以免内存泄漏
    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // 不进行网络请求,且缓存过期了,返回504错误
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    // If we don't need the network, we're done.
    // 不需要网络请求,此时缓存命中,直接返回缓存,后面的拦截器的步骤也不在执行了
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
    //网络回来的Response
    Response networkResponse = null;
    try {
        //执行chain.proceed,内部将调用下一个拦截器的intercept方法直到返回response
        networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // If we have a cache response too, then we're doing a conditional get.
    // 如果缓存策略中,网络响应和响应缓存都不为null,需要更新响应缓存
    // (比如 需要向服务器确认缓存是否可用的情况)
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    // 构建响应对象 装填cacheResponse和networkResponse
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
        // 将请求放到缓存中
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

        // 如果请求不能被缓存,则从cache移除
      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }
}
  1. ConnectInterceptor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;

  public ConnectInterceptor(OkHttpClient client) {
    this.client = client;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    Transmitter transmitter = realChain.transmitter();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);

    return realChain.proceed(request, transmitter, exchange);
  }
}

上面代码只有几行,作用就是与服务器建立连接,然后传递到下一个拦截器。newExchange 方法中会先通过 ExchangeFinder 尝试去 RealConnectionPool 中寻找已存在的连接,未找到则会重新创建一个 RealConnection 并开始连接,然后将其存入 RealConnectionPool,此时已经准备好了 RealConnection 对象,然后通过请求协议创建不同的 ExchangeCodec 并返回。

通过上面面步骤创建好 ExchangeCodec 之后,再根据它以及其他参数创建 Exchange 对象并返回。ConnectInterceptorExchange 对象作为参数,调用 Chain.process 方法传递到下一个连接器。

这儿有两个关键的类:

  • Transmitter:Transmitter 是 OkHttp 网络层的桥梁,对外提供功能实现。
  • Exchange:Exchange 与 Request 一一对应,新建一个请求时就会创建一个 Exchange,该 Exchange 负责将这个请求发送出去并读取到响应数据。Exchange内部有一个ExchangeCodec,它负责对 Request 编码及解码 Response,也就是写入请求及读取响应,我们的请求及响应数据都通过它来读写。
  1. CallServerInterceptor

CallServerInterceptor 负责读写数据。这是最后一个 interceptor 了,到了这里该准备的都准备好了,通过它,将会把 Request 中的数据发送到服务端,并获取到数据写入 Response

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public final class CallServerInterceptor implements Interceptor {
  private final boolean forWebSocket;

  public CallServerInterceptor(boolean forWebSocket) {
    this.forWebSocket = forWebSocket;
  }

  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Exchange exchange = realChain.exchange();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();

    exchange.writeRequestHeaders(request);

    boolean responseHeadersStarted = false;
    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        exchange.flushRequest();
        responseHeadersStarted = true;
        exchange.responseHeadersStart();
        responseBuilder = exchange.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        if (request.body().isDuplex()) {
          // Prepare a duplex body so that the application can send a request body later.
          exchange.flushRequest();
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, true));
          request.body().writeTo(bufferedRequestBody);
        } else {
          // Write the request body if the "Expect: 100-continue" expectation was met.
          BufferedSink bufferedRequestBody = Okio.buffer(
              exchange.createRequestBody(request, false));
          request.body().writeTo(bufferedRequestBody);
          bufferedRequestBody.close();
        }
      } else {
        exchange.noRequestBody();
        if (!exchange.connection().isMultiplexed()) {
          // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
          // from being reused. Otherwise we're still obligated to transmit the request body to
          // leave the connection in a consistent state.
          exchange.noNewExchangesOnConnection();
        }
      }
    } else {
      exchange.noRequestBody();
    }

    if (request.body() == null || !request.body().isDuplex()) {
      exchange.finishRequest();
    }

    if (!responseHeadersStarted) {
      exchange.responseHeadersStart();
    }

    if (responseBuilder == null) {
      responseBuilder = exchange.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(exchange.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      response = exchange.readResponseHeaders(false)
          .request(request)
          .handshake(exchange.connection().handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();

      code = response.code();
    }

    exchange.responseHeadersEnd(response);

    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(exchange.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      exchange.noNewExchangesOnConnection();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }
}

上面代码很简单,主要操作都放在了Exchange对象中,内部通过okio库来进行了网络的io操作,这里不做过多介绍。

4 总结

OkHttp 还有很多细节部分没有在本文展开,例如 HTTP2/HTTPS的支持,底层网络io 实现等,但建立一个清晰的概览非常重要。对整体有了清晰认识之后,细节部分如有需要,再单独深入将更加容易。

下面贴一张完整的流程图: OkHttp流程图