返回 登录
0

在Spark中使用Pivot重塑数据

阅读24732

本文来自Andrew Ray博士在Silicon Valley Data Science网站上发表的博客,Andrew Ray博士对大数据有着浓厚的兴趣并且有着丰富的Spark使用经验。Andrew同样也是一名活跃的Apache Spark源码贡献者,其源码贡献主要集中在Spark SQL和GraphX组件上。

透视(pivot)数据功能是Spark 1.6的众多新增特性之一,它通过使用DataFrame(目前支持Scala、Java和Python语言)创建透视表(pivot table)。透视可以视为一个聚合操作,通过该操作可以将一个(实际当中也可能是多个)具有不同值的分组列转置为各个独立的列。透视表在数据分析和报告中占有十分重要的地位,许多流行的数据操纵工具(如pandas、reshape2和Excel)和数据库(如MS SQL和Oracle 11g)都具有透视数据的能力。在以前的博客当中我已经做了简要介绍,但在本文中,我将更深入地给大家讲解具体细节。本博文的代码可以从这里下载。

语法

在为透视操作进行pull请求的过程中,我进行了许多相关研究,其中一项便是对其它优秀工具的语法进行比较,目前透视语法格式多种多样,Spark 透视功能最主要的两个竞争对手是pandas(Python语言)和reshape2(R语言)。

图片描述

图片描述

例如,我们想对A列和B列进行分组,然后在C列上进行透视操作并对D列数据进行求和,pandas的语法格式为 pivot_table(df, values=’D’, index=[‘A’, ‘B’], columns=[‘C’], aggfunc=np.sum),这看起来有点冗长但表达还算清晰,如果使用reshape2的话,其语法格式为 dcast(df, A + B ~ C, sum),借助于R语言公式的表达能力,这种语法十分紧凑,需要注意的是reshape2不需要指定求值列,因为它自身具备将剩余DataFrame列作为最终求值列的能力(当然也可能通过其它参数进行显式指定)。

我们提出Spark透视操作自有的语法格式,它能够与DataFrame上现有其它聚合操作完美结合,同样是进行group/pivot/sum操作,在Spark中其语法为:df.groupBy(“A”, “B”).pivot(“C”).sum(“D”),显然这种语法格式非常直观,但这其中也有个值得注意的地方:为取得更好的性能,需要明确指定透视列对应的不同值,例如如果C列有两个不同的值(small 和 large),则性能更优的版本语法为: df.groupBy(“A”, “B”).pivot(“C”, Seq(“small”, “large”)).sum(“D”)。当然,这里给出的是Scala语言实现,使用Java语言和Python语言实现的话方法也是类似的。

报告

让我们来看一些实际应用案例,假设你是一个大型零售商(例如我前任东家),销售数据具有标准交易格式并且你想制作一些汇总数据透视表。当然,你可以选择将数据聚合到可管理的大小,然后使用其它工具去制作最终的数据透视表(尽管初始聚合操作的粒度受限)。但是现在你可以在Spark中进行所有操作(在进行这些操作之前需要进行若干IF判断),不过不幸的是没有大的零售商愿意将它们原始的销售数据共享给我们,因此我们将使用合成的数据进行演示,这里推荐使用TPC-DS 数据集,该数据集是我用过的数据集中比较好的一个,它的元数据(Schema)与实际零售数据非常相似。

图片描述

因为TPC-DS是为进行不同大小的“大数据”数据库基准测试而合成的数据集,所以我们可以使用尺度因子(scale factors)决定最终想要生成的数据集大小。为简单起见,这里的尺度因子为1,对应数据集大小为1GB。由于需求有点复杂,我使用了docker镜像以便大家可以跟着学习。假设我们想根据种类(category)和季度(quarter)对数据进行汇总,各季度数据最终在数据透视表中以列的形式展示,此时我们可以通过下列代码完成上述需求(更真实的查询可能会有更多条件如时间范围等):

(sql("""select *, concat('Q', d_qoy) as qoy
  from store_sales
  join date_dim on ss_sold_date_sk = d_date_sk
  join item on ss_item_sk = i_item_sk""")
  .groupBy("i_category")
  .pivot("qoy")
  .agg(round(sum("ss_sales_price")/1000000,2))
  .show)

+-----------+----+----+----+----+
| i_category|  Q1|  Q2|  Q3|  Q4|
+-----------+----+----+----+----+
|      Books|1.58|1.50|2.84|4.66|
|      Women|1.41|1.36|2.54|4.16|
|      Music|1.50|1.44|2.66|4.36|
|   Children|1.54|1.46|2.74|4.51|
|     Sports|1.47|1.40|2.62|4.30|
|      Shoes|1.51|1.48|2.68|4.46|
|    Jewelry|1.45|1.39|2.59|4.25|
|       null|0.04|0.04|0.07|0.13|
|Electronics|1.56|1.49|2.77|4.57|
|       Home|1.57|1.51|2.79|4.60|
|        Men|1.60|1.54|2.86|4.71|
+-----------+----+----+----+----+

注意,我们将销售额以百万元为单位并精确到小数点后两位以便于更清晰地比较,上面的数据结果有两个值得注意的地方:首先,四季度的数据明显要更多,这对任何熟悉零售业的人来说都很好理解;其次,同一季度中种类为null的异常结果值比较接近。遗憾的是,即使是如此优秀的合成数据集也与真实情况有出入,如果你有比该合成数据集更好且对公众开放的数据,请告诉我。

特征生成

第二个例子,让我们来看预测模型中的特征生成,在实际应用中,数据集中的目标观测值常常以每条一行(称为长格式或窄数据)的格式进行组织。为构建模型,我们首先需要将数据重塑,每个目标值重塑为一行,根据上下文该任务可以有多种方法来完成,其中一种方法便是通过Spark中的透视操作来完成。这也许是其它工具如pandas、reshape2和Excel完成不了的,因为结果集可能有成百万甚至数十亿行。

为使实验能够容易地再现,我将使用相对较小的MovieLens 1M数据集,该数据集中包含了由6040个用户针对3952个电影生成的大约一百万个电影评级数据。我们尝试根据100个最流行的电影评级去预测用户的性别。在下面的例子当中,评级表有三列:user、 movie和rating。

+----+-----+------+
|user|movie|rating|
+----+-----+------+
|  11| 1753|     4|
|  11| 1682|     1|
|  11|  216|     4|
|  11| 2997|     4|
|  11| 1259|     3|
...

为得到每用户一行格式的数据,我们进行如下透视操作:

val ratings_pivot = ratings.groupBy("user").pivot("movie", popular.toSeq).agg(expr("coalesce(first(rating),3)").cast("double"))

上面代码中的popular变量为最流行的电影列表(通过评级数得到),同时我们将默认评级设为3,对于用户11,其影评数据结果如下:

+----+----+---+----+----+---+----+---+----+----+---+...
|user|2858|260|1196|1210|480|2028|589|2571|1270|593|...
+----+----+---+----+----+---+----+---+----+----+---+...
|  11| 5.0|3.0| 3.0| 3.0|4.0| 3.0|3.0| 3.0| 3.0|5.0|...
+----+----+---+----+----+---+----+---+----+----+---+...

上面的数据为建模时所需要的宽格式数据,完整例子代码在这。需要注意的是:我只使用了100个最流行的电影,因为当前的透视操作需要作用于成千上万个不同值,在当前的实现中其速度不是特别快。我们未来将解决这一问题。

提示和技巧

为获取最好的性能,透视操作时需要指定透视列对应的不同值(如果你知道的话),不然的话Spark会立即启动一个job来确定这些值。除此之外,它们将按照排好的顺序放置,对大部分应用而言,这种做法是合理的,但是对部分应用而言,如每周各天的顺序,这种做法是不合理的(如Friday, Monday, Saturday等) 。

透视同其它正常的聚合操作一样,支持多个聚合表达式,只要将多个参数传递给agg方法即可,例如df.groupBy(“A”, “B”).pivot(“C”).agg(sum(“D”), avg(“D”))
虽然语法上只允许对某一列进行透视,但你可以将多个列组合起来,其得到的结果与透视多个列得到的结果一样,例如:

+----+----+---+----+----+---+----+---+----+----+---+...
|user|2858|260|1196|1210|480|2028|589|2571|1270|593|...
+----+----+---+----+----+---+----+---+----+----+---+...
|  11| 5.0|3.0| 3.0| 3.0|4.0| 3.0|3.0| 3.0| 3.0|5.0|...
+----+----+---+----+----+---+----+---+----+----+---+...

最后,你可能会对在未明确指定时,对应透视列所允许的值最大数感兴趣,这也是捕获错误及避免内存溢出(OOM)场景的主要关注点。其配置键(config key)为spark.sql.pivotMaxValues,默认值为10000,你可能并不需要对其进行修改。

实现

透视函数的实现通过添加新的逻辑算子(o.a.s.sql.catalyst.plans.logical.Pivot)进行,该逻辑算子被新的分析器规则(o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot)翻译,该新分析器规则会将其翻译成带有许多带有if语句的聚合操作,每个透视值对应一个表达式。

例如, df.groupBy(“A”, “B”).pivot(“C”, Seq(“small”, “large”)).sum(“D”)将被翻译成df.groupBy(“A”, “B”).agg(expr(“sum(if(C = ‘small’, D, null))”), expr(“sum(if(C = ‘large’, D, null))”))。你也可能直接这么用但这会使代码比较冗长且容易出错。

未来的工作

Spark中的透视功能仍然有待于提升,目前大量的工作集中在以下几个方面:

• 在R API和SQL语法(类似Oracle 11g和MS SQL)中添加透视功能,为用户提供更大的语言选择范围,使透视功能使用更简便。
• 添加逆透视的支持,其功能与透视操作相反
• 当透视列中的不同值较多时需要提升透视的速度,我目前正在想办法解决这一问题。

原文链接:Reshaping Data with Pivot in Spark

译者简介:牛亚真,中科院计算机信息处理专业硕士研究生,关注大数据技术和数据挖掘方向。责编:仲浩

评论