The Implementation of CompactIndex in Hive 0.7.1 --- How a Hive MapredTask Executes

Date: 2025-05-15 07:51:03

The key lies in how the plan is built!

    int ret = compile(command); // compile the command --- the key point: this is where the plan is generated

A simple query:

hive> select * from table02 where id=500000;

The basic flow is as follows (method names restored from the Hive 0.7.1 source):

    CliDriver.processLine(line);
    CliDriver.processCmd(command);     // when run() returns, print the results and the "Time taken"
    Driver qp = (Driver) proc;  qp.run(cmd);
    Driver.run(command);
    Driver.execute();
    Driver.launchTask(tsk, queryId, noName, running, jobname, jobs, cxt);
    TaskRunner tskRun = new TaskRunner(tsk, tskRes);
    tskRun.start();                    // the TaskRunner thread object (@6f513 in the debugger)
    Task.executeTask();
    MapRedTask.execute(DriverContext); // override

      executor = Runtime.getRuntime().exec(cmdLine, env, new File(workDir));
      StreamPrinter outPrinter = new StreamPrinter(
          executor.getInputStream(), null,
          SessionState.getConsole().getChildOutStream());
      StreamPrinter errPrinter = new StreamPrinter(
          executor.getErrorStream(), null,
          SessionState.getConsole().getChildErrStream());

      outPrinter.start();  // start to print!!!
      errPrinter.start();

      int exitVal = executor.waitFor(); // wait for the child to finish -- strictly sequential
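The pattern above -- fork a child process, drain its stdout/stderr on separate threads (Hive's StreamPrinter does this), then block in waitFor() -- can be reproduced in a small standalone sketch. The `sh -c` command below is just a stand-in for the real `hadoop jar` cmdLine, and assumes a POSIX shell is available:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

// Minimal sketch of MapRedTask's child-process pattern: fork, drain the
// child's output streams on separate threads, then wait for the exit code.
public class ChildProcessDemo {

  // Runs a command, streams its stdout/stderr to ours, returns the exit code.
  public static int runAndWait(String[] cmd) throws Exception {
    Process executor = Runtime.getRuntime().exec(cmd);

    Thread outPrinter = new Thread(() -> drain(executor.getInputStream()));
    Thread errPrinter = new Thread(() -> drain(executor.getErrorStream()));
    outPrinter.start();   // start printing the child's output
    errPrinter.start();

    int exitVal = executor.waitFor();  // sequential: block until the child exits
    outPrinter.join();
    errPrinter.join();
    return exitVal;
  }

  private static void drain(java.io.InputStream in) {
    try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
      String line;
      while ((line = r.readLine()) != null) {
        System.out.println(line);
      }
    } catch (Exception ignored) {
    }
  }

  public static void main(String[] args) throws Exception {
    int exitVal = runAndWait(new String[] {"sh", "-c", "echo hello from child"});
    System.out.println("child exited with " + exitVal);
  }
}
```

Without the drain threads, a child that fills its stdout pipe can deadlock against waitFor() -- which is exactly why Hive starts the two StreamPrinters before waiting.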

Just like that, the query moves into a new process, the executor. Its cmdLine looks like this:

/home/allen/Hadoop/hadoop-0.20.2/bin/hadoop jar /home/allen/Desktop/hive-0.7.1/lib/hive-exec-0.7.   -plan file:/tmp/allen/hive_2012-03-05_16-15-28_863_4469375855705861948/-local-10002/  -jobconf =select+*+from+table02+where+id%3D500000%28Stage-1%29 -jobconf =jdbc%3Amysql%3A%2F%2Flocalhost%3A3306%2Fhive01%3FcreateDatabaseIfNotExist%3Dtrue -jobconf =%2Fuser%2Fhive%2Fwarehouse -jobconf ... (a long tail of further URL-encoded -jobconf key=value settings)
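The -jobconf values are URL-encoded ("+" for space, %XX escapes). A quick way to make them readable is java.net.URLDecoder; for example, the encoded query string above decodes back to the original SQL:

```java
import java.net.URLDecoder;

// Decode the URL-encoded -jobconf values seen in the cmdLine.
public class JobconfDecodeDemo {

  public static String decode(String s) throws Exception {
    return URLDecoder.decode(s, "UTF-8");
  }

  public static void main(String[] args) throws Exception {
    System.out.println(decode("select+*+from+table02+where+id%3D500000"));
    // -> select * from table02 where id=500000
    System.out.println(decode("%2Fuser%2Fhive%2Fwarehouse"));
    // -> /user/hive/warehouse
  }
}
```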

hive> Set =/tmp/table02_index_data;
hive> Set =false;
hive> Set =;
hive> select * from table02 where id =500000;

With the index in place, the cmdLine becomes:

/home/allen/Hadoop/hadoop-0.20.2/bin/hadoop jar /home/allen/Desktop/hive-0.7.1/lib/hive-exec-0.7.   -plan file:/tmp/allen/hive_2012-03-06_10-52-57_695_2032164111332457666/-local-10002/  -jobconf =select+*+from+table02+where+id+%3D500000%28Stage-1%29 -jobconf =%2Ftmp%2Ftable02_index_data -jobconf ... (the rest of the URL-encoded -jobconf settings as before; the new entry is the index data path, i.e. /tmp/table02_index_data)

The general format is:

    hadoop jar hive-exec-0.7.
      -plan  .../
      -jobconf key=value ...

That is, Hive shells out to hadoop to run the plan.
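A hypothetical sketch of how a main() like ExecDriver's can pick such a command line apart (the parsing details, the plan path, and the key names below are illustrative, not Hive's actual code):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative parser for a "-plan <path> -jobconf k=v -jobconf k=v ..." line.
public class ArgParseDemo {

  public static String planPath;                       // value after -plan
  public static Map<String, String> jobconf = new HashMap<>();

  public static void parse(String[] args) {
    planPath = null;
    jobconf.clear();
    for (int i = 0; i < args.length; i++) {
      if ("-plan".equals(args[i]) && i + 1 < args.length) {
        planPath = args[++i];
      } else if ("-jobconf".equals(args[i]) && i + 1 < args.length) {
        String[] kv = args[++i].split("=", 2);         // split into key, value
        jobconf.put(kv[0], kv.length > 1 ? kv[1] : "");
      }
    }
  }

  public static void main(String[] args) {
    parse(new String[] {"-plan", "file:/tmp/plan.xml",
                        "-jobconf", "mapred.reduce.tasks=1"});
    System.out.println(planPath + " " + jobconf);
  }
}
```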

One problem I ran into: since I was running a plain Java program inside Eclipse rather than submitting it to the cluster to debug, I had to add the option -jobconf =hdfs://localhost:9000 so that the data would actually be read from HDFS.

So we need to take a closer look at ExecDriver.

Find its main(); after a series of arguments are parsed and printed, we see:

MapredWork plan = Utilities.deserializeMapRedWork(pathData, conf);
ExecDriver ed = new ExecDriver(plan, conf, isSilent);
ret = ed.execute(new DriverContext());

This brings us to ExecDriver.execute(ctx);  // Execute a query plan using Hadoop.

    // We set it to be HiveCompactIndexInputFormat
    String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);

    if ((inpFormat == null) || (StringUtils.isBlank(inpFormat))) {
      inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
    }

Here inpFormat comes back as HiveCompactIndexInputFormat, and it is set as the job's input format.
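The lookup-with-fallback can be sketched with a plain map standing in for the job configuration (the default class name below is just an illustrative stand-in for whatever the shim returns):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: use the configured input format if present, else a default.
public class InputFormatChoiceDemo {

  public static final String DEFAULT_INPUT_FORMAT =
      "org.apache.hadoop.mapred.TextInputFormat";   // illustrative default

  public static String chooseInputFormat(Map<String, String> conf) {
    String inpFormat = conf.get("hive.input.format");
    if (inpFormat == null || inpFormat.trim().isEmpty()) {
      inpFormat = DEFAULT_INPUT_FORMAT;             // fall back
    }
    return inpFormat;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(chooseInputFormat(conf));    // falls back to the default
    conf.put("hive.input.format",
        "org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat");
    System.out.println(chooseInputFormat(conf));    // the configured class wins
  }
}
```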

      // 12/03/06 13:36:26 INFO : Processing alias table02
      // 12/03/06 13:36:26 INFO : Adding input file hdfs://localhost:9000/user/hive/warehouse/table02
      addInputPaths(job, work, emptyScratchDirStr); // put the input paths from the work into the job conf


    JobClient jc = new JobClient(job);
    ...
    rj = jc.submitJob(job);

submitJob calls JobClient.submitJobInternal:

    // Create the splits for the job
    LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
    int maps;
    if (job.getUseNewMapper()) {
      maps = writeNewSplits(context, submitSplitFile);
    } else {
      maps = writeOldSplits(job, submitSplitFile);
    }

Here writeOldSplits computes maps, i.e. the number of splits. Tracing into it:

  private int writeOldSplits(JobConf job,
                             Path submitSplitFile) throws IOException {
    InputSplit[] splits =
        job.getInputFormat().getSplits(job, job.getNumMapTasks());

so it is HiveCompactIndexInputFormat that supplies the splits here.

getSplits reads the index file /tmp/table02_index to obtain the real input paths and returns the actual splits -- the parts of table02 that contain the indexed id value. Along the way it calls TextInputFormat to slice the data, filters out the splits that do not contain the indexed value, and returns the rest.
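That filtering step can be modeled in miniature. This is a hypothetical simplification, not the real HiveCompactIndexInputFormat: the index is reduced to a set of "file:offset" block keys, and any split whose block is not listed in the index is dropped:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of index-based split pruning: keep only the splits whose
// (file, offset) pair is mentioned by the compact index.
public class IndexSplitFilterDemo {

  // Stand-in for a Hadoop InputSplit: a file name plus a start offset.
  public static class Split {
    public final String file;
    public final long offset;
    public Split(String file, long offset) { this.file = file; this.offset = offset; }
  }

  // Keep only the splits whose "file:offset" key appears in the index.
  public static List<Split> filter(List<Split> splits, Set<String> indexedBlocks) {
    List<Split> kept = new ArrayList<>();
    for (Split s : splits) {
      if (indexedBlocks.contains(s.file + ":" + s.offset)) {
        kept.add(s);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    // Say the index reports that id=500000 lives only in the second block.
    Set<String> indexed = new HashSet<>(Arrays.asList("table02/part-0:134217728"));
    List<Split> all = Arrays.asList(
        new Split("table02/part-0", 0L),
        new Split("table02/part-0", 134217728L),
        new Split("table02/part-0", 268435456L));
    List<Split> kept = filter(all, indexed);
    System.out.println(kept.size() + " of " + all.size() + " splits kept");
  }
}
```

The payoff is exactly what the post describes: the map phase only ever sees the pruned split list, so the untouched blocks of table02 are never scanned.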

writeOldSplits then writes the split information to a file on HDFS.

After writeOldSplits returns, the path of the file holding the split information is written into the job's config -- DataOutputStream out = writeSplitsFileHeader(job, submitSplitFile, splits.length); -- and then the job is truly submitted:

    //
    // Now, actually submit the job (using the submit name)
    //
    JobStatus status = jobSubmitClient.submitJob(jobId);
    if (status != null) {
      return new NetworkedJob(status);

As a result, the earlier split-generation step gets executed one more time.

Back in ExecDriver: ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj); // start to execute

success = progress(th); then waits for the job to finish.
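progress(th) boils down to a poll-and-sleep loop over the running job's status. A minimal model, with a tiny interface standing in for org.apache.hadoop.mapred.RunningJob (the real method also prints map/reduce progress while it waits):

```java
// Toy model of ExecDriver's progress loop: poll the job handle until it
// reports completion, then report whether it succeeded.
public class ProgressPollDemo {

  public interface JobHandle {           // stand-in for RunningJob
    boolean isComplete();
    boolean isSuccessful();
  }

  // Polls until the job completes; returns whether it succeeded.
  public static boolean progress(JobHandle rj, long pollIntervalMs)
      throws InterruptedException {
    while (!rj.isComplete()) {
      Thread.sleep(pollIntervalMs);      // Hive logs map/reduce % here
    }
    return rj.isSuccessful();
  }

  public static void main(String[] args) throws InterruptedException {
    // A fake job that reports completion on the third poll.
    JobHandle fake = new JobHandle() {
      int polls = 0;
      public boolean isComplete() { return ++polls >= 3; }
      public boolean isSuccessful() { return true; }
    };
    System.out.println("success = " + progress(fake, 1));
  }
}
```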

With that, a job has been submitted and run, more or less end to end.


To sum up: by passing the index data path (-jobconf =%2Ftmp%2Ftable02_index_data, i.e. /tmp/table02_index_data), the work of obtaining the file splits is delegated to HiveCompactIndexInputFormat, which analyzes the index information stored in that file, derives the actual HDFS file splits that need to be read, and thus avoids a full table scan!
All of this is achieved by manually setting the parameters below:
hive> Set =/tmp/table02_index_data;  
hive> Set =false;  
hive> Set =;