一、概述

  • SparkSql是什么
    • Spark SQL是Spark处理数据的一个模块
    • 专门用来处理结构化数据的模块,像json,parquet,avro,csv,普通表格数据等均可。
    • 与基础RDD的API不同,Spark SQL中提供的接口将提供给更多关于结构化数据和计算的信息,并针对这些信息,进行额外的处理优化。
  • SparkSql操作方式说明
    • SparkSql shell
      • 类似于hive shell
    • DataFrames API
      • 最早专为sql on spark设计的数据抽象,与RDD相似,增加了数据结构scheme描述信息部分。
      • 写spark代码,面向DF(DataFrams缩写)编程,可以与其它Spark应用代码无缝集成。
      • 比RDD更丰富的算子,更有利于提升执行效率、减少数据读取、执行计划优化。
    • DataSets API
      • 集成了RDD强类型和DataFrames结构化的优点,官方正强力打造的新数据抽象类型。
      • 写spark代码,面向DS编程,可以与其它Spark应用代码无缝集成。
      • 比RDD更丰富的算子,更有利于提升执行效率、减少数据读取、执行计划优化。
    • 面向程序接口对接的操作:通过JDBC、ODBC等方式操作SparkSql
      • 通过jdbc、odbc链接后,发送相关的sparksql请求,实现基于sparksql功能开发。
    • SparkSql的特点
      • 可以利用SQL、DataFrams API、DataSets API或其它语言调用的基于sparksql模块计算,均是sparkcore执行引擎,其对计算的表达是独立的,即开发人员可以轻松在不同API之间切换实现相同的功能。
      • 也可以通过命令行、JDBC、ODBC的方式来操作SparkSQL,方便其它数据平台、BI平台使用SparkSql模块。
      • 在spark应用程序开发中,可以无缝使用SparkSql操作数据。
      • 可以直接使用Hive表格数据。
      • 与Hive的兼容性极好:它复用了Hive的前端(去掉驱动mapreduce执行任务的部分)和元数据,因此可以拿过来hivesql的东西在sparksql上运行即可。
        • 并不是100%完全兼容,但绝大多数情况下,不需要改动,或只需要极小的改动!!!
        • 比如个别版本不支持直接insert into table xxx values(xxx...)的插入数据的方式
      • SparkSql的应用中,sql是一个重要方面,但不局限制sql。
    • SparkSql愿景
      • 写更少的代码
      • 读更少的数据
      • 把优化的工作交由底层的优化器运行
        • 把优化工作拿掉,我们并不需要做一些优化工作,也就是小白和高手写出来的应用程序最后的执行效率都是一样的。

二、SparkSql Shell操作SparkSql

//直接输入spark-sql+自己想要添加的参数即可,与spark-shell相似
spark-sql [options]
//如指定运行模式
spark-sql local[*]
//如指定运行spark webui服务的端口,解决多人共用一个入口机时候的进入时候报port bind exception的问题
spark-sql --conf spark.ui.port=4075
//也可以用于似于hive -e的方式,直接直接一段sparksql代码
spark-sql –e “sparksql code”
  • 操作方式
    • 使用方式融合几乎全部hive操作:集成了hive前端使用+元数据信息
      • DDL、DML、DQL
        • 创建库
        • 创建表
        • 删除表
        • 数据装载(不支持直接insert into table ... values(...)方式)
        • 更多的系统函数,包含全部的hive系统函数
        • 添加自定义udf/udaf/udtf等
        • 动态自定义分区
        • hive数据分析数
        • 及其它hive的各项功能

三、开发环境搭建步骤

sparksql依赖库

<!--    sparksql依赖-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_${scala.compat.version}</artifactId>
      <version>2.3.2</version>
      <scope>provided</scope>
    </dependency>

DataFrames相关操作

  • 创建DataFrames-1.6.x
def main(args: Array[String]) {
    //创建sparksql配置
    var conf = new SparkConf()
    conf.setMaster("local[*]");
    conf.setAppName("YYF_SparkSql");
    conf.set("spark.testing.memory","512000000");
    //创建sparkContext,创建sq context
    var sc = new SparkContext(conf)
    var sqlContext = new SQLContext(sc)
    // 添加将RDD转化为DataFrame的功能包引入
//    import sqlContext.implicits._
    //读取对应文件生成DF
    var df = sqlContext.read.json("file:\\D:\\spark课件\\data\\data.json")
    //打印结构
    df.printSchema()
    //转换格式,修改数据类型;转换后形成新列替换旧列
    df=df.withColumn("commentCount",df("commentCount").cast(IntegerType))
      //调用对应的算子,展示数据,相当于select *
    df.show();
    //只显示某一列
    df.select("content").show
    //过滤筛选评论量大于1000的,升序排列
    df.filter("commentCount>1000").select("content","commentCount").orderBy("commentCount").show
    //groupby
    df.groupBy("userId")
      .count()
      .select(col("userId"),col("count").as("num"))
      .orderBy(desc("num"))
      .show()
    sc.stop();
  }
  • DataFrame常用操作
    • 常用命令
      • show
      • printSchema
      • select
      • filter
      • groupBy
      • count
      • orderBy
      • 其它操作类比于sql即可
  • 创建DataFrames-2.3.x
def main(args: Array[String]): Unit = {
    //构建对应的对象
    val sparkSession=SparkSession.builder()
      .appName("yyf_sparkdf2.3")
      .master("local[*]")
      .config("spark.testing.memory","512000000")
      .getOrCreate()
    //读取对应文件生成DF
    var df = sparkSession.read.json("file:\\D:\\spark课件\\data\\data.json")
    df.select("content","commentCount").show()
    df.filter(df("commentCount")>100).select().show()
    df.groupBy("commentCount").count().orderBy(desc("count")).show()

    sparkSession.stop()

  }
  • RDD与DataFrame互操作
    • 将RDD转化成DataFrame(将无结构化数据转化成有结构化数据)
      • 将一个RDD转化为带Scheme的DataFrame
      • 实现转化的方式有两种
        • 反射推断
        • 程序编码实现数据与结构的对应,达到转化目标。(重点)
  •  def main(args: Array[String]): Unit = {
        //创建session
        val session=SparkSession
          .builder()
          .appName("test rdd to df")
          .master("local[*]")
          .config("spark.testing.memory","512000000")
          .getOrCreate()
        //创建结构
    //    val scheme=StructType(
    //      List(StructField("stdno",StringType,true),
    //        StructField("name",StringType,true),
    //        StructField("classId",StringType,true),
    //        StructField("className",StringType,true))
    //    )
        val scheme=StructType(
      "stdno name classId className ".split(" ").map(fieldName=>StructField(fieldName,StringType,true))
    )
        //创建RDD
        val lineRDD = session.sparkContext.textFile("D:\\spark课件\\data\\student.txt")
        val rowRDD=lineRDD.map(_.split("\t")).map(P=>Row(P(0),P(1),P(2),P(3)))
        //基于结构和RDD创建DF
        val df=session.createDataFrame(rowRDD,scheme)
        //DF算子操作,row是每一项
        df.show()
        df.printSchema()
        df.filter(row=>"张三"equals row.getAs[String]("name")).show()
    
        //关闭
        session.stop()
      }
    

  • SparkSql临时表生成以及文件读写

def main(args: Array[String]): Unit = {
    //构建对应的对象
    val sparkSession=SparkSession.builder()
      .appName("yyf_sparkdf2.3")
      .master("local[*]")
      .config("spark.testing.memory","512000000")
      .getOrCreate()

    //读取对应文件生成DF
    var df = sparkSession.read.json("D:\\spark课件\\data\\data.json")
//      var df = sparkSession.read.textFile("D:\\spark课件\\data\\student.txt")
    //创建临时表
    df.createTempView("weibo_origin");
    //执行对应的SQL操作
    sparkSession.sql("select content,commentCount from weibo_origin limit 100").show()
//    sparkSession.sql("select * from weibo_origin").show()
    //写文件 保存
    df.repartition(2).write.format("json").save("./data")
    sparkSession.stop()
  }

三、DataSets API操作SparkSql

  • 1开发环境搭建步骤
    • 与DataFrames完全相同
  • 2 DataSets相关操作
    • 操作说明
      • DataSet集成了RDD和DataFrame的优点,也称为强类型的DataFrame。
      • DataSets和DataFrames具有完全相同的成员函数。
      • 两者中,每个行的数据类型不同。DataFrame也可以叫Dataset[Row],即DataFrame是Dataset的一种特定形式。而DataSet的每一行是不固定的,需要模式匹配来确定。
    • 版本说明
      • 在1.6.2版本DataSet为alpha版测试功能,API方面均没有得到丰富和完善。
      • 在2.0.0开始DataSet得到正式推广使用,由于其API和DataFrame在成员函数中完全对等,在使用上差异极小,由于是强类型,故仅在数据集case class模式匹配时,有明显差别。
    • DataSets的Spark2.3.2版本上的应用
  • def main(args: Array[String]): Unit = {
        //构建对应的对象
        val sparkSession=SparkSession.builder()
          .appName("yyf_TestDS2.3")
          .master("local[*]")
          .config("spark.testing.memory","512000000")
          .getOrCreate()
        //引入自动隐式类型转换
          import sparkSession.implicits._
        //从基础数据对象类型创建DataSet
          val seqDS=Seq(1,2,3,4,5).toDS()
          val list=seqDS.map(_*2).collect()
          list.foreach(println)
    
        //使用样例类
        val studentDS=Seq(Student("张三",19,"河北科技大学")).toDS()
        studentDS.show()
    
        //加载json数据
        var studentJsonDS = sparkSession.read.json("file:\\D:\\spark课件\\data\\student_data.txt").as[Student]
        val stuDFList=studentJsonDS.collect()
        studentJsonDS.foreach(item=>println(item.name))
    
        println("---------------")
        studentJsonDS.filter(row=>row.name equals "张一").show()
    
        sparkSession.stop()
      }

四、多数据集抽象类型对比分析

  • spark抽象数据集列表
    • RDD
    • DataFrame
    • DataSet
  • 相同点
    • 全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
    • 三者都有惰性机制,在进行Transform操作时不会立即执行,在遇到Action操作时会正式提交作业执行。
    • 均采用spark的内存运算和优化策略,内存使用和执行效率上均可以得到保障。
    • 均有partition的概念,便于分布式并行计算处理,达到分而治之。
    • 均有许多共同的函数,如map、filter、sort等。
    • 在进行三者的相关操作时候,个别特殊操作时必须引入一个相同的包依赖。( 早期称为 import sqlContext.implicits._,最新版本称为import spark.implicits._)
    • DF和DS均可以通过模式匹配获取内部的变量类型和值。
    • DF和DS产生于SparkSql,天然支持SparkSql。
  • 区别点
    • RDD
      • 不支持SparkSql操作,均需进行转成DF或是DS才行。
      • 类型是安全的,编译时候即可检查出类型错误。(强类型)
      • 机器间通信、IO操作均需要序列化、反序列化对象,性能开销大。
    • DataFrame
      • 有scheme的RDD:比RDD增加了数据的描述信息。
      • 比RDD的API更丰富,增加了针对结构化数据API。
      • 只有一个固定类型的DataSet,即为DataFrame=DataSet[Row]
      • 序列化和反序列化时做了结构化优化,减少了不必要的结构化信息的序列化,提高了执行效率。
    • DataSet
      • 强类型的DataFrame,与DF有完全相同的成员函数。
      • 每行的类型不固定,需要使用模式匹配case class后,获取实际的类信息、字段类型、字段值。
      • 访问对象数据时,比DF更加直接简单。
      • 在序列化和反序列化时,引入了Encoder机制,达到按需序列化和反序列化,不必像之前整个对象操作了,进一步提高了效率。
  • 应用场景
    • 使用RDD场景
      • 数据为非结构化,如流媒体等数据
      • 对数据集进行底层的转换、处理、控制
      • 不需要列式处理,而是通过常规的对象.属性来使用数据。
      • 对DF、DS带来的开发效率、执行效率提升不敏感时
    • 使用DF(必须)
      • R或是python语言开发者,使用DF
    • 使用DS(必须)
      • 在编译时就有高度的类型安全,想要有类型的JVM对象,用上Catalyst优化,并得益于Tungsten生成的高效代码
    • 使用DF、DS场景
      • 需要丰富的语义、高级抽象和特定领域专用的API时
      • 处理需要对半结构化数据进行高级处理,如filter、map、aggregation、average、sum、SQL查询、列式访问或使用lambda函数
      • 在不同的Spark库之间使用一致和简化的API

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐