Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)

时间:2023-01-19 08:33:20

一、综述

      HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode、DataNode、DFSClient等众多角色的分工与合作。

      首先上一段代码,客户端是如何写文件的:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path file = new Path("demo.txt");
FSDataOutputStream outStream = fs.create(file);
out.write("Welcome to HDFS Java API !!!".getBytes("UTF-8"));
outStream.close();
      只有简单的6行代码,客户端封装的如此简洁,各组件间的RPC调用、异常处理、容错等均对客户端透明。

      总体来说,最简单的HDFS写文件大体流程如下:

      1、客户端获取文件系统实例FileSyStem,并通过其create()方法获取文件系统输出流outputStream;

            1.1、首先会联系名字节点NameNode,通过ClientProtocol.create()RPC调用,在名字节点上创建文件元数据,并获取文件状态FileStatus;

            1.2、通过文件状态FileStatus构造文件系统输出流outputStream;

      2、通过文件系统输出流outputStream写入数据;

             2.1、首次写入会首先向名字节点申请数据块,名字节点能够掌握集群DataNode整体状况,分配数据块后,连同DataNode列表信息返回给客户端;

             2.2、客户端采用流式管道的方式写入数据节点列表中的第一个DataNode,并由列表中的前一个DataNode将数据转发给后面一个DataNode;

             2.3、确认数据包由DataNode经过管道依次返回给上游DataNode和客户端;

             2.4、写满一个数据块后,向名字节点提交一个数据;

             2.5、再次重复2.1-2.4过程;

      3、向名字节点提交文件(complete file),即告知名字节点文件已写完,然后关闭文件系统输出流outputStream等释放资源。


      可以看出,在不考虑异常等的情况下,上述过程还是比较复杂的。本文,我将着重阐述下HDFS写数据时,客户端是如何实现的,关于NameNode、DataNode等的配合等,后续文章将陆续推出,敬请关注!

二、实现分析

      我们将带着以下问题来分析客户端写入数据过程:

      1、如何获取数据输出流?

      2、如何通过数据输出流写入数据?

      3、数据输出流关闭时都做了什么?

      4、如果发生异常怎么办?即如何容错?


      (一)如何获取数据输出流?

      HDFS客户端获取数据流是一个复杂的过程,流程图如下:

Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)

      

      以DistributedFileSystem为例,create()是其入口方法,DistributedFileSystem内部封装了一个DFS的客户端,如下:

  DFSClient dfs;
      在DistributedFileSystem的初始化方法initialize()中,会构造这个文件系统客户端,如下:

    this.dfs = new DFSClient(uri, conf, statistics);

      而create()方法就是通过这个文件系统客户端dfs获取数据输出流的,如下:

  @Override  public FSDataOutputStream create(final Path f, final FsPermission permission,    final EnumSet<CreateFlag> cflags, final int bufferSize,    final short replication, final long blockSize, final Progressable progress,    final ChecksumOpt checksumOpt) throws IOException {    statistics.incrementWriteOps(1);    Path absF = fixRelativePart(f);    return new FileSystemLinkResolver<FSDataOutputStream>() {            /*       * 创建文件系统数据输出流       */      @Override      public FSDataOutputStream doCall(final Path p)          throws IOException, UnresolvedLinkException {            // 调用create()方法创建文件,并获取文件系统输出流      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,                cflags, replication, blockSize, progress, bufferSize,                checksumOpt);        return dfs.createWrappedOutputStream(dfsos, statistics);      }      @Override      public FSDataOutputStream next(final FileSystem fs, final Path p)          throws IOException {        return fs.create(p, permission, cflags, bufferSize,            replication, blockSize, progress, checksumOpt);      }    }.resolve(this, absF);  }
      FileSystemLinkResolver是一个文件系统链接解析器(抽象类),我们待会再分析它,这里只要知道,该抽象类实例化后会通过resolve()方法--doCall()方法得到数据输出流即可。接着往下DFSClient的create()方法,省略部分代码,如下:

    // 为create构建一个数据输出流    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,        src, masked, flag, createParent, replication, blockSize, progress,        buffersize, dfsClientConf.createChecksum(checksumOpt),        getFavoredNodesStr(favoredNodes));        // 开启文件租约    beginFileLease(result.getFileId(), result);    return result;
      实际上,它又通过DFSOutputStream的newStreamForCreate()方法来获取数据输出流,并开启文件租约。租约的内容我们后续再讲,继续看下如何获取文件输出流的,如下:

  /**   * 为创建文件构造一个新的输出流   */  static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,      short replication, long blockSize, Progressable progress, int buffersize,      DataChecksum checksum, String[] favoredNodes) throws IOException {    TraceScope scope =        dfsClient.getPathTraceScope("newStreamForCreate", src);    try {      HdfsFileStatus stat = null;      // Retry the create if we get a RetryStartFileException up to a maximum      // number of times      boolean shouldRetry = true;      int retryCount = CREATE_RETRY_COUNT;      while (shouldRetry) {        shouldRetry = false;        try {          // 首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,              new EnumSetWritable<CreateFlag>(flag), createParent, replication,              blockSize, SUPPORTED_CRYPTO_VERSIONS);          break;        } catch (RemoteException re) {          IOException e = re.unwrapRemoteException(              AccessControlException.class,              DSQuotaExceededException.class,              FileAlreadyExistsException.class,              FileNotFoundException.class,              ParentNotDirectoryException.class,              NSQuotaExceededException.class,              RetryStartFileException.class,              SafeModeException.class,              UnresolvedPathException.class,              SnapshotAccessControlException.class,              UnknownCryptoProtocolVersionException.class);          if (e instanceof RetryStartFileException) {            if (retryCount > 0) {              shouldRetry = true;              retryCount--;            } else {              throw new IOException("Too many retries because of encryption" +                  " zone operations", e);            }          } else {            throw e;          }        }      }      Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");            // 构造一个数据输出流      final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,          flag, progress, checksum, favoredNodes);            // 启动数据输出流      out.start();            return out;    } finally {      scope.close();    }  }
      大体可以分为三步:

      1、首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态HdfsFileStatus;

      2、构造一个数据输出流;

      3、启动数据输出流。

      上述连接NameNode节点创建文件的过程中,如果发生瞬时错误,会充分利用重试机制,增加系统容错性。DFSClient中nameNode的Create()方法,实际上是调用的是客户端与名字节点间的RPC--ClientProtocol的create()方法,该方法的作用即是在NameNode上创建一个空文件,并返回文件状态。文件状态主要包括以下信息:

  // 文件路径  private final byte[] path;  // local name of the inode that's encoded in java UTF8  // 符号连接  private final byte[] symlink; // symlink target encoded in java UTF8 or null  private final long length;// 文件长度  private final boolean isdir;// 是否为目录  private final short block_replication;// 数据块副本数  private final long blocksize;// 数据块大小  private final long modification_time;// 修改时间  private final long access_time;// 访问时间  private final FsPermission permission;// 权限  private final String owner;// 文件所有者  private final String group;// 文件所属组  private final long fileId;// 文件ID
      继续看如何构造一个数据输出流,实际上它是通过构造DFSOutputStream实例获取的,而DFSOutputStream的构造方法如下:

  /** Construct a new output stream for creating a file. */  private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,      EnumSet<CreateFlag> flag, Progressable progress,      DataChecksum checksum, String[] favoredNodes) throws IOException {    this(dfsClient, src, progress, stat, checksum);    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);    // 计算数据包块大小    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);    // 构造数据流对象    streamer = new DataStreamer(stat, null);    if (favoredNodes != null && favoredNodes.length != 0) {      streamer.setFavoredNodes(favoredNodes);    }  }
      首先计算数据包块大小,然后构造数据流对象,后续就依靠这个数据流对象来通过管道发送流式数据。接下来便是启动数据输出流,如下:

  private synchronized void start() {    streamer.start();  }
      很简单,实际上也就是启动数据流对象,通过这个数据流对象实现数据的发送。

      中间为什么会有计算数据包块大小这一步呢?原来,数据的发送是通过一个个数据包发送出去的,而不是通过数据块发送的。设想下,如果按照一个数据块(默认128M)大小发送数据,合理吗?至于数据包大小是如何确定的,我们后续再讲。

      (二)如何通过数据输出流写入数据?

      下面,该看看如何通过数据输出流写入数据了。要解决这个问题,首先分析下DFSOutputStream和DataStreamer是什么。

      1、DFSOutputStream

      DFSOutputStream是分布式文件系统输出流,它内部封装了两个队列:发送数据包队列和确认数据包队列,如下:

  // 发送数据包队列  private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();  // 确认数据包队列  private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
      客户端写入的数据,会addLast入发送数据包队列dataQueue,然后交给DataStreamer处理。

      2、DataStreamer

      DataStreamer是一个后台工作线程,它负责在数据流管道中往DataNode发送数据包。它从NameNode申请获取一个新的数据块ID和数据块位置,然后开始往DataNode的管道写入流式数据包。每个数据包都有一个序列号sequence number。当一个数据块所有的数据包被发送出去,并且每个数据包的确认信息acks被接收到的话,DataStreamer关闭当前数据块,然后再向NameNode申请下一个数据块。

      所以,才会有上述发送数据包和确认数据包这两个队列。

      DataStreamer内部有很多变量,大体如下:

// streamer关闭标志位  private volatile boolean streamerClosed = false;// 扩展块,它的长度是已经确认ack的bytes大小    private ExtendedBlock block; // its length is number of bytes acked    private Token<BlockTokenIdentifier> accessToken;        // 数据输出流    private DataOutputStream blockStream;    // 数据输入流:即回复流    private DataInputStream blockReplyStream;    // 响应处理器    private ResponseProcessor response = null;    // 当前块的数据块列表    private volatile DatanodeInfo[] nodes = null; // list of targets for current block    // 存储类型    private volatile StorageType[] storageTypes = null;    // 存储ID    private volatile String[] storageIDs = null;        // 需要排除的节点    private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =        CacheBuilder.newBuilder()        .expireAfterWrite(            dfsClient.getConf().excludedNodesCacheExpiry,            TimeUnit.MILLISECONDS)        .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {          @Override          public void onRemoval(              RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {            DFSClient.LOG.info("Removing node " +                notification.getKey() + " from the excluded nodes list");          }        })        .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {          @Override          public DatanodeInfo load(DatanodeInfo key) throws Exception {            return key;          }        });        // 优先节点    private String[] favoredNodes;    // 是否存在错误    volatile boolean hasError = false;    volatile int errorIndex = -1;    // Restarting node index    // 从哪个节点重试的索引    AtomicInteger restartingNodeIndex = new AtomicInteger(-1);    private long restartDeadline = 0; // Deadline of DN restart    // 当前数据块构造阶段    private BlockConstructionStage stage;  // block construction stage    // 已发送数据大小    private long bytesSent = 0; // number of bytes that've been sent    private final boolean isLazyPersistFile;    /** Nodes have been used in the pipeline before and have failed. */    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();    /** The last ack sequence number before pipeline failure. */    // 管道pipeline失败前的最后一个确认包序列号    private long lastAckedSeqnoBeforeFailure = -1;    // 管道恢复次数    private int pipelineRecoveryCount = 0;    /** Has the current block been hflushed? */    // 当前数据块是否已被Hflushed    private boolean isHflushed = false;    /** Append on an existing block? */    // 是否需要在现有块上append    private final boolean isAppend;
      有很多比较简单,不再赘述。这里只讲解几个比较重要的:

      1、BlockConstructionStage stage

      当前数据块构造阶段。针对create()这种写入 来说,开始时默认是BlockConstructionStage.PIPELINE_SETUP_CREATE,即管道初始化时需要向NameNode申请数据块及所在数据节点的状态,这个很容易理解。有了数据块和其所在数据节点所在列表,才能形成管道列表不是?在数据流传输过程中,即一个数据块写入的过程中,虽然有多次数据包写入,但状态始终为DATA_STREAMING,即正在流式写入的阶段。而当发生异常时,则是PIPELINE_SETUP_STREAMING_RECOVERY状态,即需要从流式数据中进行恢复,如果一个数据块写满,则会进入下一个周期,PIPELINE_SETUP_CREATE->DATA_STREAMING,最后数据全部写完后,状态会变成PIPELINE_CLOSE,并且如果发生异常的话,会有一个特殊状态对应,即PIPELINE_CLOSE_RECOVERY。而append开始时则是对应的状态PIPELINE_SETUP_APPEND及异常状态PIPELINE_SETUP_APPEND_RECOVERY,其它则一致。

      2、volatile boolean hasError = false

      这个状态位用来标记数据写入过程中,是否存在错误,方便进行容错。

      3、ResponseProcessor response

      响应处理器。这个也是后台工作线程,它会处理来自DataNode回复流中的确认包,确认数据是否发送成功,如果成功,将确认包从确认数据包队列中移除,否则进行容错处理。


      create()模式下的DataStreamer构造比较简单,如下:

    private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {      isAppend = false;      isLazyPersistFile = isLazyPersist(stat);      this.block = block;      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;    }
      isAppend设置为false,即不是append写入,BlockConstructionStage默认为PIPELINE_SETUP_CREATE,即需要向NameNode写入数据块。


      我们首先看下DataStreamer是如何发送数据的。上面讲到过,DFSOutputStream中包括两个队列:发送数据包队列和确认数据包队列。这类似于两个生产者消--费者模型。针对发送数据包队列,外部写入者为生产者,DataStreamer为消费者。外部持续写入数据至发送数据包队列,DataStreamer则从中消费数据,判断是否需要申请数据块,然后写入数据节点流式管道。而确认数据包队列,DataStreamer为生产者,ResponseProcessor为消费者。首先,确认数据包队列数据的产生,是DataStreamer发送数据给DataNode后,从发送数据包队列挪过来的,而当ResponseProcessor线程确认接收到数据节点的ack确认包后,再从数据确认队列中删除。

      关于ResponseProcessor线程,稍后再讲。

      

      数据写入过程之DataStreamer

      首先看DataStreamer的run()方法,它会在数据流没有关闭,且dfs客户端正在运行的情况下,一直循环,循环内处理的大体流程如下:

      1、如果遇到一个错误(hasErro),且响应器尚未关闭,关闭响应器,使之join等待;

      2、如果有DataNode相关IO错误,先预先处理,初始化一些管道和流的信息,并决定外部是否等待,等待意即可以进行容错处理,不等待则数目错误比较严重,无法进行容错处理:这里还判断了errorIndex标志位和restartingNodeIndex的大小,意思是是否是由某个具体数据节点引起的错误,如果是的话,这种错误理论上是可以处理的;

      3、没有数据时,等待一个数据包发送:等待的条件是:当前流没有关闭(!streamerClosed)、没有错误(hasError)、dfs客户端正在 运行(dfsClient.clientRunning )、dataQueue队列大小为0,且当前阶段不是DATA_STREAMING,或者在需要sleep(doSleep)或者上次发包距离本次时间未超过阈值的情况下为DATA_STREAMING

            意思是各种标记为正常,数据流处于正常发送的过程或者可控的非正常发送过程中,可控表现在状态位doSleep,即上传错误检查中认为理论上可以进行修复,但是需要sleep已完成recovery的初始化,或者距离上次发送未超过时间的阈值等。

      4、如果数据流关闭、存在错误、客户端正常运行标志位异常时,执行continue:这个应该是对容错等的处理,让程序及时响应错误;

      5、获取将要发送的数据包:

            如果数据发送队列为空,构造一个心跳包;否则,取出队列中第一个元素,即待发送数据包。

      6、如果当前阶段是PIPELINE_SETUP_CREATE,申请数据块,设置pipeline,初始化数据流:append的setup阶段则是通过setupPipelineForAppendOrRecovery()方法完成的,并同样会初始化数据流;

      7、获取数据块中的上次数据位置lastByteOffsetInBlock,如果超过数据块大小,报错;

      8、 如果是数据块的最后一个包:等待所有的数据包被确认,即等待datanodes的确认包acks,如果数据流关闭,或者数据节点IO存在错误,或者客户端不再正常运行,continue,设置阶段为pipeline关闭

      9、发送数据包:将数据包从dataQueue队列挪至ackQueue队列,通知dataQueue的所有等待者,将数据写入远端的DataNode节点,并flush,如果发生异常,尝试标记主要的数据节点错误,方便容错处理;

      10、更新已发送数据大小:可以看出,数据包中存储了其在数据块中的位置LastByteOffsetBlock,也就标记了已经发送数据的总大小;

      11、数据块写满了吗?如果是最后一个数据块,等待确认包,调用endBlock()方法结束一个数据块 ;

      如果上述流程发生错误,hasError标志位设置为true,并且如果不是一个DataNode引起的原因,流关闭标志设置为true。

      最后,没有数据需要发送,或者发生致命错误的情况下,调用closeInternal()方法关闭内部资源。


      未完待续,请关注《Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(2)》。









三、代码分析