缓存字节流BufferedInputStream使用及原理解析

时间:2022-06-13 14:38:48

流操作的特性是数据传输(只能往前进,而不能往后退),而数据传输速度和数据处理的速度存在不平衡,这种不平衡使得数据传输过程中进行缓存处理而释放数据处理器的资源是一种提高程序效率的机制。比如,厕所的抽水马桶,在你上完厕所后,一按冲水则通过水流冲洗干净马桶,通过水流的冲击力来带走你的排泄物,而抽水马桶上方的水槽中的水是一点点进行积累,有时水未积累到位时,按冲水时,会发现马桶冲不干净。在极端情况下,我们不会用一滴滴的水进行冲洗,而是等到水达到一定的量在进行冲马桶。在数据流处理时,CPU也不会等待内存读取数据后就立即处理,而是在内存数据到达一定的量后在进行处理,从而腾出CPU的处理时间。在java.io读写文件时,常常使用缓存进行操作,而不是按部就班的逐个字节读取处理。


单字节读取

假设读取一个字节需要的时间为0.1毫秒,那么一个1M字节大小的文件,则需要1024 * 1024 = 104857.6毫秒,也即104秒.。同时假设处理一个字节需要0.01,处理完毕需要10秒。

public static void readFileOneByOne() {
    try {
        final File file = new File("test.txt");
        final FileInputStream is = new FileInputStream(file);
        int data = -1;
        //逐个字节读取处理
        while((data = is.read()) != -1) {
            //处理它
            System.out.println((char) data);
        }
    } catch(IOException ioex) {
        ioex.printStackTrace();
    }
}

批处理

预先创建一个大小固定的缓存容器,将流中的数据读入容器中,每当容器数据填满或者流中数据读取完毕时,则进行操作,释放CPU的处理时间。同理,在内存读取文件时,一次读取多个字节并不是线性时间t1,而是线性叠加时间t2,一般t1 << t2。 假设读取1024个字节需要的时间为10毫秒,那么一个1M字节大小的文件,则需要10240 毫秒,也即10秒.。同时假设处理一个字节需要0.01,处理完毕需要10秒。

public static void readFileOneByOne() {
    try {
        final File file = new File("test.txt");
        final FileInputStream is = new FileInputStream(file);
        int data = -1;
        //缓存容器
        byte[] datas = new byte[1024];
        //批量字节读取处理
        while((data = is.read(datas)) != -1) {
            //处理它
            System.out.println(new String(datas, 0, 1024));
        }
    } catch(IOException ioex) {
        ioex.printStackTrace();
    }
}

BufferedInputStream缓冲字节流

在java.io包中,介质流只存在三种:数组ByteArrayInputStream、字符StringBufferedInputStream(已被标记过时)和文件FileInputStream。而缓冲流这是这三种介质流上的处理器包装后的流,它们提供InputSteam所不能提供的额外功能。其实,BufferedInputStream就是上面批量处理操作的抽象过程,以便于采用面向对象的方式进行编程。废话不多说,直接上代码

  • 简单实现版
import java.io.FilterInputStream;
import java.io.InputStream;
import java.io.IOException;

public class MyBufferedInputStream extends FilterInputStream {
    private byte[] buf;

    private static final int DEFAULTSIZE = 8092;

    /** * 下一个读取字节的位置. */
    private int pos;

    /** * 当前缓冲数组中有效字节个数. */
    private int count;

    public MyBufferedInputStream(final InputStream in) {
        this(in, DEFAULTSIZE);
    }

    public MyBufferedInputStream(final InputStream in, final int size) {
        super(in);
        this.buf = new byte[size];
    }

    /** * 读取缓冲数组中的字节. */
    public int read() throws IOException {
        //当buf中已经缓存的字节都读取完了,那么将输入流的数据继续读入缓存buf中.
        if(pos >= count) {
            fill();
            //当发现输入流已经读取完毕了,那么退出.
            if(pos >= count) {
                return -1;
            }
        }
        return this.buf[pos++] & 0xff;  
    }

    /** * 将输入流in填充缓冲数组buf. */
    private void fill() throws IOException {
        this.count = this.in.read(this.buf, 0, this.buf.length);
        this.pos = 0;
    }
}

标记mark和reset重置的增强版

在InputStream中,mark和reset是配套使用来重复读取输入流中字节。BufferedInputStream中对mark和reset的操作就需要费点心思,因为mark标记的位置后,可能缓存数组buf中数据已经发生了变化,例如buf的size为10,而in总字节数为100,mark标记为5,但是reset发生在55,此时应该从哪个怎么读取?看下例子

public static void main(String[] args) {
    try {
        final byte[] src = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
        final ByteArrayInputStream bis = new ByteArrayInputStream(src);
        final BufferedInputStream bufis = new BufferedInputStream(bis, 5);
        int data = -1;
        int i = 0;
        while((data = bufis.read()) != -1) {
            if(data == 4) {
                bufis.mark(2);
            }
            if(i++ == 9) {
                bufis.reset();
            }
            System.out.printf("%d", data);
        }
    } catch(IOException ioex) {
        ioex.printStackTrace();
    }
}

在i++ == 6这个reset方法调用的条件中,可以发现i 的值可为(6,7,8,9),其他的值reset都会抛出java.io.IOException: Resetting to invalid mark;,其中:i<6时,因为markPos为-1,此时reset判断markPos<0,也即还未mark就调用了reset,抛出IOException;i > 9时,此时pos被重置为5,而pos == count,此时buf已经被读取完毕,则重新进行fill填充,此时判断readLimit小于buf.length的长度,那么重新填充buf。也就是readLimit是决定在mark允许进行reset范围内,如果reset未进行时,任然可以读取readLimit个字节。废话不多说,看代码注解。

import java.io.FilterInputStream;
import java.io.InputStream;
import java.io.IOException;

public class MyBufferedInputStream extends FilterInputStream {
    // 此处省略前面代码
    private int markPos;

    private int marklimit;
    /** * 则当外部读取到了buf的末尾,则将输入流in后面的字节填充到缓冲数组buf. */
    private void fill() throws IOException {
        byte[] buffer = this.buf;
        if(markPos < 0) {
            //1、如果没有进行mark标记,则从初始位置进行填充
            pos = 0
        } else if(pos >= buffer.length) {
            /** 2、当发现外部进行了mark标记,则记录标记位置,从当前位置开始到buf末尾的字节进行记录, 并且期望用户代码在当前缓冲期内能进行reset.例如,buf = [0, 1, 2, 3, 4],markPos = 3,则记录pos - markPos = 5 - 3 = 2个字节,也即buf[3]和buf[4],使用System.arraycopy进行变量赋值,buf=[3, 4, 2, 3, 4]; */
            if(markPos > 0) {
                int sz = pos - markPos;
                System.arraycopy(buffer, markPos, buffer, 0, sz);
                pos = sz;
                markPos = 0;
            } else if(buffer.length >= marklimit) {
                /** 当mark标记内的缓存期buf读取完毕后,任然没有进行reset操作时,则判断当前输入流还允许继续读入多少个字节,如果发现少于buffer的长度,则一次将buf填满 */
                markPos = -1;
                pos = 0;
            } else {
                /** 3、如果发现在mark标记后的缓存期内未进行reset,而readLimit足够大,则将当前缓存数组buf进行扩容(2倍) */              
                int nsz = 2 * pos;
                if(nsz > marklimit) {
                    nsz = marklimit;
                }
                byte[] nbuf = new byte[nsz];
                System.arraycopy(buffer, 0, nbuf, 0, pos);
                this.buf = buffer = nbuf;
            }
        }
        //从输入流中继续读取 buffer.length - pos个字节,使得buf容量为 n + pos
        count = pos;
        int n = this.in.read(buffer, pos, buffer.length - pos);
        if(n > 0) {
            count = n + pos;
        } 
    }

    public void mark(final int readlimit) {
        this.marklimit = readlimit;
        this.markPos = pos;
    }

    pubilc void reset() throws IOException {
        if(this.markPos < -1) {
            throw new IOException("Resetting to invalid mark");
        }
        pos = markPos;
    }
}

从上面分析可以看出,BufferedInputStream在提供mark和reset重复读取字节时的算法是,在mark后的buf.length+readLimit读取次数内,允许进行reset,超出这个范围内的reset都将获得一个IOException。

同步和安全

阅读过BufferedInputStream源码的同学会发现synchronized关键字修饰在了read、mark、reset和skip、availabe方法上,以保证这些操作的原子性,此外字段buf加以volatile关键字配以AtomicReferenceFileUpdater引用字段原子更新器来保证buf扩容赋值的原子性。

import java.io.FilterInputStream;
import java.io.InputStream;
import java.io.IOException;

public class MyBufferedInputStream extends FilterInputStream {
    private volatile byte[] buf;

    private static final AtomicReferenceFieldUpdater<BufferedInputStream, byte[]>
                bufUpdater = AtomicReferenceFieldUpdater.newUpdater(BufferedInputStream.class, byte[].class, "buf");
    //每次访问buf和in时都需要判断当前输入流和缓存是否可用
    private InputStream getInIfOpen() throws IOException {
        InputStream input = in;
        if(input == null) {
            throw new IOException("Stream closed");
        }
        return input;
    }
    private byte[] getBuffIfOpen() throws IOException {
        byte[] buffer = this.buf;
        if(buffer == null) {
            throw new IOException("Stream closed");
        }
        return buffer;
    }

    public synchronized int read() {....}
    public synchronized int read(byte[] b, int off, int length) {....}
    public synchronized void mark(int readlimit){....}
    pubic synchronized void reset() {....}
    public synchronized long skip(long n) {....}
    pubic synchronized int available() {....}

    pubic void close() throws IOException {
        byte[] buffer;
        while((buffer = buf) != null) {
            if(bufUpdater.compareAndSet(this, buffer, null)) {
                InputStream input = in;
                in = null;
                if(input != null) {
                    input.close();
                }
                return;
            }
        }
    }
}

结论

BufferedInputStream是作为缓存流来提高输入流文件的处理效率,但是它所提供的mark和reset的使用上还是需要加以注意,不然出其不意的IOException会让初学者萌逼的!