EMR 打造高效云原生数据分析引擎

时间:2024-03-23 11:55:30

本场视频链接:

https://developer.aliyun.com/live/1545?spm=a2c6h.12873639.0.0.244d6dceFT4wxM&groupCode=apachespark

本场ppt材料:

https://www.slidestalk.com/AliSpark/2019___0926_110365

链接可点击文末阅读原文直达


基于开源体系打造云上数据分析平台

客户选择开源方案的原因主要有以下几点:

• 灵活多样的业务场景:目前即便是一个小企业,其数据存储也可能是多种多样的,比如业务数据、日志数据和图数据等,这种情况下,需要有一个高度定制化的系统来串联不同的业务场景;自己有专业的运维能力:开源系统有充足的人才储备,丰富的网上资料与开源的强大后盾,可以确保公司业务的顺利开展;多种业务需求vs成本压力:每种云上产品有自己的使用场景,对于中小企业来说购买多种云产品将会造成很大的成本压力,而通过开源体系维护一套系统,在集成用户业务中所需组件的同时,可以降低用户的成本。EMR 打造高效云原生数据分析引擎

下图是阿里巴巴EMR系统的产品架构图。用户上云的方式主要有两种,一种是购买ECS资源自己搭建一套开源系统;另一种是直接选择阿里巴巴的EMR系统。第一种方式由于开源系统组件多,涉及到了Spark、Hive、Flink和TensorFlow等,从零搭建一套完整的大数据系统对于用户来讲非常复杂,尤其是成百上千的集群规模也给运维造成了很大的挑战。而使用EMR系统具有以下优点:

EMR 打造高效云原生数据分析引擎

EMR系统的目标主要有以下三个:

• 平台化:将EMR做成一个统一的云上数据分析平台,帮助用户打造全栈式的大数据解决方案,支持全系列VM容器化,提供企业级HAS和大数据APM;技术社区&深度:持续深耕技术社区,打造大数据友好的云 Native 存储,同时将技术回馈给社区,为社区做贡献;生态:EMR系统将结合阿里云其他产品构建一个生态,接入Blink、PAI,集成OSS、OTS方案。

EMR 打造高效云原生数据分析引擎

EMR-Jindo:云原生高效数据分析引擎

下图展示了TPC-DS的基准测试报告,可以发现在2019年3月份10TB的测试中,性能指标得分是182万左右,成本是0.31 USD;而2019年十月份同样的测试性能指标得分已经变成526万,成本下降到0.53 CNY,也就是说经过半年左右性能提升了2.9倍,成本缩减到原来的四分之一。同时阿里巴巴还成为了首个提交TPC-DS测试100TB测试报告的厂商。这些成绩的背后是EMR-Jindo引擎的支持。

EMR 打造高效云原生数据分析引擎

EMR-Jindo引擎架构主要分为两部分:

• Jindo-Spark:EMR内部全面优化的Spark高效计算引擎,可以处理多种计算任务;Jindo-FS:自研的云原生存储引擎,兼容开源HDFS的接口,兼顾性能与价格。

EMR 打造高效云原生数据分析引擎

1) Jindo-Spark

Jindo-Spark高效计算引擎对Spark采取了一系列优化措施,比如Runtime Filter支持自适应的运行时数据裁剪;Enhanced Join Reorder来解决外连接重排等问题;TopK支持推理并下推 TopK 逻辑,帮助尽早地过滤数据;File Index支持文件级别过滤和min/max/bloom/倒排等;自研开发了Relational Cache,实现使用一套引擎就可以将查询从分钟级提升为亚秒级;针对特定的场景推出Spark Transaction功能,为Spark引入Full ACID支持;实现了Smart Shuffle功能,从底层来减少sort-merge 次数,提升Shuffle的效率。

EMR 打造高效云原生数据分析引擎

• Runtime Filter

EMR 打造高效云原生数据分析引擎

根据这些统计信息,将备选数据比较少的一侧提取出来,推到另一侧进行过滤。Runtime Filter的成本很小,只需要在优化器中进行简单评估,却可以带来显著的性能提升。如下图所示,Runtime Filter实现了35%左右的整体性能提升。该特性已经在Spark提交了PR(SPARK-27227)。Enhanced Join Recorder

比如下图左上角的例子中,如果最底层两个表非常大的话,则这两张表join的开销会非常大,join后的大数据再去join小表,大数据一层一层地传递下去,就会影响整个流程的执行效率。此时,优化的思想是先将大表中一些无关的数据过滤掉,减少往下游传递的数据量。针对该问题,Spark使用的是动态规划算法,但其只适用于表的数量比较少的情况,如果表的数量大于12,该算法就束手无策。面对表的数量比较多的情况,EMR提供了多表join的遗传算法,其可以将原来的动态规划算法的2n的复杂度降到线性的量级,能完成成百上千张表的join。

下图右上角可以看到,Query64有18个表参与join,动态规划算法优化时间就需要耗费1400秒,而多表join的遗传算法仅需要20秒左右就可完成。Join Recorder另外一个重要的功能是外连接重排算法,大家都知道sql中外连接不能随意交换顺序的,但这并不代表不能交换顺序,比如A left join B, 然后再left join C,事实上在某种条件下其顺序是可交换的。在Spark中,外连接的优化是直接被放弃掉,而EMR则根据现有研究找到了顺序可交换的充分必要条件,实现了外连接重排算法(如下图左下角所示),对外连接的执行效率有了质的提升(下图右下角)EMR 打造高效云原生数据分析引擎

• Relational Cache

EMR 打造高效云原生数据分析引擎

Relational Cache的创建过程如下,其语法与Spark sql常见的DDL类似。首先CACHE一个表或视图,然后指定Relational Cache的更新策略(DEMAND或COMMIT)、是否用于后续优化、Cache数据的存储方式以及Cache的视图逻辑。Relational Cache支持cache任意Table、View,支持cache到内存、HDFS、OSS等任意数据源,JSON、ORC、Parquet等任意数据格式。

EMR 打造高效云原生数据分析引擎

EMR 打造高效云原生数据分析引擎

• Spark Transaction:有些用户可能会使用Hive表,Hive表有事务支持,然而Spark在事务这一项上是不兼容Hive的。因此,为了满足用户数据订正/删除以及数据流导入的场景支持,EMR平台提供了Spark Transaction支持事务的ACID支持。

传统的数据导入是分批的,比如一天一导入,而流数据导入场景下数据是实时写入的原始数据,并未经过任何处理,因此会有delete和update的需求。Spark Transaction整体来讲是一种锁+MVCC的实现形式,MVCC与底层的存储密不可分。大数据在Hive和Spark兼容的情况下,都是文件的形式存在目录中,文件的版本通过行来控制,写入的每一行都会加上Meta Columns,如op、original_write-id、bucket id和row_id等,来标识这是全表唯一的一行。当需要更新某一行的时候,并不会原地更新该行,而是将该行取出来,重写后产生新的版本进行存储。读取的时候,多版本会进行合并后返回给用户。

EMR 打造高效云原生数据分析引擎

 2) Jindo-FS

EMR 打造高效云原生数据分析引擎

进而的解决方案是将数据从远端拉取到计算侧进行缓存,这也是Jindo-FS做的事情。Jindo-FS是类似于HDFS的系统,其架构也类似于HDFS的Master-Slave架构,分成Name Service 和Storage Service。它支持将某些访问频率比较高的表可以放到RocksDB中进行多级缓存。Jindo-FS整体不同于HDFS的Master结点, Jindo-FS的“Master”(Name Service)是一个分布式集群,使用raft 协议,提供入口服务;提供多Name Space支持;元数据以kv形式存放于高性能kv store 中;因为其本身不存储数据,真实数据在OSS和OTS中,因此支持数据的弹性扩展和销毁重建。

EMR 打造高效云原生数据分析引擎

Jindo-FS底层的元数据管理会将数据拆成一系列的kv,通过递增的id来逐层查询。如/home/Hadoop/file1.txt需要读三次OTS。下图右侧的测试结果说明Jindo-FS在元数据操作方面相对于OSS有较好的性能提升。

EMR 打造高效云原生数据分析引擎

Jindo-FS使用Storage Service来进行底层存储,在写流程中Storage Service将要写的文件同时存储到本地和OSS中,然后再返回给用户写的结果,同时还会在集群结点内进行多副本传输;而读操作和HDFS类似,如果命中本地,则在本地存储中读取,否则要进行远程读取。Storage Service具备高性能、高可靠、高可用、弹性存储等特性,为了支持高性能,Jindo-FS建立了数据流高速通道,同时还有一系列的策略,如减少内存拷贝次数等。

EMR 打造高效云原生数据分析引擎

EMR 打造高效云原生数据分析引擎