Hadoop 2.2 Programming: DFS API Operations


1. Reading data from a Hadoop URL

Notes: for Java to read data from Hadoop's DFS, it must be able to recognize the hdfs URL scheme, so we supply an instance of Hadoop's FsUrlStreamHandlerFactory to Java; URL.setURLStreamHandlerFactory() does exactly this.

Caution: this approach has a flaw. setURLStreamHandlerFactory() can be called at most once per JVM, so if a third-party component has already set a URLStreamHandlerFactory, Hadoop users can no longer use this method to read data from Hadoop.

Quick hints:

 1. [java.net.URL]
    methods:
        InputStream openStream()
        static void setURLStreamHandlerFactory(URLStreamHandlerFactory fac)
 2. [org.apache.hadoop.fs.FsUrlStreamHandlerFactory]
    class:
        public class FsUrlStreamHandlerFactory
        extends Object
        implements URLStreamHandlerFactory
 3. [org.apache.hadoop.io.IOUtils]
    method:
        static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)

Code:

 import java.io.InputStream;
 import java.net.URL;

 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
 import org.apache.hadoop.io.IOUtils;

 public class URLCat {
   static {
     URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
   }

   public static void main(String[] args) throws Exception {
     InputStream in = null;
     try {
       in = new URL(args[0]).openStream();
       IOUtils.copyBytes(in, System.out, 4096, false);
     } finally {
       IOUtils.closeStream(in);
     }
   }
 }

Steps:

$source $YARN_HOME/libexec/hadoop-config.sh
$mkdir myclass
$javac -cp $CLASSPATH URLCat.java -d myclass
$jar -cvf urlcat.jar -C myclass ./
# assume we have a file bar.txt in hdfs: /user/grid/bar.txt
# then we run yarn with this command
$yarn jar urlcat.jar URLCat hdfs:///user/grid/bar.txt

2. Reading data using the HDFS API

Notes: using Hadoop's FileSystem API avoids the once-per-JVM setURLStreamHandlerFactory() limitation described above.

Quick hints:

 (1)org.apache.hadoop.conf.Configured
    |__ org.apache.hadoop.fs.FileSystem
            public abstract class FileSystem
            extends Configured
            implements Closeable
            [method]:
            static FileSystem get(URI uri, Configuration conf)
            FSDataInputStream open(Path f)
(2)java.io.InputStream
    |__ java.io.FilterInputStream
          |__ java.io.DataInputStream
               |__ org.apache.hadoop.fs.FSDataInputStream
                        public class FSDataInputStream
                        extends DataInputStream
                        implements Seekable, PositionedReadable, Closeable
                        [methods]:
                         void seek(long desired)
                         long getPos()
(3)org.apache.hadoop.fs.Path
    public class Path
    extends Object
    implements Comparable
    [methods]:
    Path(String pathString)

(4)java.net.URI
    public final class URI
    extends Object
    implements Comparable<URI>, Serializable
    [methods]:
    static URI create(String str)

Code:

 import java.net.URI;
 import java.io.InputStream;

 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;

 public class URICat {
   public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
   }
 }

Steps:

$source $YARN_HOME/libexec/hadoop-config.sh
$mkdir myclass
$javac -cp $CLASSPATH URICat.java -d myclass
$jar -cvf uricat.jar -C myclass ./
$yarn jar uricat.jar URICat /user/grid/bar.txt

Remark 1: because we go through the FileSystem API, the input file path may omit the full hdfs:// URI, as in the steps above; the configured default filesystem fills it in.

Remark 2: FileSystem is an abstract class, so you cannot obtain an instance with new FileSystem(); call its static get() method instead.

Remark 3: note Java's upcasting at work in the stream inheritance chains shown in the hints; the FSDataInputStream returned by open() also implements Seekable, as the sketch below illustrates.
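
As a small aside, here is a minimal sketch (mine, not from the original post) of the Seekable capability listed in the hints: FSDataInputStream can rewind and re-read a file, which a plain java.io InputStream cannot. The class name SeekDemo is arbitrary, and args[0] is assumed to be the URI of an existing HDFS file.

 import java.net.URI;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;

 public class SeekDemo {
   public static void main(String[] args) throws Exception {
     String uri = args[0];
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(URI.create(uri), conf);
     FSDataInputStream in = null;           // keep the concrete type so seek() is visible
     try {
       in = fs.open(new Path(uri));
       IOUtils.copyBytes(in, System.out, 4096, false);
       in.seek(0);                          // rewind to the beginning of the file
       IOUtils.copyBytes(in, System.out, 4096, false);   // print the file a second time
     } finally {
       IOUtils.closeStream(in);
     }
   }
 }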

Remark 4: Configuration conf = new Configuration();

  • Configuration is populated from the <name>/<value> pairs in XML resource files, located by these rules:

    if a resource is named by a String, it is looked up on the classpath;

    if a resource is named by a Path, it is read directly from the local filesystem, without consulting the classpath;

  • if the user specifies nothing, the two default resources core-default.xml and core-site.xml are loaded;

  • users can add XML files carrying their own configurations:

    conf.addResource("my_configuration.xml");
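
To make these rules concrete, here is a hedged sketch (mine; the resource names and the local path are placeholders, not from the original post):

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

 public class ConfDemo {
   public static void main(String[] args) {
     // core-default.xml and core-site.xml are loaded automatically
     Configuration conf = new Configuration();
     // a String resource is searched for on the classpath
     conf.addResource("my_configuration.xml");
     // a Path resource is read directly from the local filesystem
     conf.addResource(new Path("/home/grid/conf/my_configuration.xml"));
     // the default filesystem mentioned in Remark 1 also comes from here
     System.out.println(conf.get("fs.defaultFS"));
   }
 }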

3. Writing data

3.1 Copying a local file to HDFS

  • Version 1: FileCopy with the copyBytes() method

Quick hints:

  • the core is a single call that copies the InputStream to the OutputStream as raw bytes:

static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
  • we create a FileInputStream(localsrc), wrap it in a BufferedInputStream, and upcast it to an InputStream:

FileInputStream(String name)
  • the FileSystem produces the OutputStream:

FSDataOutputStream create(Path f, Progressable progress)

Code:

 import java.net.URI;
 import java.io.InputStream;
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.OutputStream;

 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;

 public class FileCopyWithProgress {
   public static void main(String[] args) throws Exception {
     String localsrc = args[0];
     String dst = args[1];
     InputStream in = new BufferedInputStream(new FileInputStream(localsrc));
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(URI.create(dst), conf);
     OutputStream out = fs.create(new Path(dst), new Progressable() {
         public void progress() { System.out.print(".");} }
                                 );
     IOUtils.copyBytes(in, out, 4096, true);
   }
 }

Steps:

$. $YARN_HOME/libexec/hadoop-config.sh
$javac -cp $CLASSPATH -d my_class FileCopyWithProgress.java
$jar -cvf filecopywithprogress.jar -C my_class/ .
# assume we have a local file foo.out in directory /home/grid, then we run yarn as below
$yarn jar filecopywithprogress.jar FileCopyWithProgress /home/grid/foo.out hdfs:///user/grid/copied_foo.out
# we can do a check for the copied file
$hadoop fs -ls -R /user/grid/

Note: from here on we use a different way to compile and run the code.

  • Version 2: using FileSystem's copyFromLocalFile() method

Code:

 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;

 public class FileCopyFromLocal {
   public static void main(String[] args) throws Exception {
     String localSrc = args[0];
     String dst = args[1];
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
     fs.copyFromLocalFile(new Path(localSrc), new Path(dst));
   }
 }

Steps:

$source $YARN_HOME/libexec/hadoop-config.sh
$javac FileCopyFromLocal.java -d class/
$jar -cvf filecopyfromlocal.jar -C class ./
$export HADOOP_CLASSPATH=$CLASSPATH:filecopyfromlocal.jar
# suppose we have a file bar.txt in local disk, then we use the following command line to copy it to hdfs
$yarn FileCopyFromLocal bar.txt hdfs:///user/grid/kissyou
# we can check the copied file on hdfs
$hadoop fs -ls /user/grid/
 -rw-r--r--   grid supergroup   ...   /user/grid/kissyou

3.2 Creating directories and files

  • Creating a directory: FileSystem.mkdirs()

Code:

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

 public class CreateDir {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String dst = args[0];
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(new Path(dst));
   }
 }

Steps:

$source $YARN_HOME/libexec/hadoop-config.sh
$javac CreateDir.java -d class/
$jar -cvf createdir.jar -C class ./
$export HADOOP_CLASSPATH=$CLASSPATH:createdir.jar
$yarn CreateDir hdfs:///user/grid/kissyou
# we can check the created directory on hdfs
$hadoop fs -ls /user/grid/
 drwxr-xr-x   - grid supergroup   ...   /user/grid/kissyou
  • Creating a file: FileSystem.create()

Code:

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;

 public class CreateFile {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     String dst = args[0];
     FileSystem fs = FileSystem.get(conf);
     fs.create(new Path(dst)).close();  // remember to close the stream returned by create()
   }
 }

Steps:

$source $YARN_HOME/libexec/hadoop-config.sh
$javac CreateFile.java -d class/
$jar -cvf createfile.jar -C class ./
$export HADOOP_CLASSPATH=$CLASSPATH:createfile.jar
$yarn CreateFile hdfs:///user/grid/kissyou.txt
# we can check the created file on hdfs
$hadoop fs -ls /user/grid/
 -rw-r--r--   grid supergroup   ...   /user/grid/kissyou.txt

Three things to note:

1. You cannot create a file foo and a directory foo/ with the same name under the same path; doing so throws a runtime exception:

    org.apache.hadoop.fs.FileAlreadyExistsException

2. mkdirs() is called automatically during copy and write operations, so directories rarely need to be created by hand;

3. The official API docs describe mkdirs() as "Make the given file and all non-existent parents into directories", so directory creation in Hadoop is recursive, equivalent to the Linux command:

    $mkdir -p foo/bar/qzx

and likewise equivalent to the hdfs-shell command:

    %$YARN_HOME/bin/hadoop fs -mkdir -p hdfs:///foo/bar/qzx
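
To illustrate notes 1 and 3, here is a small sketch (mine, with hypothetical paths): it probes the target with FileSystem.exists() before creating, and shows that mkdirs() builds all missing parent directories in one call.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

 public class SafeCreate {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
     Path dst = new Path(args[0]);
     if (fs.exists(dst)) {
       // create() over an existing directory of the same name would throw
       // org.apache.hadoop.fs.FileAlreadyExistsException (see note 1)
       System.err.println(dst + " already exists, skipping create.");
     } else {
       fs.create(dst).close();              // missing parents are created automatically (note 2)
     }
     fs.mkdirs(new Path("/foo/bar/qzx"));   // recursive, like mkdir -p (note 3)
   }
 }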

4. Testing files and getting FileStatus

Hint: some APIs are deprecated in hadoop 2.2. Only the deprecated methods used in this example are listed below, together with the current replacements.

    deprecated APIs:
(1)java.lang.Object
    |__ org.apache.hadoop.fs.FileStatus
       //deprecated method:
        boolean isDir() //Deprecated. Use isFile(),
                        //isDirectory(), and isSymlink() instead.
(2)java.lang.Object
    |__org.apache.hadoop.conf.Configured
        |__org.apache.hadoop.fs.FileSystem
            //deprecated methods:
            boolean isDirectory(Path f)    //Deprecated. Use
                                           //getFileStatus() instead
            short getReplication(Path src) //Deprecated. Use
                                           //getFileStatus() instead
            long getLength(Path f)         //Deprecated. Use
                                           //getFileStatus()instead

Code:

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;

 public class TestFileStatus {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
     FileStatus stat = fs.getFileStatus(new Path(args[0]));
     if (stat.isDirectory()) {
       System.out.println(stat.getPath().toUri().getPath() + " is a directory.");
     } else if (stat.isFile()) {
       System.out.println(stat.getPath().toUri().getPath() + " is a file.");
       System.out.println(stat.getPath().toUri().getPath() + " getBlockSize: " + stat.getBlockSize());
       System.out.println(stat.getPath().toUri().getPath() + " getLen(): " + stat.getLen());
       System.out.println(stat.getPath().toUri().getPath() + " getOwner(): " + stat.getOwner());
       System.out.println(stat.getPath().toUri().getPath() + " getGroup(): " + stat.getGroup());
       System.out.println(stat.getPath().toUri().getPath() + " getAccessTime(): " + stat.getAccessTime());
       System.out.println(stat.getPath().toUri().getPath() + " getModificationTime(): " + stat.getModificationTime());
       System.out.println(stat.getPath().toUri().getPath() + " getPermission(): " + stat.getPermission());
       System.out.println(stat.getPath().toUri().getPath() + " hashcode(): " + stat.hashCode());
       System.out.println(stat.getPath().toUri().getPath() + " getPath(): " + stat.getPath());
     }
   }
 }

First, a small bonus ^_^: the little script I wrote below compiles the sources and builds the jar in one step:

 #!/usr/bin/env sh
 CWD=$(pwd)
 export CLASSPATH=''
 . $YARN_HOME/libexec/hadoop-config.sh

 if [ -d class ]; then
   rm -rf class/*
 else
   mkdir $CWD/class
 fi

 for f in $@
   do
     srcs="$srcs $CWD/$f"
   done

 javac $srcs -d class

 if [ $? -ne 0 ] ;then
   echo Error found when compiling the code!
   exit 1
 fi

 class=$( cat $1 |grep 'package'|sed -e "s/\(package\s\)\|\(;\)//g" ).$(echo $1 | sed -r 's/(.*).java/echo \1/ge')
 jarfile=$(echo $1 | sed -r 's/(.*)\.java/echo \L\1\.jar/ge')

 jar -cvf $CWD/$jarfile -C $CWD/class . > /dev/null 2>&1
 #echo jar -cvf $jarfile -C class .
 echo -----------------CMD Lines-----------------------
 echo source $YARN_HOME/libexec/hadoop-config.sh >sourceIt.sh
 echo export HADOOP_CLASSPATH=$jarfile:'$CLASSPATH'>>sourceIt.sh
 echo source  $CWD/sourceIt.sh
 echo yarn $class  [command args]...

Steps:

Note: for simplicity, the script assumes that in

$./compack.sh args1 args2 args3...

args1 is the source file containing the main class.

$./compack.sh TestFileStatus.java
# the script then prints a reminder like the following:
-----------------CMD Lines------------------
source /home/grid/hadoop--src/hadoop-dist/target/hadoop-/task/DFSAPIProgramming/sourceIt.sh
yarn TestFileStatus  [command args]...
$source sourceIt.sh
# suppose we have a file "part-m-00000" in hdfs,run yarn like below
$yarn TestFileStatus /user/hive/warehouse/footbl/part-m-00000

Output:

/user/hive/warehouse/footbl/part-m-00000 is a file.
/user/hive/warehouse/footbl/part-m-00000 getBlockSize: ...
/user/hive/warehouse/footbl/part-m-00000 getLen(): ...
/user/hive/warehouse/footbl/part-m-00000 getOwner(): grid
/user/hive/warehouse/footbl/part-m-00000 getGroup(): supergroup
/user/hive/warehouse/footbl/part-m-00000 getAccessTime(): ...
/user/hive/warehouse/footbl/part-m-00000 getModificationTime(): ...
/user/hive/warehouse/footbl/part-m-00000 getPermission(): rw-r--r--
/user/hive/warehouse/footbl/part-m-00000 hashcode(): ...
/user/hive/warehouse/footbl/part-m-00000 getPath(): hdfs://cluster1:9000/user/hive/warehouse/footbl/part-m-00000

5. Listing files & globbing files

  • Listing files

Code:

 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;

 public class ListFiles {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);

     Path[] paths = new Path[args.length];
     for(int i = 0; i < args.length; i++) {
       paths[i] = new Path(args[i]);
     }

     FileStatus[] status = fs.listStatus(paths);
     Path[] pathList = FileUtil.stat2Paths(status);
     for(Path p : pathList) {
       System.out.println(p);
     }
   }
 }

Steps:

$./compack.sh ListFiles.java
$source sourceIt.sh
$yarn ListFiles /user/hive/warehouse/footbl /user/grid/

Output:

hdfs://cluster1:9000/user/hive/warehouse/footbl/_SUCCESS
hdfs://cluster1:9000/user/hive/warehouse/footbl/part-m-00000
hdfs://cluster1:9000/user/grid/kiss
hdfs://cluster1:9000/user/grid/kissyou
hdfs://cluster1:9000/user/grid/missyou
  • Filtering files

Hints:

  1. java.lang.Object
     |__ org.apache.hadoop.conf.Configured
          |__ org.apache.hadoop.fs.FileSystem
               public abstract class FileSystem
               extends Configured
               implements Closeable
               //method:
               FileStatus[] globStatus(Path pathPattern, PathFilter filter)
  2. org.apache.hadoop.fs.PathFilter
     public interface PathFilter
     //method:
     boolean accept(Path path)

Code:

 // MyFilter.java
 package org.apache.hadoop.MyCode;

 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.Path;

 public class MyFilter implements PathFilter {
   private final String regex;
   public MyFilter(String regex) {
     this.regex = regex;
   }
   public boolean accept(Path path) {
     return path.toString().matches(regex);
   }
 }
 // ListStatusWithPattern.java
 package org.apache.hadoop.MyCode;

 import org.apache.hadoop.MyCode.MyFilter;

 import java.net.URI;

 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.conf.Configuration;

 public class ListStatusWithPattern {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);

     FileStatus[] status = fs.globStatus(new Path(args[0]), new MyFilter(args[1]));
     Path[] pathList = FileUtil.stat2Paths(status);

     for( Path p : pathList ) {
       System.out.println(p);
     }
   }
 }

Steps:

$source $YARN_HOME/libexec/hadoop-config.sh
$mkdir class
$javac ListStatusWithPattern.java  MyFilter.java -d class
$jar -cvf liststatuswithpattern.jar -C class ./
$export HADOOP_CLASSPATH=liststatuswithpattern.jar:$CLASSPATH
#suppose we have four files in hdfs like below
$hadoop fs -ls /user/grid/
Found 4 items
drwxr-xr-x   - grid supergroup   ...   /user/grid/kiss
-rw-r--r--     grid supergroup   ...   /user/grid/kissyou
drwxr-xr-x   - grid supergroup   ...   /user/grid/miss
-rw-r--r--     grid supergroup   ...   /user/grid/missyou
# then we can run the command to filter the matched file
$yarn jar liststatuswithpattern.jar org.apache.hadoop.MyCode.ListStatusWithPattern "hdfs:///user/grid/*ss*" "^.*grid/[k].*$"

Alternatively, use the script given earlier to compile and package the code and print the main yarn command line:

$./compack.sh ListStatusWithPattern.java MyFilter.java  # note: the script treats the first source file as the one containing the main class
-----------------CMD Lines-----------------------
source /home/grid/hadoop--src/hadoop-dist/target/hadoop-/task/DFSAPIProgramming/sourceIt.sh
yarn org.apache.hadoop.MyCode.ListStatusWithPattern [command args]...
$source /home/grid/hadoop--src/hadoop-dist/target/hadoop-/task/DFSAPIProgramming/sourceIt.sh
$yarn org.apache.hadoop.MyCode.ListStatusWithPattern "hdfs:///user/grid/*ss*" "^.*grid/[k].*$"
Output:

hdfs://cluster1:9000/user/grid/kiss
hdfs://cluster1:9000/user/grid/kissyou
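
As a closing sketch (mine, not from the original post): the same MyFilter can also be plugged into FileSystem.listStatus(Path f, PathFilter filter), which lists the children of a single path without glob expansion, leaving the filter alone to decide what is kept.

 package org.apache.hadoop.MyCode;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;

 public class ListStatusWithFilter {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
     // the filter is applied to each child of args[0]
     FileStatus[] status = fs.listStatus(new Path(args[0]), new MyFilter(args[1]));
     for (Path p : FileUtil.stat2Paths(status)) {
       System.out.println(p);
     }
   }
 }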

(End)