/**
  * 一:SparkSql--dataset
  * 1.Perple是一个强类型的类
  * 2.Dataset中的数据是由结构的,因为People对象中有结构信息,例如字段和字段类型
  * 3.Dataset能够将使用类似SQL这样声名结构化查询的形式来查询
  * 4.Dataset是一个强类型,并且类型安全的数据容器,并且提供了结构化查询API和类似RDD
  * 一样的命令式API
  *
  * 二:Dataset底层类型
  * 即使使用Dataset命令API,执行计划依然会被优化
  * dataset具有RDD的方便,同时也具有Dataframe的性能优势,并且dataset还是强类型,能做到类型安全
  * RDD会-+经过AST语法树进行一系列的优化,dataset也是这样的.从逻辑到物理
  * Source -> Dataset[People] -> Catalyst -> RDD[InternalRow] -> spark集群
  */
object sparkSqlDataset{
  def main(args: Array[String]): Unit = {
    //创建sparkSession
    val spark = SparkSession.builder()
      .appName("ds_intro_demo_dataset")
      .master("local[6]")
      .getOrCreate()
    //导入隐式转换
    import  spark.implicits._

    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三", 18), Person("李四", 20), Person("王五", 25)))

    val dataset: Dataset[Person] = sourceRDD.toDS()

    //Dataset支持强类型API
//    dataset.filter(item => item.age > 18 && item.age < 21).show()

    //Dataset支持弱类型
//    dataset.filter('age > 10).show()
//    dataset.filter($"age" > 10).show()
    dataset.filter("age > 18 and age < 21 ").show()

//    dataset.explain(true)
    /**
      *dataset.queryExecution.toRdd查看dataset转换成rdd的详细步骤,这个API可以看到底层执行的RDD,这个RDD中的
      * 范型就是InternalRow,InternalRow又称为Catalyst Row,是dataset底层的数据结构,无论dataset的范型是什么,无
      * 论是dataset[People]还是其他的,其最底层进行处理的数据结构都是InternalRow,所以,Dataset的范型对象在执行
      * 前,都需要通过Encoder转换成Internal Row,在输入之前需要把InternalRow通过Decoder转换成范型对象.
      */
      //通过Encode将RDD[People]转换成RDD[InternalRow]
    val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd
  }
}

/**
  * Dataset装换成范型相同的RDD
  */
object sparkSqlDatasetCover{
  def main(args: Array[String]): Unit = {
    //创建sparkSession
    val spark = SparkSession.builder()
      .appName("ds_intro_demo_dataset")
      .master("local[6]")
      .getOrCreate()
    //导入隐式转换
    import  spark.implicits._

//    val sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三", 18), Person("李四", 20), Person("王五", 25)))
    //    val dataset = sourceRDD.toDS()

    val source = spark.createDataset(Seq(Person("张三", 18), Person("李四", 20), Person("王五", 25)))
    //RDD[InternaleRow] 通过decoder转换成RDD[自定义范型]
    val rdd: RDD[Person] = source.rdd

    rdd.filter(item => item.age > 18)
  }
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐