Spark 3.0 SQL: Source Code Analysis and Parameter Tuning for Reading Hive ORC Tables with HiveTableScanExec

Date: 2022-10-30 19:55:30

1 Environment Setup

1.1 Example Code

import org.apache.spark.sql.SparkSession

object SparkSqlHive {
  def main(args: Array[String]): Unit = {

    val ss = SparkSession.builder().master("local[2]").appName("the test of SparkSession")
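      .config("spark.sql.hive.convertMetastoreOrc", "false")  // Defaults to true, in which case FileSourceScanExec reads the table; set to false to use HiveTableScanExec
      .config("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", "67108864")
      .enableHiveSupport()
      .getOrCreate()
    // Drop the temporary table first if it already exists
    ss.sql("DROP TABLE IF EXISTS temp.temp_ods_start_log")
    // Read the ORC table ods_start_log and store the result in the temporary table
    val df = ss.sql("CREATE TABLE IF NOT EXISTS temp.temp_ods_start_log as select substr(str,1,10) as str10 from biods.ods_start_log where dt='20210721'")
    // Action on the returned DataFrame. Note that the CTAS command itself already runs
    // inside ss.sql(...): the driver log below reports the job as "sql at SparkSqlHive.scala"
    df.count()
    // Sleep for a while so the job can be inspected in the Spark UI
    Thread.sleep(1000000)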
    ss.stop()
  }
}

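1.2 Preparing the Hive ORC Table

LOAD DATA only moves files and does not convert their format, so a text file cannot be loaded directly into an ORC table. The data is therefore loaded into a text table first and then written into the ORC table.

-- Create the database
create database biods;

-- Create the ORC external table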
create external table biods.ods_start_log
(
`str` string
)
comment 'User app start log records'
partitioned by (`dt` string)
stored as orc
location '/bi/ods/ods_start_log';

-- Create the text external table
create external table biods.txt_ods_start_log
(
`str` string
)
comment 'User app start log records'
partitioned by (`dt` string)
stored as textfile
location '/bi/ods/txt_ods_start_log';

-- Add the partitions
alter table biods.ods_start_log add partition(dt='20210721');
alter table biods.txt_ods_start_log add partition(dt='20210721');

-- Load the data
load data local inpath '/home/cwc/data/start0721.log' overwrite into table biods.txt_ods_start_log partition(dt='20210721');

-- Write into the ORC table
insert overwrite table biods.ods_start_log
partition(dt='20210721')
select str 
from biods.txt_ods_start_log
where dt='20210721';

-- Merge the partition's files on HDFS into fewer files (if needed)
alter table biods.ods_start_log partition(dt='20210721') concatenate;

The resulting partition files are as follows:

[root@hadoop3 ~]# hdfs dfs -ls -R -h hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721
drwxr-xr-x   - root supergroup          0 2022-10-22 12:29 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/.hive-staging_hive_2022-10-22_12-29-16_934_837446341335257460-1
drwxr-xr-x   - root supergroup          0 2022-10-22 12:29 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/.hive-staging_hive_2022-10-22_12-29-16_934_837446341335257460-1/-ext-10001
drwxr-xr-x   - root supergroup          0 2022-10-22 12:29 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/.hive-staging_hive_2022-10-22_12-29-16_934_837446341335257460-1/_tmp.-ext-10002
-rwxr-xr-x   3 root supergroup    246.0 M 2022-10-25 12:18 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1
-rwxr-xr-x   3 root supergroup     94.1 M 2022-10-25 12:18 hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0

The block size of both files is 256 MB (268435456 bytes), as shown below:

[root@hadoop3 ~]# hadoop fs -stat "%o %r" hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0
268435456 3
[root@hadoop3 ~]# hadoop fs -stat "%o %r" hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1
268435456 3

The stripe counts of the ORC files are as follows:
the 94 MB file has 2 stripes, and the 246 MB file has 11 stripes.

[root@hadoop3 ~]# hive --orcfiledump hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0 | less

Processing data file hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0 [length: 98673168]
Structure for hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000001_0
File Version: 0.12 with ORC_135
Rows: 3043150
Compression: ZLIB
Compression size: 262144
Type: struct<str:string>

Stripe Statistics:
  Stripe 1:
    Column 0: count: 2020000 hasNull: false
    Column 1: count: 2020000 hasNull: false min: 2021-09-16 16:26:46.194 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595347200000},"attr":{"area":"三亚","uid":"2F10092A1723314","app_v":"1.1.2","event_type":"common","device_id":"1FB872-9A1001723314","os_type":"0.8.8","channel":"WS","language":"chinese","brand":"iphone-1"}} max: 2021-09-16 16:28:48.696 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"1","error_code":"0"},"time":1596124800000},"attr":{"area":"铜川","uid":"2F10092A10999969","app_v":"1.1.7","event_type":"common","device_id":"1FB872-9A10010999969","os_type":"2.0","channel":"BL","language":"chinese","brand":"iphone-5"}} sum: 748218740
  Stripe 2:
    Column 0: count: 1023150 hasNull: false
    Column 1: count: 1023150 hasNull: false min: 2021-09-16 16:26:32.958 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595260800000},"attr":{"area":"乳山","uid":"2F10092A727865","app_v":"1.1.19","event_type":"common","device_id":"1FB872-9A100727865","os_type":"9.7.7","channel":"HA","language":"chinese","brand":"Huawei-3"}} max: 2021-09-16 16:28:35.297 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"1","error_code":"0"},"time":1596038400000},"attr":{"area":"深圳","uid":"2F10092A9999986","app_v":"1.1.2","event_type":"common","device_id":"1FB872-9A1009999986","os_type":"0.4.6","channel":"LD","language":"chinese","brand":"Huawei-6"}} sum: 378150821

File Statistics:
  Column 0: count: 3043150 hasNull: false
  Column 1: count: 3043150 hasNull: false min: 2021-09-16 16:26:32.958 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"1","action":"0","error_code":"0"},"time":1595260800000},"attr":{"area":"乳山","uid":"2F10092A727865","app_v":"1.1.19","event_type":"common","device_id":"1FB872-9A100727865","os_type":"9.7.7","channel":"HA","language":"chinese","brand":"Huawei-3"}} max: 2021-09-16 16:28:48.696 [main] INFO  com.lagou.ecommerce.AppStart - {"app_active":{"name":"app_active","json":{"entry":"2","action":"1","error_code":"0"},"time":1596124800000},"attr":{"area":"铜川","uid":"2F10092A10999969","app_v":"1.1.7","event_type":"common","device_id":"1FB872-9A10010999969","os_type":"2.0","channel":"BL","language":"chinese","brand":"iphone-5"}} sum: 1126369561

Stripes:
  Stripe: offset: 3 data: 65480370 rows: 2020000 tail: 46 index: 21746
    Stream: column 0 section ROW_INDEX start: 3 length 34
    Stream: column 1 section ROW_INDEX start: 37 length 21712
    Stream: column 1 section DATA start: 21749 length 64216148
    Stream: column 1 section LENGTH start: 64237897 length 1264222
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2
  Stripe: offset: 65502165 data: 33158620 rows: 1023150 tail: 46 index: 11309
    Stream: column 0 section ROW_INDEX start: 65502165 length 29
    Stream: column 1 section ROW_INDEX start: 65502194 length 11280
    Stream: column 1 section DATA start: 65513474 length 32518042
    Stream: column 1 section LENGTH start: 98031516 length 640578
    Encoding column 0: DIRECT
    Encoding column 1: DIRECT_V2

File length: 98673168 bytes
Padding length: 0 bytes
Padding ratio: 0%
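
The stripe layout in the dump above can be cross-checked with a little arithmetic: a stripe occupies index + data + tail bytes starting at its offset, so the end of the first stripe should coincide with the offset of the second. The following Java sketch does this check for 000001_0. The class and method names (StripeLayoutCheck, stripeLength) are made up for illustration and are not part of Hive or ORC; the numbers are copied from the dump.

```java
public class StripeLayoutCheck {
    // Bytes a stripe occupies on disk: index streams + data streams + stripe footer ("tail").
    public static long stripeLength(long index, long data, long tail) {
        return index + data + tail;
    }

    public static void main(String[] args) {
        // Values copied from the "Stripes:" section of the orcfiledump output.
        long offset1 = 3L;
        long len1 = stripeLength(21746L, 65480370L, 46L);
        long offset2 = 65502165L;
        long len2 = stripeLength(11309L, 33158620L, 46L);
        long fileLength = 98673168L;

        System.out.println("stripe 1 length:     " + len1);
        System.out.println("stripe 1 ends at:    " + (offset1 + len1));
        System.out.println("stripe 2 starts at:  " + offset2);
        System.out.println("stripe 2 length:     " + len2);
        System.out.println("bytes after stripes: " + (fileLength - (offset2 + len2)));
    }
}
```

The two stripe lengths computed this way (65502162 and 33169975 bytes) are the values that stripe.getLength() would feed into the split generation analyzed in section 2. The bytes left over after the last stripe belong to the file-level metadata, footer, and postscript.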

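2 How HiveTableScanExec Reads an ORC Table

According to the driver log, the splits are first generated in OrcInputFormat; the tasks are then scheduled and read the ORC table through HadoopRDD (HadoopRDD.scala).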

22/10/27 23:46:59 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on LAPTOP-NDJFRBI4:57758 (size: 24.3 KiB, free: 4.0 GiB)
22/10/27 23:46:59 INFO SparkContext: Created broadcast 0 from 
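--------- Split computation in OrcInputFormat starts here. One split corresponds to one task, so the splits are computed on the driver (or master) before any task runs, in line with the big-data principle of moving computation to the data rather than moving the data.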
22/10/27 23:46:59 INFO OrcInputFormat: ORC pushdown predicate: null
22/10/27 23:46:59 INFO deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
22/10/27 23:46:59 INFO OrcInputFormat: Using column configuration variables columns [str] / columns.types [string] (isAcidRead false)
22/10/27 23:47:00 INFO OrcCodecPool: Got brand-new codec ZLIB
22/10/27 23:47:00 INFO OrcCodecPool: Got brand-new codec ZLIB
22/10/27 23:47:00 INFO OrcInputFormat: FooterCacheHitRatio: 0/2
22/10/27 23:47:00 INFO SparkContext: Starting job: sql at SparkSqlHive.scala:17
22/10/27 23:47:00 INFO DAGScheduler: Got job 0 (sql at SparkSqlHive.scala:17) with 3 output partitions
22/10/27 23:47:00 INFO DAGScheduler: Final stage: ResultStage 0 (sql at SparkSqlHive.scala:17)
22/10/27 23:47:00 INFO DAGScheduler: Parents of final stage: List()
22/10/27 23:47:00 INFO DAGScheduler: Missing parents: List()
22/10/27 23:47:00 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at sql at SparkSqlHive.scala:17), which has no missing parents
22/10/27 23:47:00 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 228.3 KiB, free 4.0 GiB)
22/10/27 23:47:00 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 81.8 KiB, free 4.0 GiB)
22/10/27 23:47:00 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on LAPTOP-NDJFRBI4:57758 (size: 81.8 KiB, free: 4.0 GiB)
22/10/27 23:47:00 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1223
22/10/27 23:47:00 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at sql at SparkSqlHive.scala:17) (first 15 tasks are for partitions Vector(0, 1, 2))
22/10/27 23:47:00 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
22/10/27 23:47:00 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, LAPTOP-NDJFRBI4, executor driver, partition 0, ANY, 7546 bytes)
22/10/27 23:47:00 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, LAPTOP-NDJFRBI4, executor driver, partition 1, ANY, 7546 bytes)
22/10/27 23:47:00 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
22/10/27 23:47:00 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
---------------------- Note: task scheduling and execution start here
22/10/27 23:47:00 INFO HadoopRDD: Input split: OrcSplit [hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1, start=130975043, length=126979143, isOriginal=true, fileLength=257955207, hasFooter=false, hasBase=true, deltas=[]]
22/10/27 23:47:00 INFO HadoopRDD: Input split: OrcSplit [hdfs://hadoop1:9000/bi/ods/ods_start_log/dt=20210721/000000_1, start=3, length=130975040, isOriginal=true, fileLength=257955207, hasFooter=false, hasBase=true, deltas=[]]
22/10/27 23:47:00 INFO OrcInputFormat: Using column configuration variables columns [str] / columns.types [string] (isAcidRead false)

2.1 This Leads to OrcInputFormat#generateSplitsInfo

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat#generateSplitsInfo

static List<OrcSplit> generateSplitsInfo(Configuration conf, Context context)
      throws IOException {
      // This is where the "ORC pushdown predicate" log line above is printed
    if (LOG.isInfoEnabled()) {
      LOG.info("ORC pushdown predicate: " + context.sarg);
    }
    ....
        for (SplitStrategy<?> splitStrategy : splitStrategies) {
          if (isDebugEnabled) {
            LOG.debug("Split strategy: {}", splitStrategy);
          }

          // Hack note - different split strategies return differently typed lists, yay Java.
          // This works purely by magic, because we know which strategy produces which type.
          if (splitStrategy instanceof ETLSplitStrategy) {
          //---------------------  Create the splitFutures used to generate splits asynchronously --------------------------
            scheduleSplits((ETLSplitStrategy)splitStrategy,
                context, splitFutures, strategyFutures, splits);
          } else {
            @SuppressWarnings("unchecked")
            List<OrcSplit> readySplits = (List<OrcSplit>)splitStrategy.getSplits();
            splits.addAll(readySplits);
          }
        }
      }

      // Run the last combined strategy, if any.
      if (combinedCtx != null && combinedCtx.combined != null) {
        scheduleSplits(combinedCtx.combined, context, splitFutures, strategyFutures, splits);
        combinedCtx.combined = null;
      }

      // complete split futures
      for (Future<Void> ssFuture : strategyFutures) {
         ssFuture.get(); // Make sure we get exceptions strategies might have thrown.
      }
      // All the split strategies are done, so it must be safe to access splitFutures.
      for (Future<List<OrcSplit>> splitFuture : splitFutures) {
       //---------------------  Wait for each splitFuture to finish and collect its splits; see SplitGenerator#call below --------------------------
        splits.addAll(splitFuture.get());
      }
    } catch (Exception e) {
      cancelFutures(pathFutures);
      cancelFutures(strategyFutures);
      cancelFutures(splitFutures);
      throw new RuntimeException("ORC split generation failed with exception: " + e.getMessage(), e);
    }

    if (context.cacheStripeDetails) {
      LOG.info("FooterCacheHitRatio: " + context.cacheHitCounter.get() + "/"
          + context.numFilesCounter.get());
    }

    if (isDebugEnabled) {
      for (OrcSplit split : splits) {
        LOG.debug(split + " projected_columns_uncompressed_size: "
            + split.getColumnarProjectionSize());
      }
    }
    return splits;
  }

2.2 Next, OrcInputFormat#scheduleSplits: How the Split Futures Are Created

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat#scheduleSplits

  private static void scheduleSplits(ETLSplitStrategy splitStrategy, Context context,
      List<Future<List<OrcSplit>>> splitFutures, List<Future<Void>> strategyFutures,
      List<OrcSplit> splits) throws IOException {
      // The key call is here
    Future<Void> ssFuture = splitStrategy.generateSplitWork(context, splitFutures, splits);
    if (ssFuture == null) return;
    strategyFutures.add(ssFuture);
  }

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.ETLSplitStrategy#generateSplitWork

2.3 Into OrcInputFormat.ETLSplitStrategy#runGetSplitsSync

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.ETLSplitStrategy#runGetSplitsSync

private void runGetSplitsSync(List<Future<List<OrcSplit>>> splitFutures,
        List<OrcSplit> splits, UserGroupInformation ugi) throws IOException {
      UserGroupInformation tpUgi = ugi == null ? UserGroupInformation.getCurrentUser() : ugi;
      List<SplitInfo> splitInfos = getSplits();
      List<Future<List<OrcSplit>>> localListF = null;
      List<OrcSplit> localListS = null;
      for (SplitInfo splitInfo : splitInfos) {
      // SplitGenerator is a Callable
        SplitGenerator sg = new SplitGenerator(splitInfo, tpUgi, allowSyntheticFileIds);
        if (!sg.isBlocking()) {
          if (localListS == null) {
            localListS = new ArrayList<>(splits.size());
          }
          // Already called in doAs, so no need to doAs here.
          localListS.addAll(sg.call());
        } else {
          if (localListF == null) {
            localListF = new ArrayList<>(splits.size());
          }
          // Submit to the thread pool, which returns a Future
          localListF.add(Context.threadPool.submit(sg));
        }
      }
      if (localListS != null) {
        synchronized (splits) {
          splits.addAll(localListS);
        }
      }
      if (localListF != null) {
        synchronized (splitFutures) {
          splitFutures.addAll(localListF);
        }
       }
     }
   }

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#call
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#callInternal
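
The pattern in runGetSplitsSync, together with the splitFuture.get() loop in generateSplitsInfo, is: run a Callable on the calling thread when it reports that it does not block, otherwise submit it to a thread pool and collect the Future later. The Java sketch below illustrates only that pattern. Work, InlineOrPooledDemo, and the string "splits" are hypothetical stand-ins, not Hive classes, and the pool size of 2 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class InlineOrPooledDemo {
    /** Hypothetical stand-in for SplitGenerator: a Callable that says whether it may block. */
    public static class Work implements Callable<List<String>> {
        private final String name;
        private final boolean blocking;

        public Work(String name, boolean blocking) {
            this.name = name;
            this.blocking = blocking;
        }

        public boolean isBlocking() {
            return blocking;
        }

        @Override
        public List<String> call() {
            List<String> out = new ArrayList<>();
            out.add("split-of-" + name);
            return out;
        }
    }

    public static List<String> run(List<Work> works) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<String> splits = new ArrayList<>();
            List<Future<List<String>>> futures = new ArrayList<>();
            for (Work w : works) {
                if (!w.isBlocking()) {
                    splits.addAll(w.call());       // non-blocking: compute on the calling thread
                } else {
                    futures.add(pool.submit(w));   // blocking: hand over to the thread pool
                }
            }
            for (Future<List<String>> f : futures) {
                splits.addAll(f.get());            // wait for each async result and collect it
            }
            return splits;
        } catch (Exception e) {
            throw new RuntimeException("split generation failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Work> works = Arrays.asList(new Work("a", false), new Work("b", true));
        System.out.println(run(works));
    }
}
```

In this sketch the inline results are added before the pooled ones, so the order of the returned list depends on which works were blocking, not on their input order.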

2.4 (Key Point) Into OrcInputFormat.SplitGenerator#generateSplitsFromStripes

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#generateSplitsFromStripes
Splits are built from the stripes of the ORC file: whole stripes are the basic unit that gets grouped into a split, and a stripe is never divided further.

    private List<OrcSplit> generateSplitsFromStripes(boolean[] includeStripe) throws IOException {
      List<OrcSplit> splits = new ArrayList<>(stripes.size());

      // after major compaction, base files may become empty base files. Following sequence is an example
      // 1) insert some rows
      // 2) delete all rows
      // 3) major compaction
      // 4) insert some rows
      // In such cases, consider entire base delta file as an orc split (similar to what BI strategy does)
      if (stripes == null || stripes.isEmpty()) {
        splits.add(createSplit(0, file.getLen(), orcTail));
      } else {
        // if we didn't have predicate pushdown, read everything
        if (includeStripe == null) {
          includeStripe = new boolean[stripes.size()];
          Arrays.fill(includeStripe, true);
        }

        OffsetAndLength current = new OffsetAndLength();
        int idx = -1;
        for (StripeInformation stripe : stripes) {
          idx++;

          if (!includeStripe[idx]) {
            // create split for the previous unfinished stripe
            if (current.offset != -1) {
              splits.add(createSplit(current.offset, current.length, orcTail));
              current.offset = -1;
            }
            continue;
          }

          current = generateOrUpdateSplit(
            splits, current, stripe.getOffset(), stripe.getLength(), orcTail);
        }
        generateLastSplit(splits, current, orcTail);
      }
      // Add uncovered ACID delta splits.
      splits.addAll(deltaSplits);
      return splits;
    }

2.5 (Key Point) Into OrcInputFormat.SplitGenerator#generateOrUpdateSplit

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitGenerator#generateOrUpdateSplit

private OffsetAndLength generateOrUpdateSplit(
        List<OrcSplit> splits, OffsetAndLength current, long offset,
        long length, OrcTail orcTail) throws IOException {
      // if we are working on a stripe, over the min stripe size, and
      // crossed a block boundary, cut the input split here.
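       // current.offset : offset of the group of stripes currently being accumulated
       // offset : offset of the stripe passed in
       // blockSize : HDFS block size of the file, an attribute of the file itself
       // When maxSize is very large, current.offset stays fixed while current.length and offset keep growing; once offset falls into a different block than current.offset, the first condition below holds and the split is cut near the block boundary
       // context.minSize and context.maxSize are the values passed in through the configuration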
      if (current.offset != -1 && current.length > context.minSize &&
          (current.offset / blockSize != offset / blockSize)) {
        splits.add(createSplit(current.offset, current.length, orcTail));
        current.offset = -1;
      }
      // if we aren't building a split, start a new one.
      if (current.offset == -1) {
        current.offset = offset;
        current.length = length;
      } else {
        current.length = (offset + length) - current.offset;
      }
        // Once the accumulated stripe size reaches mapreduce.input.fileinputformat.split.maxsize, cut a split here
      if (current.length >= context.maxSize) {
        splits.add(createSplit(current.offset, current.length, orcTail));
        current.offset = -1;
      }
      return current;
    }
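
To make the two cut conditions concrete, the following Java sketch replays the same grouping logic on plain (offset, length) pairs instead of Hive objects. It is a simplified simulation, not the Hive implementation: OrcSplitSimulation and simulate are hypothetical names, predicate pushdown and ACID deltas are ignored, and the final flush is an assumption about what generateLastSplit does. The stripe values are those of the 94 MB file 000001_0 (length = index + data + tail from the orcfiledump output).

```java
import java.util.ArrayList;
import java.util.List;

public class OrcSplitSimulation {
    /** Groups whole stripes {offset, length} into splits {offset, length}. */
    public static List<long[]> simulate(long[][] stripes, long minSize, long maxSize, long blockSize) {
        List<long[]> splits = new ArrayList<>();
        long curOffset = -1;
        long curLength = 0;
        for (long[] stripe : stripes) {
            long offset = stripe[0];
            long length = stripe[1];
            // Condition 1: the running split exceeds minSize and this stripe starts in another block.
            if (curOffset != -1 && curLength > minSize
                    && (curOffset / blockSize != offset / blockSize)) {
                splits.add(new long[] {curOffset, curLength});
                curOffset = -1;
            }
            // Start a new split or extend the running one to the end of this stripe.
            if (curOffset == -1) {
                curOffset = offset;
                curLength = length;
            } else {
                curLength = (offset + length) - curOffset;
            }
            // Condition 2: the accumulated length has reached maxSize.
            if (curLength >= maxSize) {
                splits.add(new long[] {curOffset, curLength});
                curOffset = -1;
            }
        }
        // Assumption: generateLastSplit emits whatever is still being accumulated.
        if (curOffset != -1) {
            splits.add(new long[] {curOffset, curLength});
        }
        return splits;
    }

    public static void main(String[] args) {
        long[][] stripes = { {3L, 65502162L}, {65502165L, 33169975L} };
        long blockSize = 268435456L; // 256 MB, as reported by hadoop fs -stat
        for (long maxSize : new long[] {268435456L, 67108864L, 33554432L, 1L}) {
            List<long[]> splits = simulate(stripes, 1L, maxSize, blockSize);
            System.out.println("maxSize=" + maxSize + " -> " + splits.size() + " split(s)");
        }
    }
}
```

With maxSize set to 64 MB the first stripe alone (about 62.5 MB) stays below the limit, so the second stripe is appended and the file ends up as a single split of 98672137 bytes. Lowering maxSize to 32 MB gives two splits, and even maxSize=1 cannot produce more than two, because a stripe is never subdivided.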

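2.6 To Summarize, the Call Chain Is as Follows:

OrcInputFormat#generateSplitsInfo
  -> OrcInputFormat#scheduleSplits
  -> OrcInputFormat.ETLSplitStrategy#generateSplitWork
  -> OrcInputFormat.ETLSplitStrategy#runGetSplitsSync
  -> OrcInputFormat.SplitGenerator#call
  -> OrcInputFormat.SplitGenerator#callInternal
  -> OrcInputFormat.SplitGenerator#generateSplitsFromStripes
  -> OrcInputFormat.SplitGenerator#generateOrUpdateSplit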

context.minSize and context.maxSize are defined as follows:

org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.Context#Context(org.apache.hadoop.conf.Configuration, int, org.apache.hadoop.hive.ql.io.orc.ExternalCache.ExternalFooterCachesByConf)

      minSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMINSPLITSIZE, DEFAULT_MIN_SPLIT_SIZE);
      maxSize = HiveConf.getLongVar(conf, ConfVars.MAPREDMAXSPLITSIZE, DEFAULT_MAX_SPLIT_SIZE);

By default:

mapreduce.input.fileinputformat.split.minsize=1
mapreduce.input.fileinputformat.split.maxsize=256M

Therefore, by default the split size is roughly the block size (cuts still fall on stripe boundaries), because the typical file block size of 128 MB is smaller than the default value of mapreduce.input.fileinputformat.split.maxsize.

The splits are ultimately generated from the stripes of the ORC file in combination with these parameter settings.

Conclusion: "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize" controls the number of splits generated, and therefore the number of map tasks that read the table.

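A small question: how does spark.hadoop.mapreduce.input.fileinputformat.split.maxsize end up taking effect as mapreduce.input.fileinputformat.split.maxsize?
The answer is in org.apache.spark.deploy.SparkHadoopUtil#appendSparkHadoopConfigs: when the SparkContext is created, every parameter whose name starts with spark.hadoop. has that prefix stripped, and the remainder is set as a Hadoop parameter.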
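
The prefix handling can be sketched in a few lines of Java. This is an illustration of the idea only, using plain maps in place of SparkConf and the Hadoop Configuration; SparkHadoopPrefixDemo and toHadoopConf are hypothetical names and not Spark APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SparkHadoopPrefixDemo {
    static final String PREFIX = "spark.hadoop.";

    /** Copies every "spark.hadoop.foo=bar" entry into the result as "foo=bar". */
    public static Map<String, String> toHadoopConf(Map<String, String> sparkConf) {
        Map<String, String> hadoopConf = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : sparkConf.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                hadoopConf.put(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        Map<String, String> sparkConf = new LinkedHashMap<>();
        sparkConf.put("spark.sql.hive.convertMetastoreOrc", "false");
        sparkConf.put("spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", "67108864");
        // Only the spark.hadoop.* entry is carried over, without its prefix.
        System.out.println(toHadoopConf(sparkConf));
    }
}
```

Settings without the spark.hadoop. prefix, such as spark.sql.hive.convertMetastoreOrc, are not copied by this step; in the sketch they are simply skipped.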

[Figure: how Spark SQL reads the ORC files when "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize" is set to "67108864"]

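3 Summary

(1) Splits are built from the stripes of the ORC file: whole stripes are the basic unit that gets grouped into a split, and a stripe is never divided further.
(2) By default, the split size is roughly the file's block size, because the typical block size of 128 MB is smaller than the 256 MB default of mapreduce.input.fileinputformat.split.maxsize.
(3) Setting "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize" to a value smaller than the file's own block size increases the number of splits generated, and therefore the number of map tasks that read the table.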