摘要:内部会新开一个叫做的线程,根据超时时间依次处理链表的节点。总结通过以及分别提供了同步超时和异步超时功能,同步超时是在每次读取数据前判断是否超时,异步超时则是将组成有序链表,并且开启一个线程来监控,到达超时则触发相关操作。
简介
上一篇文章(Okio 源码解析(一):数据读取流程)分析了 Okio 数据读取的流程,从中可以看出 Okio 的便捷与高效。Okio 的另外一个优点是提供了超时机制,并且分为同步超时与异步超时。本文具体分析这两种超时的实现。
同步超时回顾一下 Okio.source 的代码:
public static Source source(InputStream in) { // 生成一个 Timeout 对象 return source(in, new Timeout()); } private static Source source(final InputStream in, final Timeout timeout) { if (in == null) throw new IllegalArgumentException("in == null"); if (timeout == null) throw new IllegalArgumentException("timeout == null"); return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount); if (byteCount == 0) return 0; try { // 超时检测 timeout.throwIfReached(); Segment tail = sink.writableSegment(1); int maxToCopy = (int) Math.min(byteCount, Segment.SIZE - tail.limit); int bytesRead = in.read(tail.data, tail.limit, maxToCopy); if (bytesRead == -1) return -1; tail.limit += bytesRead; sink.size += bytesRead; return bytesRead; } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) throw new IOException(e); throw e; } } @Override public void close() throws IOException { in.close(); } @Override public Timeout timeout() { return timeout; } @Override public String toString() { return "source(" + in + ")"; } }; }
在 Source 的构造方法中,传入了一个 Timeout 对象。在下面创建的匿名的 Source 对象的 read 方法中,先调用了 timeout.throwIfReached(),这里显然是判断是否已经超时,代码如下:
public void throwIfReached() throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException("thread interrupted"); } if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) { throw new InterruptedIOException("deadline reached"); } }
这里逻辑很简单,如果超时了则抛出异常。在 TimeOut 中有几个变量用于设定超时的时间:
private boolean hasDeadline; private long deadlineNanoTime; private long timeoutNanos;
由于 throwIfReached 是在每次读取数据之前调用并且与数据读取在同一个线程,所以如果读取操作阻塞,则无法及时抛出异常。
异步超时异步超时与同步超时不同,其开了新的线程用于检测是否超时,下面是 Socket 的例子。
Okio 可以接受一个 Socket 对象构建 Source,代码如下:
public static Source source(Socket socket) throws IOException { if (socket == null) throw new IllegalArgumentException("socket == null"); AsyncTimeout timeout = timeout(socket); Source source = source(socket.getInputStream(), timeout); // 返回 timeout 封装的 source return timeout.source(source); }
相比于 InputStream,这里的额外操作是引入了 AsyncTimeout 来封装 socket。timeout 方法生成一个 AsyncTimeout 对象,看一下代码:
private static AsyncTimeout timeout(final Socket socket) { return new AsyncTimeout() { @Override protected IOException newTimeoutException(@Nullable IOException cause) { InterruptedIOException ioe = new SocketTimeoutException("timeout"); if (cause != null) { ioe.initCause(cause); } return ioe; } // 超时后调用 @Override protected void timedOut() { try { socket.close(); } catch (Exception e) { logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e); } catch (AssertionError e) { if (isAndroidGetsocknameError(e)) { logger.log(Level.WARNING, "Failed to close timed out socket " + socket, e); } else { throw e; } } } }; }
上面的代码生成了一个匿名的 AsyncTimeout,其中有个 timedout 方法,这个方法是在超时的时候被调用,可以看出里面的操作主要是关闭 socket。有了 AsyncTimeout 之后,调用其 source 方法来封装 socket 的 InputStream。
下面具体看看 AsyncTimeout 。
AsyncTimeoutAsyncTimeout 继承了 Timeout,提供了异步的超时机制。每一个 AsyncTimeout 对象包装一个 source,并与其它 AsyncTimeout 组成一个链表,根据超时时间的长短插入。AsyncTimeout 内部会新开一个叫做 WatchDog 的线程,根据超时时间依次处理 AsyncTimout 链表的节点。
下面是 AsyncTimeout 的一些内部变量:
// 链表头结点 static @Nullable AsyncTimeout head; // 此节点是否在队列中 private boolean inQueue; // 链表中下一个节点 private @Nullable AsyncTimeout next;
其中 head 是链表的头结点,next 是下一个节点,inQueue 则标识此 AsyncTimeout 是否处于链表中。
在上面的 Okio.source(Socket socket) 中,最后返回的是 timeout.source(socket),下面是其代码:
public final Source source(final Source source) { return new Source() { @Override public long read(Buffer sink, long byteCount) throws IOException { boolean throwOnTimeout = false; // enter enter(); try { long result = source.read(sink, byteCount); throwOnTimeout = true; return result; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public void close() throws IOException { boolean throwOnTimeout = false; try { source.close(); throwOnTimeout = true; } catch (IOException e) { throw exit(e); } finally { exit(throwOnTimeout); } } @Override public Timeout timeout() { return AsyncTimeout.this; } @Override public String toString() { return "AsyncTimeout.source(" + source + ")"; } }; }
AsyncTimtout#source 依然是返回一个匿名的 Source 对象,只不过是将参数中真正的 source 包装了一下,在 source.read 之前添加了 enter 方法,在 catch 以及 finally 中添加了 exit 方法。enter 和 exit 是重点,其中 enter 中会将当前的 AsyncTimeout 加入链表,具体代码如下:
public final void enter() { if (inQueue) throw new IllegalStateException("Unbalanced enter/exit"); long timeoutNanos = timeoutNanos(); boolean hasDeadline = hasDeadline(); if (timeoutNanos == 0 && !hasDeadline) { return; // No timeout and no deadline? Don"t bother with the queue. } inQueue = true; scheduleTimeout(this, timeoutNanos, hasDeadline); } private static synchronized void scheduleTimeout( AsyncTimeout node, long timeoutNanos, boolean hasDeadline) { // 如果链表为空,则新建一个头结点,并且启动 Watchdog线程 if (head == null) { head = new AsyncTimeout(); new Watchdog().start(); } long now = System.nanoTime(); if (timeoutNanos != 0 && hasDeadline) { node.timeoutAt = now + Math.min(timeoutNanos, node.deadlineNanoTime() - now); } else if (timeoutNanos != 0) { node.timeoutAt = now + timeoutNanos; } else if (hasDeadline) { node.timeoutAt = node.deadlineNanoTime(); } else { throw new AssertionError(); } // 按时间将节点插入链表 long remainingNanos = node.remainingNanos(now); for (AsyncTimeout prev = head; true; prev = prev.next) { if (prev.next == null || remainingNanos < prev.next.remainingNanos(now)) { node.next = prev.next; prev.next = node; if (prev == head) { AsyncTimeout.class.notify(); // Wake up the watchdog when inserting at the front. } break; } } }
真正插入链表的操作在 scheduleTimeout 中,如果 head 节点还不存在则新建一个头结点,并且启动 Watchdog 线程。接着就是计算超时时间,然后遍历链表进行插入。如果插入在链表的最前面(head 节点后面的第一个节点),则主动进行唤醒 Watchdog 线程,从这里可以猜到 Watchdog 线程在等待超时的过程中是调用了 AsyncTimeout.class 的 wait 进入了休眠状态。那么就来看看 WatchDog 线程的实际逻辑:
private static final class Watchdog extends Thread { Watchdog() { super("Okio Watchdog"); setDaemon(true); } public void run() { while (true) { try { AsyncTimeout timedOut; synchronized (AsyncTimeout.class) { timedOut = awaitTimeout(); // Didn"t find a node to interrupt. Try again. if (timedOut == null) continue; // The queue is completely empty. Let this thread exit and let another watchdog thread // get created on the next call to scheduleTimeout(). if (timedOut == head) { head = null; return; } } // Close the timed out node. timedOut.timedOut(); } catch (InterruptedException ignored) { } } } }
WatchDog 主要是调用 awaitTimeout() 获取一个已超时的 timeout,如果不为空并且是 head 节点,说明链表中已经没有其它节点,可以结束线程,否则调用 timedOut.timedOut(), timeOut() 是一个空方法,由用户实现超时后应该采取的操作。 awaitTimeout 是获取超时节点的方法:
static @Nullable AsyncTimeout awaitTimeout() throws InterruptedException { // Get the next eligible node. AsyncTimeout node = head.next; // 队列为空的话等待有节点进入队列或者达到超时IDLE_TIMEOUT_MILLIS的时间 if (node == null) { long startNanos = System.nanoTime(); AsyncTimeout.class.wait(IDLE_TIMEOUT_MILLIS); return head.next == null && (System.nanoTime() - startNanos) >= IDLE_TIMEOUT_NANOS ? head // The idle timeout elapsed. : null; // The situation has changed. } // 计算等待时间 long waitNanos = node.remainingNanos(System.nanoTime()); // The head of the queue hasn"t timed out yet. Await that. if (waitNanos > 0) { // Waiting is made complicated by the fact that we work in nanoseconds, // but the API wants (millis, nanos) in two arguments. long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); // 调用 wait AsyncTimeout.class.wait(waitMillis, (int) waitNanos); return null; } // 第一个节点超时,移除并返回这个节点 head.next = node.next; node.next = null; return node; }
与 enter 相反,exit 则是视情况抛出异常并且移除链表中的节点,这里就不放具体代码了。
总结Okio 通过 Timeout 以及 AsyncTimeout 分别提供了同步超时和异步超时功能,同步超时是在每次读取数据前判断是否超时,异步超时则是将 AsyncTimeout 组成有序链表,并且开启一个线程来监控,到达超时则触发相关操作。
如果我的文章对您有帮助,不妨点个赞支持一下(^_^)
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68256.html
摘要:封装了和,并且有多个优点提供超时机制不需要人工区分字节流与字符流,易于使用易于测试本文先介绍的基本用法,然后分析源码中数据读取的流程。和分别用于提供字节流和接收字节流,对应于和。和则是保存了相应的缓存数据用于高效读写。 简介 Okio 是 square 开发的一个 Java I/O 库,并且也是 OkHttp 内部使用的一个组件。Okio 封装了 java.io 和 java.nio,...
摘要:使用前准备配置添加网络权限异步请求惯例,请求百度可以省略,默认是请求请求成功与版本并没有什么不同,比较郁闷的是回调仍然不在线程。 前言 上一篇介绍了OkHttp2.x的用法,这一篇文章我们来对照OkHttp2.x版本来看看,OkHttp3使用起来有那些变化。当然,看这篇文章前建议看一下前一篇文章Android网络编程(五)OkHttp2.x用法全解析。 1.使用前准备 Android ...
摘要:需要注意的是回调并不是在线程。也可以通过来同时取消多个请求。在开始创建的时候配置好,在请求网络的时候用将请求的结果回调给线程。最后调用这个的方法请求成功使用起来简单多了,而且请求结果回调是在线程的。 前言 讲完了Volley,我们接下来看看目前比较火的网络框架OkHttp, 它处理了很多网络疑难杂症:会从很多常用的连接问题中自动恢复。如果您的服务器配置了多个IP地址,当第一个IP连接失...
阅读 3207·2021-11-24 10:30
阅读 1322·2021-09-30 09:56
阅读 2395·2021-09-07 10:20
阅读 2608·2021-08-27 13:10
阅读 710·2019-08-30 11:11
阅读 2062·2019-08-29 12:13
阅读 769·2019-08-26 12:24
阅读 2911·2019-08-26 12:20