Android IO 框架 Okio 的实现原理,到底哪里 OK?

时间:2023-02-10 21:09:07

本文已收录到 AndroidFamily,技术和职场问题,请关注公众号 [彭旭锐] 提问。

前言

大家好,我是小彭。

今天,我们来讨论一个 Square 开源的 I/O 框架 Okio,我们最开始接触到 Okio 框架还是源于 Square 家的 OkHttp 网络框架。那么,OkHttp 为什么要使用 Okio,它相比于 Java 原生 IO 有什么区别和优势?今天我们就围绕这些问题展开。

本文源码基于 Okio v3.2.0。


思维导图

Android IO 框架 Okio 的实现原理,到底哪里 OK?


1. 说一下 Okio 的优势?

相比于 Java 原生 IO 框架,我认为 Okio 的优势主要体现在 3 个方面:

  • 1、精简且全面的 API: 原生 IO 使用装饰模式,例如使用 BufferedInputStream 装饰 FileInputStream 文件输入流,可以增强流的缓冲功能。但是原生 IO 的装饰器过于庞大,需要区分字节、字符流、字节数组、字符数组、缓冲等多种装饰器,而这些恰恰又是最常用的基础装饰器。相较之下,Okio 直接在 BufferedSource 和 BufferedSink 中聚合了原生 IO 中所有基础的装饰器,使得框架更加精简;

  • 2、基于共享的缓冲区设计: 由于 IO 系统调用存在上下文切换的性能损耗,为了减少系统调用次数,应用层往往会采用缓冲区策略。但是缓冲区又会存在副作用,当数据从一个缓冲区转移到另一个缓冲区时需要拷贝数据,这种内存中的拷贝显得没有必要。而 Okio 采用了基于共享的缓冲区设计,在缓冲区间转移数据只是共享 Segment 的引用,而减少了内存拷贝。同时 Segment 也采用了对象池设计,减少了内存分配和回收的开销;

  • 3、超时机制: Okio 弥补了部分 IO 操作不支持超时检测的缺陷,而且 Okio 不仅支持单次 IO 操作的超时检测,还支持包含多次 IO 操作的复合任务超时检测。

下面,我们将从这三个优势展开分析:


2. 精简的 Okio 框架

先用一个表格总结 Okio 框架中主要的类型:

类型 描述
Source 输入流
Sink 输出流
BufferedSource 缓存输入流接口,实现类是 RealBufferedSource
BufferedSink 缓冲输出流接口,实现类是 RealBufferedSink
Buffer 缓冲区,由 Segment 链表组成
Segment 数据片段,多个片段组成逻辑上连续数据
ByteString String 类
Timeout 超时控制

2.1 Source 输入流 与 Sink 输出流

在 Java 原生 IO 中有四个基础接口,分别是:

  • 字节流: InputStream 输入流和 OutputStream 输出流;
  • 字符流: Reader 输入流和 Writer 输出流。

而在 Okio 更加精简,只有两个基础接口,分别是:

  • 流: Source 输入流和 Sink 输出流。

Source.kt

interface Source : Closeable {

    // 从输入流读取数据到 Buffer 中(Buffer 等价于 byte[] 字节数组)
    // 返回值:-1:输入内容结束
    @Throws(IOException::class)
    fun read(sink: Buffer, byteCount: Long): Long

    // 超时控制(详细分析见后续文章)
    fun timeout(): Timeout

    // 关闭流
    @Throws(IOException::class)
    override fun close()
}

Sink.java

actual interface Sink : Closeable, Flushable {

    // 将 Buffer 的数据写入到输出流中(Buffer 等价于 byte[] 字节数组)
    @Throws(IOException::class)
    actual fun write(source: Buffer, byteCount: Long)

    // 清空输出缓冲区
    @Throws(IOException::class)
    actual override fun flush()

    // 超时控制(详细分析见后续文章)
    actual fun timeout(): Timeout

    // 关闭流
    @Throws(IOException::class)
    actual override fun close()
}

2.2 InputStream / OutputStream 与 Source / Sink 互转

在功能上,InputStream - Source 和 OutputStream - Sink 分别是等价的,而且是相互兼容的。结合 Kotlin 扩展函数,两种接口之间的转换会非常方便:

  • source(): InputStream 转 Source,实现类是 InputStreamSource;
  • sink(): OutputStream 转 Sink,实现类是 OutputStreamSink;

比较不理解的是: Okio 没有提供 InputStreamSource 和 OutputStreamSink 转回 InputStream 和 OutputStream 的方法,而是需要先转换为 BufferSource 与 BufferSink,再转回 InputStream 和 OutputStream。

  • buffer(): Source 转 BufferedSource,Sink 转 BufferedSink,实现类分别是 RealBufferedSource 和 RealBufferedSink。

示例代码

// 原生 IO -> Okio
val source = FileInputStream(File("")).source()
val bufferSource = FileInputStream(File("")).source().buffer()

val sink = FileOutputStream(File("")).sink()
val bufferSink = FileOutputStream(File("")).sink().buffer()

// Okio -> 原生 IO
val inputStream = bufferSource.inputStream()
val outputStream = bufferSink.outputStream()

JvmOkio.kt

// InputStream -> Source
fun InputStream.source(): Source = InputStreamSource(this, Timeout())

// OutputStream -> Sink
fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout())

private class InputStreamSource(
    private val input: InputStream,
    private val timeout: Timeout
) : Source {

    override fun read(sink: Buffer, byteCount: Long): Long {
        if (byteCount == 0L) return 0
        require(byteCount >= 0) { "byteCount < 0: $byteCount" }
        try {
            // 同步超时监控(详细分析见后续文章)
            timeout.throwIfReached()
            // 读入 Buffer
            val tail = sink.writableSegment(1)
            val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit).toInt()
            val bytesRead = input.read(tail.data, tail.limit, maxToCopy)
            if (bytesRead == -1) {
                if (tail.pos == tail.limit) {
                    // We allocated a tail segment, but didn't end up needing it. Recycle!
                    sink.head = tail.pop()
                    SegmentPool.recycle(tail)
                }
                return -1
            }
            tail.limit += bytesRead
            sink.size += bytesRead
            return bytesRead.toLong()
        } catch (e: AssertionError) {
            if (e.isAndroidGetsocknameError) throw IOException(e)
            throw e
        }
  }

  override fun close() = input.close()

  override fun timeout() = timeout

  override fun toString() = "source($input)"
}

private class OutputStreamSink(
    private val out: OutputStream,
    private val timeout: Timeout
) : Sink {

    override fun write(source: Buffer, byteCount: Long) {
        checkOffsetAndCount(source.size, 0, byteCount)
        var remaining = byteCount
        // 写出 Buffer
        while (remaining > 0) {
            // 同步超时监控(详细分析见后续文章)
            timeout.throwIfReached()
            // 取有效数据量和剩余输出量的较小值
            val head = source.head!!
            val toCopy = minOf(remaining, head.limit - head.pos).toInt()
            out.write(head.data, head.pos, toCopy)

            head.pos += toCopy
            remaining -= toCopy
            source.size -= toCopy

            // 指向下一个 Segment
            if (head.pos == head.limit) {
                source.head = head.pop()
                SegmentPool.recycle(head)
            }
        }
    }

    override fun flush() = out.flush()

    override fun close() = out.close()

    override fun timeout() = timeout

    override fun toString() = "sink($out)"
}

Okio.kt

// Source -> BufferedSource
fun Source.buffer(): BufferedSource = RealBufferedSource(this)

// Sink -> BufferedSink
fun Sink.buffer(): BufferedSink = RealBufferedSink(this)

2.3 BufferSource 与 BufferSink

在 Java 原生 IO 中,为了减少系统调用次数,我们一般不会直接调用 InputStream 和 OutputStream,而是会使用 BufferedInputStreamBufferedOutputStream 包装类增加缓冲功能。

例如,我们希望采用带缓冲的方式读取字符格式的文件,则需要先将文件输入流包装为字符流,再包装为缓冲流:

Java 原生 IO 示例

// 第一层包装
FileInputStream fis = new FileInputStream(file);
// 第二层包装
InputStreamReader isr = new InputStreamReader(new FileInputStream(file), "UTF-8");
// 第三层包装
BufferedReader br = new BufferedReader(isr);
String line;
while ((line = br.readLine()) != null) {
    ...
}
// 省略 close

同理,我们在 Okio 中一般也不会直接调用 Source 和 Sink,而是会使用 BufferedSourceBufferedSink 包装类增加缓冲功能:

Okio 示例

val bufferedSource = file.source()/*第一层包装*/.buffer()/*第二层包装*/
while (!bufferedSource.exhausted()) {
    val line = bufferedSource.readUtf8Line();
    ...
}
// 省略 close

网上有资料说 Okio 没有使用装饰器模式,所以类结构更简单。 这么说其实不太准确,装饰器模式本身并不是缺点,而且从 BufferedSource 和 BufferSink 可以看出 Okio 也使用了装饰器模式。 严格来说是原生 IO 的装饰器过于庞大,而 Okio 的装饰器更加精简。

比如原生 IO 常用的流就有这么多:

  • 原始流: FileInputStream / FileOutputStream 与 SocketInputStream / SocketOutputStream;

  • 基础接口(区分字节流和字符流): InputStream / OutputStream 与 Reader / Writer;

  • 缓存流: BufferedInputStream / BufferedOutputStream 与 BufferedReader / BufferedWriter;

  • 基本类型: DataInputStream / DataOutputStream;

  • 字节数组和字符数组: ByteArrayInputStream / ByteArrayOutputStream 与 CharArrayReader / CharArrayWriter;

  • 此处省略一万个字。

原生 IO 框架

Android IO 框架 Okio 的实现原理,到底哪里 OK?

而这么多种流在 Okio 里还剩下多少呢?

  • 原始流: FileInputStream / FileOutputStream 与 SocketInputStream / SocketOutputStream;
  • 基础接口: Source / Sink;
  • 缓存流: BufferedSource / BufferedSink。

Okio 框架

Android IO 框架 Okio 的实现原理,到底哪里 OK?

就问你服不服?

而且你看哈,这些都是平时业务开发中最常见的基本类型,原生 IO 把它们都拆分开了,让问题复杂化了。反观 Okio 直接在 BufferedSource 和 BufferedSink 中聚合了原生 IO 中基本的功能,而不再需要区分字节、字符、字节数组、字符数组、基础类型等等装饰器,确实让框架更加精简。

BufferedSource.kt

actual interface BufferedSource : Source, ReadableByteChannel {

    actual val buffer: Buffer

    // 读取 Int
    @Throws(IOException::class)
    actual fun readInt(): Int

    // 读取 String
    @Throws(IOException::class)
    fun readString(charset: Charset): String

    ...

    fun inputStream(): InputStream
}

BufferedSink.kt

actual interface BufferedSink : Sink, WritableByteChannel {

    actual val buffer: Buffer

    // 写入 Int
    @Throws(IOException::class)
    actual fun writeInt(i: Int): BufferedSink

    // 写入 String
    @Throws(IOException::class)
    fun writeString(string: String, charset: Charset): BufferedSink

    ...

    fun outputStream(): OutputStream
}

2.4 RealBufferedSink 与 RealBufferedSource

BufferedSource 和 BufferedSink 还是接口,它们的真正的实现类是 RealBufferedSource 和 RealBufferedSink。可以看到,在实现类中会创建一个 Buffer 缓冲区,在输入和输出的时候,都会借助 “Buffer 缓冲区” 减少系统调用次数。

RealBufferedSource.kt

internal actual class RealBufferedSource actual constructor(
    // 装饰器模式
    @JvmField actual val source: Source
) : BufferedSource {

    // 创建输入缓冲区
    @JvmField val bufferField = Buffer()

    // 带缓冲地读取(全部数据)
    override fun readString(charset: Charset): String {
        buffer.writeAll(source)
        return buffer.readString(charset)
    }

    // 带缓冲地读取(byteCount)
    override fun readString(byteCount: Long, charset: Charset): String {
        require(byteCount)
        return buffer.readString(byteCount, charset)
    }
}

RealBufferedSink.kt

internal actual class RealBufferedSink actual constructor(
    // 装饰器模式
    @JvmField actual val sink: Sink
) : BufferedSink {

    // 创建输出缓冲区
    @JvmField val bufferField = Buffer()

    // 带缓冲地写入(全部数据)
    override fun writeString(string: String, charset: Charset): BufferedSink {
        buffer.writeString(string, charset)
        return emitCompleteSegments()
    }

    // 带缓冲地写入(beginIndex - endIndex)
    override fun writeString(
        string: String,
        beginIndex: Int,
        endIndex: Int,
        charset: Charset
    ): BufferedSink {
        buffer.writeString(string, beginIndex, endIndex, charset)
        return emitCompleteSegments()
    }
}

至此,Okio 基本框架分析结束,用一张图总结:

Okio 框架

Android IO 框架 Okio 的实现原理,到底哪里 OK?


3. Okio 的缓冲区设计

3.1 使用缓冲区减少系统调用次数

在操作系统中,访问磁盘和网卡等 IO 操作需要通过系统调用来执行。系统调用本质上是一种软中断,进程会从用户态陷入内核态执行中断处理程序,完成 IO 操作后再从内核态切换回用户态。

可以看到,系统调用存在上下文切换的性能损耗。为了减少系统调用次数,应用层往往会采用缓冲区策略:

以 Java 原生 IO BufferedInputStream 为例,会通过一个 byte[] 数组作为数据源的输入缓冲,每次读取数据时会读取更多数据到缓冲区中:

  • 如果缓冲区中存在有效数据,则直接从缓冲区数据读取;
  • 如果缓冲区不存在有效数据,则先执行系统调用填充缓冲区(fill),再从缓冲区读取数据;
  • 如果要读取的数据量大于缓冲区容量,就会跳过缓冲区直接执行系统调用。

输出流 BufferedOutputStream 也类似,输出数据时会优先写到缓冲区,当缓冲区满或者手动调用 flush() 时,再执行系统调用写出数据。

伪代码

// 1. 输入
fun read(byte[] dst, int len) : Int {
    // 缓冲区有效数据量
    int avail = count - pos
    if(avail <= 0) {
        if(len >= 缓冲区容量) {
            // 直接从输入流读取
            read(输入流 in, dst, len)
        }
        // 填充缓冲区
        fill(数据源 in, 缓冲区)
    }
    // 本次读取数据量,不超过可用容量
    int cnt = (avail < len) ? avail : len?
    read(缓冲区, dst, cnt)
    // 更新缓冲区索引
    pos += cnt
    return cnt
}

// 2. 输出
fun write(byte[] src, len) {
    if(len > 缓冲区容量) {
        // 先将缓冲区写出
        flush(缓冲区)
        // 直接写出数据
        write(输出流 out, src, len)
    }
    // 缓冲区剩余容量
    int left = 缓冲区容量 - count
    if(len > 缓冲区剩余容量) {
        // 先将缓冲区写出
        flush(缓冲区)
    }
    // 将数据写入缓冲区
    write(缓冲区, src, len)
    // 更新缓冲区已添加数据容量
    count += len
}

3.2 缓冲区的副作用

的确,缓冲区策略能有效地减少系统调用次数,不至于读取一个字节都需要执行一次系统调用,大多数情况下表现良好。 但考虑一种 “双流操作” 场景,即从一个输入流读取,再写入到一个输出流。回顾刚才讲的缓存策略,此时的数据转移过程为:

  • 1、从输入流读取到缓冲区;
  • 2、从输入流缓冲区拷贝到 byte[](拷贝)
  • 3、将 byte[] copy 到输出流缓冲区(拷贝);
  • 4、将输出流缓冲区写入到输出流。

如果这两个流都使用了缓冲区设计,那么数据在这两个内存缓冲区之间相互拷贝,就显得没有必要。

3.3 Okio 的 Buffer 缓冲区

Okio 当然也有缓冲区策略,如果没有就会存在频繁系统调用的问题。

Buffer 是 RealBufferedSource 和 RealBufferedSink 的数据缓冲区。虽然在实现上与原生 BufferedInputStream 和 BufferedOutputStream 不一样,但在功能上是一样的。区别在于:

  • 1、BufferedInputStream 中的缓冲区是 “一个固定长度的字节数组” ,数据从一个缓冲区转移到另一个缓冲区需要拷贝;

  • 2、Buffer 中的缓冲区是 “一个 Segment 双向循环链表” ,每个 Segment 对象是一小段字节数组,依靠 Segment 链表的顺序组成逻辑上的连续数据。这个 Segment 片段是 Okio 高效的关键。

Buffer.kt

actual class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {

    // 缓冲区(Segment 双向链表)
    @JvmField internal actual var head: Segment? = null

    // 缓冲区数据量
    @get:JvmName("size")
    actual var size: Long = 0L
        internal set

    override fun buffer() = this

    actual override val buffer get() = this
}

对比 BufferedInputStream:

BufferedInputStream.java

public class BufferedInputStream extends FilterInputStream {

    // 缓冲区的默认大小(8KB)
    private static int DEFAULT_BUFFER_SIZE = 8192;

    // 输入缓冲区(固定长度的数组)
    protected volatile byte buf[];

    // 有效数据起始位,也是读数据的起始位
    protected int pos;

    // 有效数据量,pos + count 是写数据的起始位
    protected int count;

    ...
}

3.4 Segment 片段与 SegmentPool 对象池

Segment 中的字节数组是可以 “共享” 的,当数据从一个缓冲区转移到另一个缓冲区时,可以共享数据引用,而不一定需要拷贝数据。

Segment.kt

internal class Segment {

    companion object {
        // 片段的默认大小(8KB)
        const val SIZE = 8192
        // 最小共享阈值,超过 1KB 的数据才会共享
        const val SHARE_MINIMUM = 1024
    }

    // 底层数组
    @JvmField val data: ByteArra
    // 有效数据的起始位,也是读数据的起始位
    @JvmField var pos: Int = 0
    // 有效数据的结束位,也是写数据的起始位
    @JvmField var limit: Int = 0
    // 共享标记位
    @JvmField var shared: Boolean = false
    // 宿主标记位
    @JvmField var owner: Boolean = false
    // 后续指针
    @JvmField var next: Segment? = null
    // 前驱指针
    @JvmField var prev: Segment? = null

    constructor() {
        // 默认构造 8KB 数组(为什么默认长度是 8KB)
        this.data = ByteArray(SIZE)
        // 宿主标记位
        this.owner = true
        // 共享标记位
        this.shared = false
    }
}

另外,Segment 还使用了对象池设计,被回收的 Segment 对象会缓存在 SegmentPool 中。SegmentPool 内部维护了一个被回收的 Segment 对象单链表,缓存容量的最大值是 MAX_SIZE = 64 * 1024,也就相当于 8 个默认 Segment 的长度:

SegmentPool.kt

// object:全局单例
internal actual object SegmentPool {

    // 缓存容量
    actual val MAX_SIZE = 64 * 1024

    // 头节点
    private val LOCK = Segment(ByteArray(0), pos = 0, limit = 0, shared = false, owner = false)

    ...
}

Segment 示意图

Android IO 框架 Okio 的实现原理,到底哪里 OK?


4. 总结

  • 1、Okio 将原生 IO 多种基础装饰器聚合在 BufferedSource 和 BufferedSink,使得框架更加精简;
  • 2、为了减少系统调用次数的同时,应用层 IO 框架会使用缓存区设计。而 Okio 使用了基于共享 Segment 的缓冲区设计,减少了在缓冲区间转移数据的内存拷贝;
  • 3、Okio 弥补了部分 IO 操作不支持超时检测的缺陷,而且 Okio 不仅支持单次 IO 操作的超时检测,还支持包含多次 IO 操作的复合任务超时检测。

关于 Okio 超时机制的详细分析,我们在 下一篇文章 里讨论。请关注。


参考资料