返回 登录
0

Spark多数据源计算实践及其在GrowingIO的实践

阅读17996

本文作者:田毅,目前在数据分析服务公司GrowingIO数据平台部门工作,Spark社区的Contributor,北京Spark Meetup组织者,2010年开始在电信领域实践应用hadoop,2013年开始关注Spark,从Shark开始向社区贡献代码。目前主要的研究方向是使用Spark搭建企业级的数据计算分析平台。
责任编辑:仲浩(zhonghao@csdn.net)
本文为《程序员》原创文章,未经允许不得转载,更多精彩文章请订阅2016年《程序员》

本文主要介绍如何使用Apache Spark中的DataSource API以实现多个数据源混合计算的实践,那么这么做的意义何在,其主要归结于3个方面:

  • 首先,我们身边存在大量的数据,结构化、非结构化,各种各样的数据结构、格局格式,这种数据的多样性本身即是大数据的特性之一,从而也决定了一种存储方式不可能通吃所有。因此,数据本身决定了多种数据源存在的必然性。
  • 其次:从业务需求来看,因为每天会开发各种各样的应用系统,应用系统中所遇到的业务场景是互不相同的,各种各样的需求决定了目前市面上不可能有一种软件架构同时能够解决这么多种业务场景,所以在数据存储包括数据查询、计算这一块也不可能只有一种技术就能解决所有问题。
  • 最后,从软件的发展来看,现在市面上出现了越来越多面对某一个细分领域的软件技术,比如像数据存储、查询搜索引擎,MPP数据库,以及各种各样的查询引擎。这么多不同的软件中,每一个软件都相对擅长处理某一个领域的业务场景,只是涉及的领域大小不相同。因此,越来越多软件的产生也决定了我们所接受的数据会存储到越来越多不同的数据源。

Apache Spark的多数据源方案

传统方案中,实现多数据源通常有两种方案:冗余存储,一份业务数据有多个存储,或者内部互相引用;集中的计算,不同的数据使用不同存储,但是会在统一的地方集中计算,算的时候把这些数据从不同位置读取出来。下面一起讨论这两种解决方案中存在的问题:

图片描述

图1 多数据源方案

第一种方案中存在的一个问题是数据一致性,一样的数据放在不同的存储里面或多或少会有格式上的不兼容,或者查询的差异,从而导致从不同位置查询的数据可能出现不一致。比如有两个报表相同的指标,但是因为是放在不同存储里查出来的结果对不上,这点非常致命。第二个问题是存储的成本,随着存储成本越来越低,这点倒是容易解决。

第二种方案也存在两个问题,其一是不同存储出来的数据类型不同,从而在计算时需求相互转换,因此如何转换至关重要。第二个问题是读取效率,需要高性能的数据抽取机制,尽量避免从远端读取不必要的数据,并且需要保证一定的并发性。

Spark在1.2.0版本首次发布了一个新的DataSourceAPI,这个API提供了非常灵活的方案,让Spark可以通过一个标准的接口访问各种外部数据源,目标是让Spark各个组件以非常方便的通过SparkSQL访问外部数据源。很显然,Spark的DataSourceAPI其采用的是方案二,那么它是如何解决其中那个的问题的呢?

图片描述

图2 External Datasource API

首先,数据类型转换,Spark中定义了一个统一的数据类型标准,不同的数据源自己定义数据类型的转换方法,这样解决数据源之间相互类型转换的问题;
关于数据处理效率的问题,Spark定义了一个比较简单的API的接口,主要有3个方式:

1./* 全量数据抽取 */
2.trait TableScan {
3.def buildScan(): RDD[Row]
4.}
5.
6./* 列剪枝数据抽取 */
7.trait PrunedScan {
8.def buildScan(requiredColumns: Array[String]): RDD[Row]
9.}
10.
11./* 列剪枝+行过滤数据抽取 */
12.trait PrunedFilteredScan {
13.def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
14.}

TableScan。这种方式需要将1TB的数据从数据抽取,再把这些数据传到Spark中。在把这1TB的数据穿过网络IO传给Spark端之后,Spark还要逐行的进行过滤,从而消耗大量的计算资源,这是目前最低效的方式。

PrunedScan。这个方式有一个好处是数据源只需要从磁盘读取1TB的数据,并只返回一些列的数据,Spark不需要计算就可以使用1GB的数据,这个过程中节省了大量的网络IO。

PrunedFilteredScan。它需要数据源既支持列过滤也支持行过滤,其好处是在磁盘IO这一层进行数据过滤,因此如果需要1GB数据,可能只抽出2GB大小,经过列过滤的规则再抽出1GB的数据,随后传给Spark,因此这种数据源接口最高效,这也是目前市面上实现的最高效的数据接口。

可直接使用的DataSource实现

目前市面上可以找到的Spark DataSource实现代码有三大类:Spark自带;Spark Packages(http://Spark-packages.org/)网站中存放的第三方软件包;跟随其他项目一同发布的内置的Spark的实现。这里介绍其中几个:

1.JDBCRelation

1.private[sql] case class JDBCRelation(
2.url: String,
3.table: String,
4.parts: Array[Partition],
5.properties: Properties = new Properties())(@transient val sqlContext: SQLContext)
6.extends BaseRelation
7.with PrunedFilteredScan
8.with InsertableRelation {
9….
10.}

以JDBC方式连接外部数据源在国内十分流行,Spark也内置了最高效的PrunedFilteredScan接口,同时还实现了数据插入的接口,使用起来非常方便,可以方便地把数据库中的表用到Spark。以Postgres为例:

1.sqlContext.read.jdbc(
2.“jdbc:postgresql://testhost:7531/testdb”,
3.“testTable”,
4.“idField”, ——-索引列
5.10000, ——-起始index
6.1000000, ——-结束index
7.10, ——-partition数量
8.new Properties
9.).registerTempTable(“testTable”)

实现机制:默认使用单个Task从远端数据库读取数据,如果设定了partitionColumn、lowerBound、upperBound、numPartitions这4个参数,那么还可以控制Spark把针对这个数据源的访问任务进行拆分,得到numPartitions个任务,每个Executor收到任务之后会并发的去连接数据库的Server读取数据。

具体类型:PostgreSQL, MySQL。

问题:在实际使用中需要注意一个问题,所有的Spark都会并发连接一个Server,并发过高时可能会对数据库造成较大的冲击(对于MPP等新型的关系型数据库还好)。

建议:个人感觉,JDBC的数据源适合从MPP等分布式数据库中读取数据,对于传统意义上单机的数据库建议只处理一些相对较小的数据。

2.HadoopFsRelation

第二个在Spark内置的数据源实现,HadoopFs,也是实现中最高效的PrunedFilteredScan接口,使用起来相对来说比JDBC更方便。

1.sqlContext
2..read
3..parquet(“hdfs://testFS/testPath”)
4..registerTempTable(“test”)

实现机制:执行的时候Spark在Driver端会直接获取列表,根据文件的格式类型和压缩方式生成多个TASK,再把这些TASK分配下去。Executor端会根据文件列表访问,这种方式访问HDFS不会出现IO集中的地方,所以具备很好的扩展性,可以处理相当大规模的数据。

具体类型:ORC,Parquet,JSon。

问题:在实时场景下如果使用HDFS作为数据输出的数据源,在写数据就会产生非常大量零散的数据,在HDFS上积累大量的零碎文件,就会带来很大的压力,后续处理这些小文件的时候也非常头疼。

建议:这种方式适合离线数据处理程序输入和输出数据,还有一些数据处理Pipeline中的临时数据,数据量比较大,可以临时放在HDFS。实时场景下不推荐使用HDFS作为数据输出。

3.ElasticSearch

越来越多的互联网公司开始使用ELK(ElasticSearch+LogStash+Kibana)作为基础数据分析查询的工具,但是有多少人知道其实ElasticSearch也支持在Spark中挂载为一个DataSource进行查询呢?

1.EsSparkSQL
2..esDF(hc,indexName,esQuery)
3..registerTempTable(”testTable”)

实现机制:ES DataSource的实现机制是通过对esQuery进行解析,将实际要发往多个ES Nodes的请求分为多个Task,在每个Executor上并行执行。

图片描述

图3 ElasticSearch架构

问题:原生程序使用HTTP方式进行数据加载,吞吐量很低,需要修改为Transport方式。

建议:存储doc数据,随机数据搜索场景使用,做其他数据源的Index。
Apache Phoenix(https://github.com/apache/phoenix
Phoenix提供了一个通过SQL方式访问HBase数据的途径,在不了解HBase实现细节的情况下很方便读写数据。其属于阿帕奇官方发布,位于Apache Phoenix项目里面的子模块。

实现机制:Phoenix的实现机制是通过对SQL解析,将执行计划中并行的部分转换为多个Task在Executor上执行。

1.sqlContext
2..read
3..format(“org.apache.phoenix.Spark”)
4..options(Map(“table” -> table, “zkUrl” -> zookeeperUrl))
5..load.registerTempTable(“testTable”)

实现机制:ES DataSource的实现机制是通过对esQuery进行解析,将实际要发往多个ES Nodes的请求分为多个Task,在每个Executor上并行执行。

问题:需要对Phoenix表模型非常了解,需要使用Rowkey字段进行查询。
建议:实时处理输出的数据,如果后面要进行数据查询,也可以把这个数据直接插入到Apache Phoenix,这样后面查询的数据可以及时得到更新结果。

4.Apache Phoenix

Phoenix提供了一个通过SQL方式访问HBase数据的途径,在不了解HBase实现细节的情况下很方便读写数据。其属于阿帕奇官方发布,位于Apache Phoenix项目里面的子模块。

实现机制:Phoenix的实现机制是通过对SQL解析,将执行计划中并行的部分转换为多个Task在Executor上执行。

1.sqlContext
2..read
3..format(“org.apache.phoenix.Spark”)
4..options(Map(“table” -> table, “zkUrl” -> zookeeperUrl))
5..load.registerTempTable(“testTable”)
代码6
问题:需要对Phoenix表模型非常了解,需要使用Rowkey字段进行查询。
建议:实时处理输出的数据,如果后面要进行数据查询,也可以把这个数据直接插入到Apache Phoenix,这样后面查询的数据可以及时得到更新结果。

5.其他

MongoDB-https://github.com/Stratio/Spark-mongodb
Cassantra-https://github.com/datastax/Spark-cassandra-connector

GrowingIO实践

GrowingIO的数据平台主要分为两部分应用。首先是实时应用,负责实时处理流入数据的ETL,将清洗后的数据录入HBase与ES;然后是离线应用,负责定时执行离线模型计算,将实时数据的结果进行汇总分析。上层搭建了自己的QueryService,负责根据前端需要快速查询返回需要的统计数据。

实时计算部分最主要的功能是数据的ETL,当实时数据从Kafka消费后,会利用Spark提供的JSon DataSource将数据转化为一个Table,再通过JDBC将配置数据引入,通过Phoenix和ES的DataSource将最终存储的目标位置映射成为多个Table,最终通过SparkSQL的load操作插入目标数据源。

离线部分的主要工作是二次汇总分析,这里将模型计算所需的内容从各个数据源挂载到Spark,然后写很多复杂的SQL进行计算,再将结果保存到HBase供QueryService使用。

图片描述

图4 整体架构

图片描述

图5 实时计算架

图片描述

图6 离线计算架构

外部数据源使用中遇到的问题和解决途径

问题:Elastic Search查询数据时,当Mapping数据的列大于Source中列时报Index Out of Bound Exception。
解决:修改RowValueReader的addToBuffer方法。
问题:Elastic Search数据加载默认通过HTTP的接口加载数据,性能极差。
解决:修改为Transport方式加载使得性能提升2-3倍。
问题:Elastic Search性能优化。
解决:需要详细设计Index, 尽量减少每次查询的数据量。
问题:Phoenix4.4与Spark 1.5兼容性。
解决:Spark 1.5修改的DecimalType类型适配,GenericMutableRow 修改为 InternalRow。
问题:PHOENIX-2279 Limit与Union All相关的BUG。
解决:修改Phoenix代码。
问题与解决:Phoenix打包过程中解决与Hadoop版本兼容性
问题与解决:Region Split导致缓存中的Region信息失效(暂时无解)。
问题:由于YARN资源控制导致Excutor端报错:Phoenix JDBC Driver has been closed。
解决:配置额外的内存避免Executor被YARN杀掉。
问题:PhoenixRDD读取数据时Partition数量过少导致读取速度慢。
解决:通过Phoenix的BucketTable增加查询的并行度(建议控制Bucket的数量,避免Table自动split,BucketTable在split后有BUG)。

图片描述

图7 JobServer架构

其他问题——Spark Job Server

由于大量使用了SparkSQL和DataSource,所以面临到的一个新问题就是如何更加有效地去提升Spark平台的资源利用率。 社区中提供了一个开源的Spark Job Server实现(https://github.com/Spark-jobserver/Spark-jobserver) ,相较之下,觉得这个实现对于GrowingIO来说有些复杂,于是自己设计实现了一个简化版的JobServer 。

简化版的JobServer允许用户通过一个SDK提交自己的Job(需要提前部署对应的jar包到JobServer);JobServer使用FairScheduler平均分配每个Job使用的资源;允许设置最大的Job并发数量;允许在Job中设置优先级别。

总结

总体来说,GrowingIO通过使用SparkSQL加DataSourceAPI的方法在很短时间内搭建起一套完整的数据处理平台,并且扩展性很好。对于大多数中小企业来说,是一个便捷有效的途径。 由于SQL的易学,对于团队中的新人上手也是比较容易。

但是大量使用SQL也带来一些问题,比如:所有数据模型的管理,例如所有字段的长度,类型的统一;需要根据性能监控对各个数据源进行不断的优化(例如ES的Index,HBase的Rowkey)。

评论