SequenceFile文件

时间:2023-03-10 01:14:44
SequenceFile文件

SequenceFile文件是Hadoop为存储二进制形式的key-value对而设计的一种平面文件(Flat File)。目前,也有不少人在该文件格式的基础之上提出了一些HDFS中小文件存储的解决方案,其基本思路是将多个小文件合并成一个大文件,同时对这些小文件的位置信息构建索引。不过,这类解决方案还涉及到Hadoop的另一种文件格式——MapFile文件。SequenceFile文件并不保证其存储的key-value数据按照key的某个顺序排列,同时不支持向已有文件追加数据的append操作(注意:下文Writer中的append()方法只是在写入过程中添加一条记录,与此处所说的文件追加不是一回事)。

在SequenceFile文件中,每一个key-value对被看做一条记录(Record)。基于Record的压缩策略,SequenceFile文件支持三种压缩类型(SequenceFile.CompressionType):

NONE: 对records不进行压缩;

RECORD: 仅压缩每一个record中的value值;

BLOCK: 将一个block中的所有records压缩在一起(key长度、key、value长度、value四部分分别压缩)。

那么,基于这三种压缩类型,Hadoop提供了对应的三种类型的Writer:

SequenceFile.Writer 写入时不压缩任何的key-value对(Record);

  1. public static class Writer implements java.io.Closeable { // Base writer: appends each key-value pair to the output stream as one record (excerpt; elided parts are marked "...")
  2. ...
  3. // Initialize the Writer: store the settings and open the serializers
  4. void init(Path name, Configuration conf, FSDataOutputStream out, Class keyClass, Class valClass, boolean compress, CompressionCodec codec, Metadata metadata) throws IOException {
  5. this.conf = conf;
  6. this.out = out;
  7. this.keyClass = keyClass;
  8. this.valClass = valClass;
  9. this.compress = compress;
  10. this.codec = codec;
  11. this.metadata = metadata;
  12. // Create the key serializer and the uncompressed-value serializer; both write into buffer
  13. SerializationFactory serializationFactory = new SerializationFactory(conf);
  14. this.keySerializer = serializationFactory.getSerializer(keyClass);
  15. this.keySerializer.open(buffer);
  16. this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
  17. this.uncompressedValSerializer.open(buffer);
  18. // If a codec is given, also create a value serializer that writes into buffer through the codec's compressing stream
  19. if (this.codec != null) {
  20. ReflectionUtils.setConf(this.codec, this.conf);
  21. this.compressor = CodecPool.getCompressor(this.codec);
  22. this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
  23. this.deflateOut = new DataOutputStream(new BufferedOutputStream(deflateFilter));
  24. this.compressedValSerializer = serializationFactory.getSerializer(valClass);
  25. this.compressedValSerializer.open(deflateOut);
  26. }
  27. }
  28. // Append one record (key and value are objects that must be serialized first)
  29. public synchronized void append(Object key, Object val) throws IOException {
  30. if (key.getClass() != keyClass)
  31. throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);
  32. if (val.getClass() != valClass)
  33. throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);
  34. buffer.reset();
  35. // Serialize the key into buffer; the buffer length afterwards is the key length
  36. keySerializer.serialize(key);
  37. int keyLength = buffer.getLength();
  38. if (keyLength < 0)
  39. throw new IOException("negative length keys not allowed: " + key);
  40. // compress is assigned in init(); the original note says it is false for this plain Writer (the init() call is elided here - confirm)
  41. if (compress) {
  42. deflateFilter.resetState();
  43. compressedValSerializer.serialize(val);
  44. deflateOut.flush();
  45. deflateFilter.finish();
  46. } else {
  47. // Serialize the value (uncompressed) into buffer, right after the key
  48. uncompressedValSerializer.serialize(val);
  49. }
  50. // Write the record to the output stream
  51. checkAndWriteSync();                                // sync
  52. out.writeInt(buffer.getLength());                   // total record length
  53. out.writeInt(keyLength);                            // key portion length
  54. out.write(buffer.getData(), 0, buffer.getLength()); // data
  55. }
  56. // Append one record whose key and value are already raw bytes
  57. public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
  58. if (keyLength < 0)
  59. throw new IOException("negative length keys not allowed: " + keyLength);
  60. int valLength = val.getSize();
  61. checkAndWriteSync();
  62. // Write the key and value straight to the output stream, without going through buffer
  63. out.writeInt(keyLength+valLength);          // total record length
  64. out.writeInt(keyLength);                    // key portion length
  65. out.write(keyData, keyOffset, keyLength);   // key
  66. val.writeUncompressedBytes(out);            // value
  67. }
  68. ...
  69. }

SequenceFile.RecordCompressWriter 写入时只压缩key-value对(Record)中的value;

  1. static class RecordCompressWriter extends Writer { // Writer that compresses only the value of each record (excerpt; elided parts are marked "...")
  2. ...
  3. public synchronized void append(Object key, Object val) throws IOException {
  4. if (key.getClass() != keyClass)
  5. throw new IOException("wrong key class: "+key.getClass().getName() +" is not "+keyClass);
  6. if (val.getClass() != valClass)
  7. throw new IOException("wrong value class: "+val.getClass().getName() +" is not "+valClass);
  8. buffer.reset();
  9. // Serialize the key into buffer; the buffer length afterwards is the key length
  10. keySerializer.serialize(key);
  11. int keyLength = buffer.getLength();
  12. if (keyLength < 0)
  13. throw new IOException("negative length keys not allowed: " + key);
  14. // Serialize the value through the compressing stream (deflateFilter), so it follows the key in buffer in compressed form
  15. deflateFilter.resetState();
  16. compressedValSerializer.serialize(val);
  17. deflateOut.flush();
  18. deflateFilter.finish();
  19. // Write the record to the output stream
  20. checkAndWriteSync();                                // sync
  21. out.writeInt(buffer.getLength());                   // total record length
  22. out.writeInt(keyLength);                            // key portion length
  23. out.write(buffer.getData(), 0, buffer.getLength()); // data
  24. }
  25. /** Append one record from raw bytes; the value is written via writeCompressedBytes. */
  26. public synchronized void appendRaw(byte[] keyData, int keyOffset,
  27. int keyLength, ValueBytes val) throws IOException {
  28. if (keyLength < 0)
  29. throw new IOException("negative length keys not allowed: " + keyLength);
  30. int valLength = val.getSize();
  31. checkAndWriteSync();                        // sync
  32. out.writeInt(keyLength+valLength);          // total record length
  33. out.writeInt(keyLength);                    // key portion length
  34. out.write(keyData, keyOffset, keyLength);   // 'key' data
  35. val.writeCompressedBytes(out);              // 'value' data
  36. }
  37. } // RecordCompressionWriter
  38. ...
  39. }

SequenceFile.BlockCompressWriter 写入时将一批key-value对(Record)压缩成一个Block;

  1. static class BlockCompressWriter extends Writer { // Writer that buffers records and compresses them together as a block (excerpt; elided parts are marked "...")
  2. ...
  3. void init(int compressionBlockSize) throws IOException { // Re-open the key/value serializers on the per-block keyBuffer and valBuffer
  4. this.compressionBlockSize = compressionBlockSize;
  5. keySerializer.close();
  6. keySerializer.open(keyBuffer);
  7. uncompressedValSerializer.close();
  8. uncompressedValSerializer.open(valBuffer);
  9. }
  10. /** Workhorse to check and write out compressed data/lengths */
  11. private synchronized void writeBuffer(DataOutputBuffer uncompressedDataBuffer) throws IOException {
  12. deflateFilter.resetState();
  13. buffer.reset();
  14. deflateOut.write(uncompressedDataBuffer.getData(), 0, uncompressedDataBuffer.getLength());
  15. deflateOut.flush();
  16. deflateFilter.finish();
  17. WritableUtils.writeVInt(out, buffer.getLength());
  18. out.write(buffer.getData(), 0, buffer.getLength());
  19. }
  20. /** Compress and flush contents to dfs */
  21. public synchronized void sync() throws IOException {
  22. if (noBufferedRecords > 0) {
  23. super.sync();
  24. // No. of records
  25. WritableUtils.writeVInt(out, noBufferedRecords);
  26. // Write 'keys' and lengths
  27. writeBuffer(keyLenBuffer);
  28. writeBuffer(keyBuffer);
  29. // Write 'values' and lengths
  30. writeBuffer(valLenBuffer);
  31. writeBuffer(valBuffer);
  32. // Flush the file-stream
  33. out.flush();
  34. // Reset internal states
  35. keyLenBuffer.reset();
  36. keyBuffer.reset();
  37. valLenBuffer.reset();
  38. valBuffer.reset();
  39. noBufferedRecords = 0;
  40. }
  41. }
  42. // Append one record (key and value are objects that must be serialized first)
  43. public synchronized void append(Object key, Object val) throws IOException {
  44. if (key.getClass() != keyClass)
  45. throw new IOException("wrong key class: "+key+" is not "+keyClass);
  46. if (val.getClass() != valClass)
  47. throw new IOException("wrong value class: "+val+" is not "+valClass);
  48. // Serialize the key (uncompressed) into keyBuffer and record its length in keyLenBuffer
  49. int oldKeyLength = keyBuffer.getLength();
  50. keySerializer.serialize(key);
  51. int keyLength = keyBuffer.getLength() - oldKeyLength;
  52. if (keyLength < 0)
  53. throw new IOException("negative length keys not allowed: " + key);
  54. WritableUtils.writeVInt(keyLenBuffer, keyLength);
  55. // Serialize the value (uncompressed) into valBuffer and record its length in valLenBuffer
  56. int oldValLength = valBuffer.getLength();
  57. uncompressedValSerializer.serialize(val);
  58. int valLength = valBuffer.getLength() - oldValLength;
  59. WritableUtils.writeVInt(valLenBuffer, valLength);
  60. // Added another key/value pair
  61. ++noBufferedRecords;
  62. // Compress and flush?
  63. int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
  64. // Buffered key + value bytes have reached compressionBlockSize: compress the block and write it out
  65. if (currentBlockSize >= compressionBlockSize) {
  66. sync();
  67. }
  68. }
  69. /** Append one record from raw bytes; the value is buffered via writeUncompressedBytes and compressed later in sync(). */
  70. public synchronized void appendRaw(byte[] keyData, int keyOffset, int keyLength, ValueBytes val) throws IOException {
  71. if (keyLength < 0)
  72. throw new IOException("negative length keys not allowed");
  73. int valLength = val.getSize();
  74. // Save key/value data in relevant buffers
  75. WritableUtils.writeVInt(keyLenBuffer, keyLength);
  76. keyBuffer.write(keyData, keyOffset, keyLength);
  77. WritableUtils.writeVInt(valLenBuffer, valLength);
  78. val.writeUncompressedBytes(valBuffer);
  79. // Added another key/value pair
  80. ++noBufferedRecords;
  81. // Compress and flush?
  82. int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
  83. if (currentBlockSize >= compressionBlockSize) {
  84. sync();
  85. }
  86. }
  87. } // BlockCompressWriter
  88. ...
  89. }

源码中,block的大小阈值compressionBlockSize默认值为1000000(字节),也可通过配置参数io.seqfile.compress.blocksize来指定。注意它是一个下限:只有当缓存的key与value的总长度达到该值时,才会将整个block压缩并写出。

根据这三种压缩类型,对应有三种SequenceFile文件格式:

1). Uncompressed SequenceFile

[图:SequenceFile文件——Uncompressed格式结构示意图(原图缺失)]

2). Record-Compressed SequenceFile

[图:SequenceFile文件——Record-Compressed格式结构示意图(原图缺失)]

3). Block-Compressed SequenceFile

[图:SequenceFile文件——Block-Compressed格式结构示意图(原图缺失)]