Apache Hudi 0.13.0版本重磅发布!

时间:2022-12-23 00:40:37

Apache Hudi 0.13.0 版本引入了许多新功能,包括 Metaserver变更数据捕获新的 Record Merge APIDeltastreamer支持新数据源等。 虽然此版本不需要表版本升级,但希望用户在使用 0.13.0 版本之前按照下面的迁移指南采取相关重大变更行为变更的操作。

此版本与 0.12.0 版本保持相同的表版本 (5),如果从 0.12.0 升级,则无需升级表版本。 如下所述,存在一些重大变更和行为变更,用户在使用 0.13.0 版本之前需要采取相应的措施。

注意:如果从旧版本(0.12.0 之前)迁移,请按顺序检查每个旧版本的升级说明。

迁移指南:重大变更

Bundle包更新

Spark Bundle支持

从现在开始,hudi-spark3.2-bundle 可与 Apache Spark 3.2.1 和 Spark 3.2.x 的更新版本一起使用。 由于Spark 版本 3.2.0 和 3.2.1 之间 HiveClientImplgetHive方法实现不兼容,因此hudi-spark3.2-bundle放弃了对Spark 3.2.0 的支持。

实用程序Bundle包更改

AWS 和 GCP bundle jar 与 hudi-utilities-bundle 分开。意味着用户在使用云服务时需要使用 hudi-aws-bundlehudi-gcp-bundle 以及 hudi-utilities-bundle

新的 Flink Bundle包

Hudi 现在通过新的 hudi-flink1.16-bundle 支持 Flink 1.16.x 。

Spark Lazy文件索引

Hudi 在 Spark 中的文件索引默认切换为Lazy加载:这意味着它只会列出查询请求的分区(即在分区修剪之后),而不是在此版本之前总是列出整个表。这会为大型表带来相当大的性能提升。

如果用户想要更改列表行为,则会添加一个新的配置属性:hoodie.datasource.read.file.index.listing.mode(现在默认为Lazy)。 可以设置两个可能的值:

  • eager:这会在初始化期间列出所有分区路径和其中相应的文件切片。这是 0.13.0 之前的默认行为。如果一个Hudi表有1000个分区,eager模式在构建文件索引时会列出所有分区下的文件。
  • lazy:其中的分区和文件切片将被延迟加载,允许分区修剪谓词被适当地向下推,因此只列出已经被修剪的分区。初始化文件索引时,文件未列在分区下。 在查询中使用谓词(例如,datestr=2023-02-19)进行分区修剪后,文件仅列在目标分区下。

要保留 0.13.0 之前的行为需要设置 hoodie.datasource.read.file.index.listing.mode=eager。

只有当表同时具有以下两种情况时才会发生重大更改:多个分区列和分区值包含未进行 URL 编码的斜杠。
例如假设我们要从分区路径 2022/01/03 解析两个分区列 - 月 (2022/01)日 (03)。 由于分区列的数量(此处为 2 - 分别为)与分区路径中由 /分隔的数量(在本例中为 3 - )不匹配,因此会导致歧义。 在这种情况下不能恢复每个分区列对应的分区值。
有两种方法可以避免重大变更:

  • 第一个选项是更改分区值的构造方式。 用户可以切换月份列的分区值,避免任何分区列值出现斜杠,比如202201,那么解析分区路径(202201/03)就没有问题。
  • 第二个选项是将列表模式切换为 eager。 文件索引将假定表未分区并仅牺牲分区修剪,但将能够像表未分区一样处理查询(因此可能导致性能损失),而不是失败查询。

Spark Structured Streaming 的检查点管理

如果您使用 Spark streaming 摄取到 Hudi,Hudi 会在内部自行管理检查点。 我们现在正在添加对多客户端写入的支持,每个写入客户端都通过流式摄取摄取到同一个 Hudi 表中。 在旧版本的 hudi 中,您不能将多个流式摄取编写器摄取到同一个 hudi 表中(一个具有并发 Spark 数据源编写器的流式摄取写入与锁提供程序一起工作;但是不支持两个 Spark 流式摄取编写器)。 在 0.13.0 中,我们添加了对同一个表进行多个流式摄取的支持。 如果是单个流摄取,用户无需执行任何操作; 旧管道无需任何额外更改即可工作。 但是如果有多个流式写入客户端写入到同一个 Hudi 表,则每个表都必须为配置 hoodie.datasource.write.streaming.checkpoint.identifier 设置一个唯一的值。 此外用户应该设置通常的多写入器配置。更多详情参考

Spark ORC 支持

此版本中删除了 Spark 2.x 中支持的 ORC ,因为为了与Spark 3兼容,Hudi 中对 orc-core:nohive 的依赖现在被 orc-core 取代,Spark 3.x 支持 ORC,但以前的版本不支持了。

强制设置记录键字段

设置记录键字段的配置hoodie.datasource.write.recordkey.field现在需要显示设置,没有默认值。 以前默认值为 uuid。

迁移指南:行为变更

写路径中的模式Schema处理

许多用户想使用 Hudi CDC 用例,他们希望在新模式中删除现有列的模式进行自动演化。 从 0.13.0 版本开始,Hudi 现在具有此功能,可以允许模式自动演化,可以在新模式中删除现有列。

由于根据源Schema在目标表中删除列是很大的行为更改,因此默认情况下禁用此功能并由以下配置保护:hoodie.datasource.write.schema.allow.auto.evolution.column.drop。要启用自动删除列以及传入批次的新演化模式,请将其设置为 true

此配置不需要通过使用例如 ALTER TABLE … Spark 中的 DROP COLUMN 手动演变模式。

删除默认Shuffle并行性

此版本更改了 Hudi 写入操作的Shuffle并行度的方式,包括 INSERT、BULK_INSERT、UPSERT 和 DELETE (hoodie.insert|bulkinsert|upsert|delete.shuffle.parallelism),这最终会影响写入性能。

之前如果用户不配置,Hudi 会使用 200 作为默认的 shuffle 并行度。 从 0.13.0 开始,默认情况下Hudi 通过使用由 Spark 确定的输出 RDD 分区数(如果可用)或使用 spark.default.parallelism 值自动推导suffle并行度。 如果上述Hudi shuffle并行度是用户明确配置的,那么用户配置的并行度仍然用于定义实际的并行度。 对于具有合理大小输入的工作负载,此类行为更改可将开箱即用的性能提高 20%。

如果输入数据文件很小,例如小于 10MB,我们建议显式配置 Hudi shuffle 并行度(hoodie.insert|bulkinsert|upsert|delete.shuffle.parallelism),这样并行度至少为 total_input_data_size/500MB,以 避免潜在的性能下降(有关更多信息请参考调优指南

默认的简单写执行器SimpleExecutor

对于插入/更新插入操作的执行,Hudi 过去使用执行器的概念,依靠内存中的队列将摄取操作(以前通常由 I/O 操作获取shuffle数据)与写入操作分离。到现在 Spark 架构有了很大的发展,使得这种架构变得多余。为了发展这种写入模式并利用 Spark 优势,在 0.13.0 中引入了一个新的简化版本的执行程序,并命名为 SimpleExecutor并将其设置为开箱即用的默认值。

SimpleExecutor 没有任何内部缓冲(即不在内存中保存记录),它在内部实现对提供的迭代器的简单迭代(类似于默认的 Spark 行为)。 它在 Spark 3.x版本上提供了约 10% 的开箱即用性能改进,与 Spark 原生 SparkRecordMerger一起使用时效果更好。

批量插入使用NONE排序以匹配 Parquet 写入

此版本调整了 BULK_INSERT 写入操作的 NONE 排序模式(默认排序模式)的并行度。 从现在开始默认情况下,使用输入并行性而不是随机shuffle并行性 (hoodie.bulkinsert.shuffle.parallelism) 来写入数据,以匹配默认的 parquet 写入行为。这不会更改使用 NONE 排序模式的聚类行为。

BULK_INSERT 写入操作行为的变更提高了开箱即用的写入性能。

如果在默认的NONE排序方式下还是发现小文件问题,我们建议在写入Hudi表之前,先根据分区路径和记录键对输入数据进行排序。还可以使用 GLOBAL_SORT 来确保最佳文件大小。

Deltstreamer 元同步失败处理

在早期版本中,我们使用了一种快速失败的方法,如果任何目录同步失败,则不会尝试同步剩余的目录。 在 0.13.0 中,在任何目录同步失败的操作失败之前尝试同步到所有配置的目录。 在一个目录同步失败的情况下,其他目录的同步仍然可以成功,所以用户现在只需要重试失败的目录即可。

不覆盖内部元数据表Metadata Table配置

由于错误配置可能导致数据完整性问题,在 0.13.0 中我们努力使用户的元数据表配置更加简单。 在内部 Hudi 确定这些配置的最佳选择,以实现系统的最佳性能和稳定性。

以下与元数据表相关的配置是内部配置,用户不能再显式配置这些配置:

hoodie.metadata.clean.async
hoodie.metadata.cleaner.commits.retained
hoodie.metadata.enable.full.scan.log.files
hoodie.metadata.populate.meta.fields

Spark SQL CTAS 性能修复

以前由于配置错误,CTAS 写入操作被错误地设置为使用 UPSERT。 在 0.13.0 版本中我们修复了这个问题,以确保 CTAS 使用 BULK_INSERT操作来提高初始化写入 Hudi 表的性能。

Flink CkpMetadata

在 0.13.0 之前,我们通过清理所有消息来初始化 ckp 元数据(检查点相关元数据)。但一些极端情况没有得到正确处理。 例如:

  • 重新启动作业时,写任务无法正确获取pending的instant。
  • 如果检查点成功并且作业突然崩溃,则instant没有时间提交。 数据丢失,因为最后一个pending的instant被回滚; 然而 Flink 引擎仍然认为检查点/即时是成功的。

Q:为什么要在 0.13.0 版本之前清理消息?
A:为了防止时间线和消息不一致
Q:为什么我们要保留 0.13.0 版本中的消息?
A:不一致有两种情况:

  1. 时间线即时完成但 ckp 消息正在传输(用于提交instant)。
  2. 时间轴时刻处于pending状态,而 ckp 消息未启动(用于启动新instant)。

对于case 1,不需要re-commit instant,如果write task在恢复的时候没有得到pending instant就好了

对于case 2,instant基本上是pending的。instant将被回滚(如预期的那样)。 因此保持 ckp 消息实际上可以保持正确性。

重大特性

元服务器Metaserver

在 0.13.0 中我们引入了元数据集中管理服务 Metaserver。 这是我们在未来引入的平台服务组件之一。 Metaserver 帮助用户轻松管理数据湖平台中的大量表。

注意:这是一项实验性功能。

如需要在环境中设置元服务器,请使用 hudi-metaserver-server-bundle 并将其作为 java 服务器应用程序运行,例如 java -jar hudi-metaserver-server-bundle-<HUDI_VERSION>.jar。 在客户端添加以下选项以与元服务器集成:

hoodie.metaserver.enabled=true
hoodie.metaserver.uris=thrift://<server url>:9090

Metaserver 存储 Hudi 表的元数据,如表名、数据库、所有者; 以及时间线的元数据,如提交instant、action、state等。此外Metaserver 通过 Hudi Spark 包支持 Spark 写入和读取。

更改数据捕获CDC

在 Hudi 表用作流源的情况下,我们希望了解属于单个提交的记录的所有变更。 例如我们想知道哪些记录被插入、删除和更新。 对于更新的记录,后续管道可能希望获取更新前的旧值和更新后的新值。 0.13.0之前,增量查询不包含硬删除记录,用户需要使用软删除流删除,可能不符合GDPR要求。

Change-Data-Capture (CDC) 功能使 Hudi 能够通过生成变更来显示记录是如何变更的,从而处理 CDC 查询用例。

注意:CDC 是一项实验性功能,当前仅支持COW 表。CDC 查询尚不支持 MOR 表。

要使用 CDC,用户需要先在写入表时启用它以记录额外的数据,这些数据由 CDC 增量查询返回。
对于写入,设置 hoodie.table.cdc.enabled=true 并通过 hoodie.datasource.query.incremental.format指定 CDC 日志记录模式,以控制记录的数据。 有3种模式可供选择:

  • data_before_after:这记录了变更记录的操作以及变更前后的整个记录。 这种模式在存储上产生最多的 CDC 数据,并且查询 CDC 结果的计算量最少。
  • data_before:这记录了变更记录的操作和更改前的整个记录。
  • op_key_only:这只记录变更记录的操作和key。这种模式在存储上产生最少的 CDC 数据,但查询 CDC 结果需要最多的计算工作。

默认值为 data_before_after
CDC读取配置如下设置:

hoodie.datasource.query.type=incremental
hoodie.datasource.query.incremental.format=cdc

和其他通常的增量查询选项,如开始和结束即时时间,并返回 CDC 结果。

请注意,hoodie.table.cdc.enabled 是表配置。 一旦启用就不允许为关闭它。 同样不能更改 hoodie.table.cdc.supplemental.logging.mode,一旦它被保存为表配置。

优化记录负载处理

此版本引入了期待已久的支持,可将记录作为其引擎原生表示进行处理,从而避免将它们转换为中间形式 (Avro) 的需要。

注意:此功能处于实验模式,目前仅支持 Spark。

通过引入新的 HoodieRecordMerger 抽象。 HoodieRecordMerger 是未来在 Hudi 中实现任何合并语义的核心和事实来源。在这种能力下,它取代了以前用于实现自定义合并语义的 HoodieRecordPayload 层次结构。 通过依赖 HoodieRecordMerger 形式的统一组件,我们可以在写入操作的整个生命周期内以统一的方式处理记录。 这大大减少了延迟,因为记录现在是引擎原生表示,避免了不必要的复制、反序列化和转换为中间表示 (Avro)。 在基准测试中,与 0.13.0 默认状态相比,upsert 性能提高了 10%,与 0.12.2 相比提高了 20%。

如要启用,需要为 Hudi 表指定不同的配置:

  • 对于 COW,指定 hoodie.datasource.write.record.merger.impls=org.apache.hudi.HoodieSparkRecordMerger
  • 对于 MOR,指定 hoodie.datasource.write.record.merger.impls=org.apache.hudi.HoodieSparkRecordMerger 和 hoodie.logfile.data.block.format=parquet

请注意,当前的 HoodieSparkRecordMerger 实现仅支持与 OverwriteWithLatestAvroPayload 类等效的合并语义,这是当前用于合并记录的默认 HoodieRecordPayload 实现(设置为“hoodie.compaction.payload.class”)。 因此如果您正在使用任何其他 HoodieRecordPayload 实现,则需要等到它被相应的 HoodieRecordMerger 实现替换。

Deltastreamer 中的新源支持

Deltastreamer 是一个完全托管的增量 ETL 实用程序,支持各种数据源。 在此版本中我们添加了三个新数据源。

Proto Kafka源

Deltastreamer 已经支持从 Kafka 中一次性摄取 JSON 和 Avro 格式新事件。 ProtoKafkaSource 支持基于 Protobuf 类的模式。只需一个额外的配置,就可以轻松设置此源。 查看文档以获取更多详细信息。

GCS 增量源

沿着 S3 事件源的路线,我们现在有一种可靠且快速的方法来通过 GcsEventsHoodieIncrSource 从 Google Cloud Storage (GCS) 中的对象中摄取。

Pulsar源

Apache Pulsar 是一个为云构建的开源分布式消息传递和流数据平台。 PulsarSource 支持通过 Deltastreamer 从 Apache Pulsar 摄取。

支持部分负载更新

部分更新是社区中的一个常见用例,它需要能够仅更新某些字段而不是替换整个记录。 以前我们建议用户通过引入他们自己的自定义记录负载实现来满足此用例。 随着该需求变得越来越普遍,在 0.13.0 版本中,我们添加了一个新的记录有效负载实现 PartialUpdateAvroPayload以支持这种开箱即用的功能,因此用户可以使用该实现而不必编写自己的自定义实现。

一致性哈希索引

我们引入了 Consistent Hashing Index 作为使用 Hudi 写入的另一种索引选项。 这是对 0.11.0 版本中添加的 Bucket Index 的增强。 使用桶索引,每个分区的桶/文件组是静态分配的,而使用一致性哈希索引,桶可以动态增长,因此用户无需担心数据倾斜。 桶将根据每个分区的负载因子扩展和收缩。 更多细节可参考此功能设计的 RFC

如需使用,配置如下

hoodie.index.type=bucket
hoodie.index.bucket.engine=CONSISTENT_HASHING
hoodie.bucket.index.max.num.buckets=128
hoodie.bucket.index.min.num.buckets=32
hoodie.bucket.index.num.buckets=4
## do split if the bucket size reach 1.5 * max_file_size
hoodie.bucket.index.split.threshold=1.5
## do merge if the bucket size smaller than 0.2 * max_file_size
hoodie.bucket.index.merge.threshold=0.1 

要强制缩小或扩大存储桶,需要使用以下配置启用Clustering

## check resize for every 4 commit
hoodie.clustering.inline=true
hoodie.clustering.inline.max.commits=4
hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkConsistentBucketClusteringPlanStrategy
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkConsistentBucketClusteringExecutionStrategy
## for supporting concurrent write & resizing
hoodie.clustering.updates.strategy=org.apache.hudi.client.clustering.update.strategy.SparkConsistentBucketDuplicateUpdateStrategy

Consistent Hashing Index 仍然是一个不断发展的特性,目前从 0.13.0 开始使用它有一些限制:

  • 只有使用 MOR 表的 Spark 引擎才支持此索引。
  • 它不适用于启用元数据表。
  • 要扩大或缩小桶,用户必须使用上述配置(以某种节奏)手动触发Clustering,但他们不能同时运行压缩。
  • 因此如果常规写入管道启用了压缩,请遵循以下建议:可以选择每 12 小时触发一次扩容/缩容。 在这种情况下,每 12 小时一次,可能需要禁用压缩、停止写入管道并启用Clustering。 应该格外小心不要同时运行两者,因为这可能会导致冲突和管道失败。 Clustering完成后可以恢复常规写入管道,这将启用压缩。

我们正在努力实现这些自动化,并使用户更容易利用 Consistent Hashing Index。 可以在此处关注 Consistent Hashing Index 正在进行的工作。

多客户端写入的早期冲突检测

Hudi提供乐观并发控制(OCC),允许多个写入客户端在没有重叠数据文件写入的情况下,并发写入并原子提交到Hudi表,保证数据的一致性、完整性和正确性。 在0.13.0版本之前,这种重叠数据文件的冲突检测是在提交元数据之前和数据写入完成之后进行的。 如果在最后阶段检测到任何冲突,则可能会浪费计算资源,因为数据写入已经完成。

为了提高并发控制,0.13.0版本引入了OCC早期冲突检测的新特性,利用Hudi的标记机制,在数据写入阶段检测到冲突,一旦检测到冲突就提前中止写入。 Hudi 现在可以更早地停止冲突写入器,因为它可以及早检测冲突并释放集群所需的计算资源,从而提高资源利用率。

注意:OCC 中的早期冲突检测在 0.13.0 版本中是实验性的。

默认情况下,此功能处于关闭状态。 要尝试这一点,用户需要在使用 OCC 进行并发控制时将 hoodie.write.concurrency.early.conflict.detection.enable设置为 true(有关更多详细信息请参阅并发控制)。

写入数据中的无锁消息队列

在以前的版本中,Hudi 使用生产者-消费者模型通过有界内存队列将传入数据写入表中。 在此版本中我们添加了一种新型队列,利用 Disruptor,它是无锁的。 当数据量很大时,这会增加写入吞吐量。 将 1 亿条记录写入云存储上的 Hudi 表中的 1000 个分区的基准显示,与现有的有界内存队列执行器类型相比,性能提高了 20%。

注意:DisruptorExecutor 作为实验特性支持 Spark 插入和 Spark 批量插入操作

用户可以设置 hoodie.write.executor.type=DISRUPTOR_EXECUTOR 来启用该功能。 还有其他配置,如 hoodie.write.wait.strategyhoodie.write.buffer.size 可以进一步调整性能。

Hudi CLI 包

我们为 Spark 3.x 引入了一个新的 Hudi CLI Bundle,hudi-cli-bundle_2.12,使 Hudi CLI 更简单易用。 用户现在可以使用这个 bundle jar(已经发布到 Maven 仓库)和 Hudi Spark bundle 来启动脚本来启动带有 Spark 的 Hudi-CLI shell,Hudi-CLI 可轻松部署,因为用户不需要在本地编译 Hudi CLI 模块、上传 jar 和解决任何依赖冲突(如果有)。 可以在 Hudi CLI 页面上找到详细说明。

支持 Flink 1.16

Flink 1.16.x 集成Hudi,在编译源码时使用profile参数-Pflink1.16激活版本。 或者使用 hudi-flink1.16-bundle。 Flink 1.15、Flink 1.14 和 Flink 1.13 也将继续支持。

Json Schema转换器

对于配置模式注册表的 DeltaStreamer 用户,添加了一个 JSON schema转换器,以帮助将 JSON 模式转换为目标 Hudi 表的 AVRO。 将 hoodie.deltastreamer.schemaprovider.registry.schemaconverter 设置为 org.apache.hudi.utilities.schema.converter.JsonToAvroSchemaConverter 以使用此功能。 用户还可以实现此接口 org.apache.hudi.utilities.schema.SchemaRegistryProvider.SchemaConverter 以提供从原始模式到 AVRO 的自定义转换。

通过 Spark SQL 设置 Hudi Config

用户现在可以通过 Spark SQL conf 提供 Hudi 配置,例如,设置

spark.sql("set hoodie.sql.bulk.insert.enable = true")

确保 Hudi 在执行 INSERT INTO 语句时能够使用 BULK_INSERT 操作

感谢

感谢参与0.13.0版本的所有贡献者,欢迎广大数据湖爱好者加入Apache Hudi社区,欢迎star & fork https://github.com/apache/hudi