HDFS读取文件操作的处理流程源码分析

时间:2020-11-29 07:46:53
HDFS读取文件的客户端主要代码:
 Configuration conf = new Configuration();
 FileSystem fs = FileSystem.get(URI.create(args[0]),conf);
 FSDataInputStream in = fs.open(new Path(args[0]));
 IOUtils.copyBytes(in, System.out, 4096, false);
------------------------------------------------
一.DFSClient与NameNode的交互流程
调用方法主要流程:DistributedFileSystem.open(Path f, int bufferSize) --->DFSClient.open(src) --->new DFSInputStream(String src, int buffersize, boolean verifyChecksum) --->DFSInputStream.openInfo() --->DFSInputStream.fetchLocatedBlocks()最终获取LocatedBlocks,返回DFSInputStream对象;
从fetchLocatedBlocks()方法开始讲解具体逻辑:
1.【DFSClient.callGetBlockLocations(ClientProtocol namenode,String path, long start, long length)】此方法返回获取LocatedBlocks对象;内部调用流程如下:【NameNode.getBlockLocations(String src,long offset, long length) --->FSNamesystem.getBlockLocations(String clientMachine, String src, long offset, long length)-->FSNamesystem.getBlockLocationsInternal(String src,long offset, long length, int nrBlocksToReturn, boolean doAccessTime, boolean needBlockToken)】,FSNamesystem.getBlockLocations()方法的内部处理逻辑如下:
 1.1)调用getBlockLocationsInternal()方法,此方法的处理逻辑如下:
  1.1.1)【dir.getFileINode(src)】根据path参数从FSNamesystem.dir中获取INodeFile对象:node(即根据路径从树形目录中找到叶子节点INodeFile对象);
  1.1.2)【inode.getBlocks()】从node中获取此文件所有的block:Block[];若block==null则返回LocatedBlocks=null,若block.length=0表示是空文件则返回LocatedBlocks对象,而此对象的blocks:List<LocatedBlock>变量的集合长度等于0;否则进行往下执行;
  1.1.3)根据每个block块的字节数和偏移量offset计算应该读取Block[]数组的第几个元素,已经下一次读取时的起始位置curpos;
  1.1.4)获取目标DataNode集合machineSet:DatanodeDescriptor[]:获取存放此block块的DataNode集合(即在BlockInfo的object[]的三元组中查找),并排除在FSNamesystem.corruptReplicas:CorruptReplicasMap集合(已经损坏的DataNode集合)中的DataNode,具体逻辑如下:
     1.1.4.1)若此block块对应的INode为正在创建的文件,且当前block块为此文件的最后一个Block块,Block中DataNode个数为零,则直接获取INodeFileUnderConstruction.targets:DatanodeDescriptor[]数组作为目标DataNode集合machineSet;
     1.1.4.2)否则,遍历BlockInfo中object[]三元组的DataNode(object[index*3]),若此DataNode不在FSNamesystem.corruptReplicas集合,则添加到目标DataNode集合machineSet中;
  1.1.5)【LocatedBlock(Block b, DatanodeInfo[] locs, long startOffset, boolean corrupt)】以block对象,DatanodeDescriptor数组machineSet,startOffset(偏移量),是否全部损坏来初始化LocatedBlock对象;其中LocatedBlock.locs =locs,LocatedBlock.b=b;最后将此LocatedBlock对象存入List<LocatedBlock>对象中;
  1.1.6)遍历1.4-1.5步,找出此次读取的N个Block块的LocatedBlock集合:List<LocatedBlock>对象;
  1.1.7)以List<LocatedBlock>对象为参数初始化LocatedBlocks对象,LocatedBlocks.fileLength表示文件总长度,LocatedBlocks.blocks表示LocatedBlock集合,LocatedBlocks.underConstruction表示此文件是否正在创建;
  1.1.8)返回此LocatedBlocks对象;
 1.2)若LocatedBlocks对象不为空,调用【NetworkTopology.pseudoSortByDistance(Node reader, Node[] nodes)】对LocatedBlocks对象中每个block对象对应的DatanodeDescriptor数组按照与client的网络距离进行排序,便于优先到近距离的DataNode上读取文件;
2.【DFSInputStream.updateBlockInfo(LocatedBlocks newInfo)】更新正在创建的文件的最后一个Block的大小,待分析;
3.将返回的LocatedBlocks对象赋值给DFSInputStream.locatedBlocks变量;
4.返回DFSInputStream对象给Client端;

最后调用new DFSClient.DFSDataInputStream(DFSInputStream in)方法,初始化DFSClient.DFSDataInputStream对象返回给客户端;

二.DFSClient与DataNode交互,读取文件内容【DFSClient.DFSInputStream.read(byte[] buf, int off, int len) 】
1.若DFSClient.DFSInputStream.pos(读取的开始位置)大于LocatedBlocks.fileLength(文件总长度),表示文件已经读取完毕,否则继续下面的处理逻辑;
2.【DFSInputStream.blockSeekTo(long target)】寻找读取block内容的目标DataNode,初始化DFSInputStream.blockReader对象(包括了建立的数据连接),并返回DatanodeInfo对象;
  2.1)【DFSInputStream.getBlockAt(long offset, boolean updatePosition)】DFSInputStream.locatedBlocks中的每个LocatedBlock都有一个startOffset标记此Block在文件的开始位置,通过此startOffset与offset比较,找出要读取的文件内容所在Block块的LocatedBlock对象targetBlock;
  2.2)【DFSInputStream.chooseDataNode(LocatedBlock block)】,选择一个目标DataNode;
    2.2.1)【block.getLocations()】从targetBlock对象中获取此block块对应的DatanodeInfo[]数组;
    2.2.2)从第0位开始遍历此DatanodeInfo[]数组,返回第一个不在DFSInputStream.deadNodes中的DatanodeInfo对象;deadNodes表示失效的DataNode列表;
    2.2.3)用上一步查到的DatanodeInfo对象创建一个DNAddrPair对象并返回,此对象用于网络连接;
  2.3)到目前为止,已经查找到的目标Block块信息,目标DataNode,目标网络地址(InetSocketAddress)。接下来,判断得到的目标网络地址是否为本地地址(DFSClient.localAddrMap中存放了本地地址列表,与此列表比较),若是本地地址,则进入第2.4步,若不是则进入第2.5步;
  2.4)【BlockReaderLocal.newBlockReader(Configuration conf, String file, Block blk, Token<BlockTokenIdentifier> token, DatanodeInfo node, int socketTimeout, long  startOffset, long length, boolean connectToDnViaHostname)】:创建BlockReaderLocal对象,并赋值给DFSInputStream.blockReader变量;
    2.4.1)根据目标DataNode的端口号从BlockReaderLocal.localDatanodeInfoMap中查找LocalDatanodeInfo对象,若没有,则初始化此对象,并以此port为key放入localDatanodeInfoMap中;然后返回LocalDatanodeInfo对象;
    2.4.2)以Block对象在缓存(BlockReaderLocal.LocalDatanodeInfo.cache:Map<Block, BlockLocalPathInfo>)中查找BlockLocalPathInfo对象pathinfo,若pathinfo==null,则从DataNode获取,见2.4.3步;
    2.4.3)【BlockReaderLocal.getBlockPathInfo(Block blk, DatanodeInfo node, Configuration conf, int timeout, Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)】缓存中没有此Block对象对应的BlockLocalPathInfo对象,调用此方法从DataNode端获取BlockLocalPathInfo对象pathinfo:
        2.4.3.1)【DataNode.getBlockLocalPathInfo(Block block, Token<BlockTokenIdentifier> token)】内部调用【DataNode.data.getBlockLocalPathInfo(Block
block)】:
            1)根据block对象从FSDataset.volumeMap中获取DatanodeBlockInfo对象info并调用info.getFile()找到datafile("blk_blockid"文件的File对象)
            2)生成对应的meta文件的FIle对象metafile;
            3)【BlockLocalPathInfo(Block b, String file, String metafile)】初始化BlockLocalPathInfo对象,其成员变量包括Block对象,"blk_blockid"文件的绝对路径,meta文件的绝对路径;
        2.4.3.2)将返回的BlockLocalPathInfo对象存入缓存(BlockReaderLocal.LocalDatanodeInfo.cache)中;
    2.4.4)根据BlockLocalPathInfo的block路径("blk_blockid"文件的绝对路径)初始化File对象,并初始化文件输入流dataIn(FileInputStream对象);
    2.4.5)读取配置"dfs.client.read.shortcircuit.skip.checksum",若为true(表示跳过校验和)调用【BlockReaderLocal(Configuration conf, String hdfsfile, Block  block, Token<BlockTokenIdentifier> token, long startOffset, long length, BlockLocalPathInfo pathinfo,  DataChecksum checksum, boolean verifyChecksum, FileInputStream dataIn, FileInputStream  checksumIn)】用文件输入流对象,起始偏移量,block等信息为参数初始化BlockReaderLocal对象;
    2.4.6)若配置"dfs.client.read.shortcircuit.skip.checksum"为false,则先读取meta文件的校验和,然后调用【BlockReaderLocal(Configuration conf, String hdfsfile, Block block, Token<BlockTokenIdentifier> token, long startOffset, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, boolean verifyChecksum, FileInputStream dataIn, FileInputStream  checksumIn)】初始化BlockReaderLocal对象;
    2.4.7)将上述初始化的BlockReaderLocal对象赋值给DFSInputStream.blockReader变量;
  2.5)若目标DataNode不是本地地址,则创建RemoteBlockReader对象,并赋值给DFSInputStream.blockReader变量:
    2.5.1)根据目标DataNode地址创建socket对象;
    2.5.2)【RemoteBlockReader.newBlockReader(Socket sock,String file, long blockId, Token<BlockTokenIdentifier> accessToken, long genStamp, long startOffset,long len, int bufferSize, boolean verifyChecksum, String clientName)】初始化RemoteBlockReader对象:
        2.5.2.1)根据socket对象创建输出流[DataOutputStream];
        2.5.2.2)向输出流中写入消息头信息,包括DATA_TRANSFER_VERSION,DataTransferProtocol.OP_READ_BLOCK,blockid,GenerationStamp,startOffset,len(读取的字节数),clientName等信息;由DataNode端的DataXceiverServer线程处理收到的信息头数据【见下面DataNode端处理部分】;
        2.5.2.3)根据socket对象创建输入流[DataInputStream];解析从DataNode传来的信息,首先解析DataNode返回的状态信息,若状态表示不成功,则直接向上抛出异常,若状态为成功,则继续:
        2.5.2.4)从输入流读取5个字节(type,bpc)并以此信息初始化数据校验和DataChecksum对象;
        2.5.2.5)【RemoteBlockReader.RemoteBlockReader(String file, long blockId, DataInputStream in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, Socket dnSock)】初始化RemoteBlockReader对象,并返回;
    2.5.3)将上述初始化的BlockReaderLocal对象赋值给DFSInputStream.blockReader变量;
3.将第2步返回的DatanodeInfo对象赋值给DFSInputStream.currentNode变量;
4.【DFSInputStream.readBuffer(byte[] buf, int off, int len)】,内部调用【DFSInputStream.blockReader.read(buf, off, len)】从DataNode端读取文件内容并存入字节数组buf[]:byte中;
   4.1)当DataNode为本地地址,则调用【DFSClient.BlockReaderLocal.read(byte[] buf, int off, int len)】,内部调用在第2.4.4步中创建的输入流读取文件内容;
      4.1.1)若BlockReaderLocal.checksum=null,即不需要校验和,则直接调用BlockReaderLocal.dataIn.read(buf, off, len)读取文件内容;
      4.1.2)若需要校验和,则最终调用【BlockReaderLocal.readChunk(long pos, byte[] buf, int offset, int len, byte[] checksumBuf)】方法,在此方法中通过BlockReaderLocal.dataIn输入流读取文件内容;
   4.2)当DataNode为远程地址,调用【DFSClient.RemoteBlockReader.read(byte[] buf, int off, int len)】
      4.2.1)【FSInputChecker.read1(byte[] b, int off, int len)】方法,最终调用的是【RemoteBlockReader.readChunk(long pos, byte[] buf, int offset,int len, byte[] checksumBuf)】方法来读取远程DataNode传来的文件内容;
      4.2.2)【RemoteBlockReader.checksumOk(Socket sock)】校验和验证合格,向远程DataNode端发送回执消息,   
5.DFSInputStream.pos的值增加此次读取的字节数(readBuffer()方法返回的值);


三.若第二步中的DataNode为远程地址,则在第二.2.5.2步中发出读取文件请求后,远程DataNode端的DataXceiverServer线程监听【ss.accept()】到有请求到达时启动DataXceiver线程,具体处理逻辑如下:
1.在监听的Socket上建立一个DataInputStream对象;
2.解析输入流,从输入流中获取操作信息,若是DataTransferProtocol.OP_READ_BLOCK操作(读操作),则调用【DataXceiver.readBlock(DataInputStream in)】方法:
  2.1)根据消息头中的blockid,GenerationStamp创建Block对象;
  2.2)读取开始偏移量startOffset和字节总数length;
  2.3)创建数据输出流out:DataOutputStream,用于与DFClient交互,返回响应消息;
  2.4)【BlockSender(Block block, long startOffset, long length,boolean corruptChecksumOk, boolean chunkOffsetOK,boolean verifyChecksum, DataNode datanode, String clientTraceFmt)】初始化BlockSender对象,此对象用于从DataNode的硬盘中读取文件内容并发送给客户端;
     2.4.1)根据Block对象在FSDataset.volumeMap中查找"blk_blockid"文件的绝对路径,并检查此路径下是否存在meta文件,若存在则为此meta文件创建MetaDataInputStream对象,并封装成DataInputStream对象,赋值给BlockSender.checksumIn变量,并读取meta文件的头生成BlockMetadataHeader对象header,BlockSender.checksum =header.getChecksum();若不存在meta文件,则调用【DataChecksum.newDataChecksum(0,16 * 1024)】创建一个DataChecksum对象赋值给BlockSender.checksum变量;
     2.4.2)【datanode.data.getBlockInputStream(block, offset)】根据Block对象在FSDataset.volumeMap中查找对应的block文件的File对象,然后初始化FileInputStream对象;
         2.4.2.1)【FSDataset.getBlockFileNoExistsCheck(Block b)】根据Block对象在FSDataset.volumeMap中查找对应的block文件的File对象blockFile;
         2.4.2.2)【new RandomAccessFile(blockFile, "r")】根据上一步创建的File对象初始化一个随机读取的File对象blockInFile;
         2.4.2.3)【blockInFile.seek(offset)】设置从此File对象的哪个位置开始读取;
         2.4.2.4)根据blockInFile对象初始化FileInputStream输入流,并返回此FileInputStream对象;
     2.4.3)将上一步中创建的FileInputStream对象赋值给BlockSender.blockIn变量;
     2.4.4)初始化BlockSender.MemoizedBlock对象,并赋值给BlockSender.memoizedBlock变量;     
  2.5)向第2.3步中创建的输出流out中写入成功状态码;
  2.6)【blockSender.sendBlock(out, baseStream, null)】发送文件内容给客户端:
     2.6.1)将校验和字段信息写入数据输出流out中,并立即调用out.flush()方法;
     2.6.2)若baseStream为SocketOutputStream实例,blockIn为FileInputStream实例时,计算BlockSender.blockInPosition变量的值:
             1) FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
             2) blockInPosition = fileChannel.position();
     2.6.3)计算Byte的大小,并初始化ByteBuffer:pktBuf;
     2.6.4)【BlockSender.sendChunks(ByteBuffer pkt, int maxChunks,OutputStream out)】发送文件数据内容:
         2.6.4.1)写数据头信息到pktBuf中,包括:packet长度,开始偏移量,序号,字节长度等信息;
         2.6.4.2)若checksumSize > 0 && checksumIn != null,则【checksumIn.readFully(byte[] b, int off, int len)方法】输入流BlockSender.checksumIn将meta文件内容写入pktBuf中;
         2.6.4.3)若在2.6.2步计算的BlockSender.blockInPosition<0,则用一般的读取方式:
                1)【IOUtils.readFully(blockIn, buf, dataOff, len)】,内部调用【blockIn.read(byte[] b, int off, int len)】方法,将文件内容写入pktBuf中;
                2)【out.write(byte[] b, int off, int len)】,将上述写入pktBuf字节缓存中的文件内容全部写入输出流out中;
         2.6.4.4) 若在2.6.2步计算的BlockSender.blockInPosition>=0,则采用NIO方式读取文件内容:
                1)【BlockSender.memoizedBlock.hasBlockChanged(long dataLen)】若block被修改,则调用【IOUtils.readFileChannelFully(FileChannel fileChannel, byte[] buf, int off, int len)】方法将文件内容添加到pktBuf中;然后调用【out.write(byte[] b, int off, int len)】方法将上述写入pktBuf字节缓存中的文件内容全部写入输出流out中;
                2)若block未被修改,则先调用【out.write(byte[] b, int off, int len)】方法将上述写入pktBuf字节缓存中的文件内容全部写入输出流out中;然后调用【SocketOutputStream.transferToFully(FileChannel fileCh, long position, int count)】方法直接将FileChannel写入目标Channel中,【涉及NIO技术】。此方法效率更高;
     2.6.5)最后调用out.writeInt(0); 和  out.flush();方法,完成文件内容的写入;
  2.7)【blockSender.isBlockReadFully()】若文件内容发送完成,且收到客户端对文件内容校验和验证通过的回执消息,则调用【datanode.blockScanner.verifiedByClient(block)】方法;在此方法中更新DataBlockScanner.blockMap和DataBlockScanner.blockInfoSet,与容错机制有关,待分析;