《ODPS权威指南》笔记

ODPS简介

ODPS是什么

**ODPS(Open Data Processing Services)**是一个海量数据处理平台,提供大数据处理的云计算服务,基于阿里巴巴自主研发的分布式操作系统开发,主要提供结构化和半结构化数据的存储和计算。

ODPS的作用

  1. 基于SQL构建大规模数据仓库和企业BI系统
  2. 于DAG和Graph等分布式编程模型开发数据应用
  3. 基于统计和机器学习算法开发大数据统计模型和数据挖掘

ODPS的架构

ODPS执行计划

CREATE TABLE dw_log_parser AS
SELECT ip, user, time,
    regexp_substr(request, "(^[^ ]+ )") as method,
    regexp_extract(request, "^[^ ]+ (.*) [^ ]+$") as url,
    regexp_substr(request, "([^ ]+$)") as protocol,status, size, referer, agent
FROM ods_log_tracker
WHERE dt='20140301';
# 上述SQL生成的运行结果

# 标识本次执行的作业,当作业运行时间很长时,可以通过status 20140715085318597g1yuj7h查看运行状态
InstanceId: 20140715085318597g1yuj7h
# 具体的SQL
SQL: ...# SQL运行的主要信息
Summary of SQL:
# 输入的分区数和字节数,并且底层文件采用了压缩,这里是压缩后的大小
Inputs:
	odps_book.ods_log_tracker/dt=20140301: 469 (22591 bytes)
# 输出的分区数和字节数,并且底层文件采用了压缩,这里是压缩后的大小
Outputs:
	odps_book.dw_log_parser/dt=20140301: 469 (22655 bytes)
# 表示M1_Stg1阶段的运行情况
M1_Stg1_odps_book_20140715085318597g1yuj7h_SQL_0_0_0_job0:
	# 表示作业在1个worker上运行
	Worker Count : 1
	# 读取的记录数,min、max、avg表示各个Worker上的统计值,此处只有一个Worker
	Input Records :
	input: 469 (min: 469, max: 469, avg: 469)
	# 输出的记录数
	Output Records :M1_Stg1FS_5529574: 469 (min: 469, max: 469, avg: 469)
OK

Tips

  • 在导入数据时,将数据全部导入,不要丢弃任何数据,因为即使是脏数据也是非常有价值的(可以将脏数据导入到另一个文件中),比如在反作弊中。

  • ONWHERE的区别

    • INNER JOIN 中是等价的
    • LEFT OUTER JOIN中,ON会先执行并生成临时表,然后再执行WHERE过滤临时表中的数据,再输出最终结果。
  • MapJoin

    • 适用场景:当一张大表和一张或多张小表JOIN时,可以使用MapJOIN

    • 使用方法:在SELECT后加上 /*+ MAPJOIN(小表的名称或别名) */

    • 注意事项:如果使用的是LEFT OUTER JOIN或者RIGHT OUTER JOIN时,主表不能是小表。

    • 原理:执行过程是会把小表拷贝到各个Worker中,每个Worker直接读取大表的某个Split数据然后和小表进行JOIN计算,生成结果。此时不需要Reduce操作,所以也就没有了Shuffle阶段。对于多表连接,Shuffle往往会带来大量的网络IO操作,成为作业运行的瓶颈。

    • 举例

      SELECT /*+ MAPJOIN(u) */
      	u.user_id,
      	u.gender,
      	u.age,
      	pv.view_time,
      FROM user u
      JOIN page_view pv
      ON u.user_id = pv.user_id;
      
  • DISTRIBUTE BYSORT BYMapReduce的关系

    • 比如在网站访问日志中,希望能够实现按访问IP排序,相同IP按其时间Time排序,生成结果。
      • 解决方法:Key(IP, Time)二元组,分区键设置为IP,分组键设置为IP
      • 原因:分区键决定相同IP的数据会被分到同一个Reducer,但并不能保证(ip1, time1)(ip1, time2)会被分组到一起,因为此时的Key(IP, Time),它们的Key不相等,所以需要设置分组键为IP(默认是看Key是否相等)
      • 其实就相当于SQL中的DISTRIBUTE BY ip SORT BY ip, time
  • SQL的执行顺序

    • SELECT<select list>
    • FROM <left table>
    • <join_type> JOIN <right_table>
    • ON <join_condition>
    • WHERE <condition>
    • GROUP BY <group by list>
    • HAVING <having condition>
    • ORDER BY <order by list>
    • LIMIT <number>
  • SQL抽象为算子(逻辑查询计划实际上就是由这些算子构成的DAG)

    • TableScanOperator(TS)FROM操作
    • FilterOperator(FIL)WHEREHAVING操作
    • GBYOperator(GBY)GROUP BYDISTINCT操作
    • SelectOperator(SEL)SELECT操作
    • OrderByOperator(ORDER)ORDER BYLIMIT操作
    • JoinOperator(JOIN)JOIN操作
    • UnionAllOperator(UNION)UNIONUNION ALL操作
  • 逻辑查询计划

    • 普通子查询

      • SELECT sum(cnt) as hits
        FROM (
        	SELECT count(*) as cnt
            FROM t1
            JOIN t2
            ON t1.user_id = t2.user_id
            GROUP BY t1.user_id
        ) t3;
        
      • image-20210807150920315
    • 谓词下推

      • SELECT *
        FROM t1 JOIN t2
        ON t1.id = t2.id
        WHERE t1.age > 10 AND t2.age > 5;
        
        # 上述SQL会被优化为
        SELECT *
        FROM t1 JOIN t2
        ON t1.id = t2.id AND t1.age > 10 AND t2.age > 5;
        
      • image-20210807151220510
  • 数据倾斜

    • 使用COUNT(DISTINCT)
      • 当使用的是COUNT(*)时,相当于会在Map端进行一次Combine操作,以减少数据量(也就是减少网络IO),而使用COUNT(DISTINCT)操作时,无法进行Combine操作,如下图所示。
      • 使用COUNT(*)时,IMG_8235
      • 使用COUNT(DISTINCT)时,IMG_8236
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐