大数据SQL数据倾斜与数据膨胀的优化与经验总结

( 本文阅读时间:5分钟 )

01 背景

目前市面上大数据查询分析引擎层出不穷,如Spark,Hive,Presto等,因其友好的SQL语法,被广泛应用于各领域分析,公司内部也有优秀的ODPS SQL供用户使用。

笔者所在团队的项目也借用ODPS SQL去检测业务中潜在的安全风险。在给业务方使用与答疑过程中,我们发现大多含有性能瓶颈的SQL,主要集中在数据倾斜与数据膨胀问题中。

因此,本文主要基于团队实际开发经验与积累,并结合业界对大数据SQL的使用与优化,尝试给出相对系统性的解决方案。

本文主要涉及业务SQL执行层面的优化,暂不涉及参数优化。若设置参数,首先确定执行层面哪个阶段(Map/Reduce/Join)任务执行时间较长,从而设置对应参数。

本文主要分为以下三个部分:

第一部分,会引入数据倾斜与数据膨胀问题;

第二部分,介绍当数据倾斜与数据膨胀发生时,如何排查与定位;

第三部分,会从系统层面给出常见优化思路。

02 问题篇

2.1 数据倾斜

数据倾斜是指在分布式计算时,大量相同的key被分发到同一个reduce节点中。针对某个key值的数据量比较多,会导致该节点的任务数据量远大于其他节点的平均数据量,运行时间远高于其他节点的平均运行时间,拖累了整体SQL执行时间。

其主要原因是key值分布不均导致的Reduce处理数据不均匀。本文将从Map端优化,Reduce端优化和Join端优化三方面给出相应解决方案。

2.2 数据膨胀

数据膨胀是指任务的输出条数/数据量级比输入条数/数据量级大很多,如100M的数据作为任务输入,最后输出1T的数据。这种情况不仅运行效率会降低,部分任务节点在运行key值量级过大时,有可能发生资源不足或失败情况。

03 排查定位篇

本节主要关注于业务SQL本身引起的长时间运行或者失败,对于集群资源情况,平台故障本身暂不考虑在内。

1.首先检查输入数据量级。与其他天相比有无明显量级变化,是否因为数据量级的问题天然引起任务运行时间过长,如双11,双十二等大促节点;

2.观察执行任务拆分后各个阶段运行时间。与其他天相比有无明显量级变化;在整个执行任务中时间耗时占比情况;

3.最耗时阶段中,观察各个Task的运行情况。Task列表中,观察是否存在某几个Task实例耗时明显比平均耗时更长,是否存在某几个Task实例处理输入/输出数据量级比平均数据量级消费产出更多;

4.根据步骤3中定位代码行数,定位问题业务处理逻辑。

04 优化篇

4.1 数据倾斜

✪ 4.1.1 Map端优化

读取数据合并

在数据源读取查询时,动态分区数过多可能造成小文件数过多,每个小文件至少都会作为一个块启动一个Map任务来完成。对于文件数量而言,等于 map数量 * 分区数。对于一个Map任务而言,其初始化的时间可能远远大于逻辑处理时间,因此通过调整Map参数把小文件合并成大文件进行处理,避免造成很大的资源浪费。

列裁剪

减少使用select * from table语句,过多选择无用列会增加数据在集群上传输的IO开销;对于数据选择,需要加上分区过滤条件进行筛选数据。

谓词下推

在不影响结果的情况下,尽可能将过滤条件表达式靠近数据源位置,使之提前执行。通过在map端过滤减少数据输出,降低集群IO传输,从而提升任务的性能。

数据重分布

在Map阶段做聚合时,使用随机分布函数distribute by rand(),控制Map端输出结果的分发,即map端如何拆分数据给reduce端(默认hash算法),打乱数据分布,至少不会在Map端发生数据倾斜。

✪ 4.1.2 Reduce端优化

关联key空值检验

部分实例发生长尾效应,很大程度上由于null值,空值导致,使得Reduce时含有脏值的数据被分发到同一台机器中。

针对这种问题SQL,首先确认包含无效值的数据源表是否可以在Map阶段直接过滤掉这些异常数据;如果后续SQL逻辑仍然需要这些数据,可以通过将空值转变成随机值,既不影响关联也可以避免聚集。


SELECT  ta.id 
FROM    ta 
LEFT JOIN tb 
ON      coalesce(ta.id , rand()) = tb.id; 

排序优化

Order by为全局排序,当表数据量过大时,性能可能会出现瓶颈;Sort by为局部排序,确保Reduce任务内结果有序,全局排序不保证;Distribute by按照指定字段进行Hash分片,把数据划分到不同的Reducer中;CLUSTER BY:根据指定的字段进行分桶,并在桶内进行排序,可以认为cluster by是distribute by+sort by。

对于排序而言,尝试用distribute by+sort by确保reduce中结果有序,最后在全局有序。


-- 原始脚本 
select * 
from user_pay_table 
where dt = '20221015' 
order by amt 
limit 500 
; 

-- 改进脚本 
SELECT  * 
FROM    user_pay_table 
WHERE   dt = '20221015' 
DISTRIBUTE BY ( CASE 
                   WHEN amt = 100 AND age 

✪ 4.1.3 Join端优化

大表join小表

通过将需要join的小表分发至map端内存中,将Join操作提前至map端执行,避免因分发key值不均匀引发的长尾效应,复杂度从(M*N)降至(M+N),从而提高执行效率。ODPS SQL与Hive SQL使用mapjoin,SPARK使用broadcast。

大表join大表

长尾效应由热点数据导致,可以将热点数据加入白名单中,通过对白名单数据和非白名单数据分别处理,再合并数据。

具体表现为打散倾斜key,进行两端聚合(针对聚合)或者拆分倾斜key进行打散然后再合并数据。

4.2 数据膨胀

避免笛卡尔积

Join关联条件有误,表Join进行笛卡尔积,造成数据量爆炸。

关联key区分度校验

关注JoinKey区分度,key值区分度越低(distinct数量少),越有可能造成数据爆炸情况。如用户下的性别列,交易下的省市列等。

聚合操作误用

部分聚合操作需要将中间结果记录下来,最后再生成最终结果,这使得在select操作时,按照不同维度去重Distinct、不同维度开窗计算over Partition By可能会导致数据膨胀。针对这种业务逻辑,可以将一个SQL拆分成多个SQL分别进行处理操作。

05 总结

大数据SQL优化是一项涉及知识面较广的工作,除了分析现有执行计划之外,还需要学习相应查询分析引擎设计原理。针对我们日常遇到的问题现总结分享给大家,供大家查阅。

参考资料

[01] ODPS SELECT语法

https://help.aliyun.com/document_detail/73777.html?utm_content=g_1000230851&spm=5176.20966629.toubu.3.f2991ddcpxxvD1#section-ag9-2c4-t0e

[02] Presto Query Lifecycle

https://varada.io/blog/presto/accelerate-presto-trino-queries-data-lake/

[03] A Definitive Guide To Hive Performance Tuning- 10 Excellent Tips

https://www.hdfstutorial.com/blog/hive-performance-tuning/

[04] Presto Performance: Speed, Optimization & Tuning:

https://ahana.io/learn/presto-performance/

[05] Hive Optimizing Joins

*https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.0.0.2/ds_Hive/optimize-joins.html