HDFS的读数据过程分析

时间:2022-06-26 05:20:42

我们继续在 FileSystem 类分析,读数据使用的是 open(…)方法,我们可以看到源码

FSDataInputStream in = fileSystem.open(new Path("/d100/f1"));

点击open

  /**
   * Opens an FSDataInputStream at the indicated Path.
   * @param f the file to open
   */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

在return open 的open按ctrl+t 选择第一个DistributedFileSystem

  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    statistics.incrementReadOps(1);
    return new DFSClient.DFSDataInputStream(
          dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
  }

返回的是 DFSClient 类中 DFSDataInputStream 类,显而易见,这是一个内部类。这个内部类的构造函数,有两个形参,第一个参数是 dfs.open(…)创建的对象。我们

看一下方法的源码,

  /**
   * Create an input stream that obtains a nodelist from the
   * namenode, and then reads from all the right places.  Creates
   * inner subclass of InputStream that does the right out-of-band
   * work.
   */
  public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
                      FileSystem.Statistics stats
      ) throws IOException {
    checkOpen();
    //    Get block info from namenode
    return new DFSInputStream(src, buffersize, verifyChecksum);
  }

返回的是一个 DFSInputStream 对象。该对象中含有 NameNode 中的数据块信息。我们看一下这个类的构造方法源码

    DFSInputStream(String src, int buffersize, boolean verifyChecksum
                   ) throws IOException {
      this.verifyChecksum = verifyChecksum;
      this.buffersize = buffersize;
      this.src = src;
      prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
      openInfo();
    }

openInfo()打开信息,openInfo()的源代码是获取数据块的信息的。我们查看这一行的源代码

    /**
     * Grab the open-file info from namenode
     */
    synchronized void openInfo() throws IOException {
      for (int retries = 3; retries > 0; retries--) {
        if (fetchLocatedBlocks()) {
          // fetch block success
          return;
        } else {
          // Last block location unavailable. When a cluster restarts,
          // DNs may not report immediately. At this time partial block
          // locations will not be available with NN for getting the length.
          // Lets retry a few times to get the length.
          DFSClient.LOG.warn("Last block locations unavailable. "
              + "Datanodes might not have reported blocks completely."
              + " Will retry for " + retries + " times");
          waitFor(4000);
        }
      }
      throw new IOException("Could not obtain the last block locations.");
    }

进去fetchLocatedBlocks()

    private boolean fetchLocatedBlocks() throws IOException,
        FileNotFoundException {
      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0,
          prefetchSize);
      if (newInfo == null) {
        throw new FileNotFoundException("File does not exist: " + src);
      }

      if (locatedBlocks != null && !locatedBlocks.isUnderConstruction()
          && !newInfo.isUnderConstruction()) {
        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks()
            .iterator();
        Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
          if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
            throw new IOException("Blocklist for " + src + " has changed!");
          }
        }
      }
      boolean isBlkInfoUpdated = updateBlockInfo(newInfo);
      this.locatedBlocks = newInfo;
      this.currentNode = null;
      return isBlkInfoUpdated;
    }

进入:callGetBlockLocations(namenode, src, 0, prefetchSize);

  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                    FileNotFoundException.class);
    }
  }

可以看到,获取数据块信息的方法也是通过调用 namenode 取得的。这里的 namenode 属性还是位于 DFSClient 中的。通过前面的分析,我们已经知道,在 DFSClient类中的 namenode 属性是 ClientProtocal。

可以参考上一篇文章 写数据的分析