SparkSQL-Dataset详解
/*** 一:SparkSql--dataset* 1.Perple是一个强类型的类* 2.Dataset中的数据是由结构的,因为People对象中有结构信息,例如字段和字段类型* 3.Dataset能够将使用类似SQL这样声名结构化查询的形式来查询* 4.Dataset是一个强类型,并且类型安全的数据容器,并且提供了结构化查询API和类似RDD* 一样的命令式API** 二:Dataset底层类
·
/**
* 一:SparkSql--dataset
* 1.Perple是一个强类型的类
* 2.Dataset中的数据是由结构的,因为People对象中有结构信息,例如字段和字段类型
* 3.Dataset能够将使用类似SQL这样声名结构化查询的形式来查询
* 4.Dataset是一个强类型,并且类型安全的数据容器,并且提供了结构化查询API和类似RDD
* 一样的命令式API
*
* 二:Dataset底层类型
* 即使使用Dataset命令API,执行计划依然会被优化
* dataset具有RDD的方便,同时也具有Dataframe的性能优势,并且dataset还是强类型,能做到类型安全
* RDD会-+经过AST语法树进行一系列的优化,dataset也是这样的.从逻辑到物理
* Source -> Dataset[People] -> Catalyst -> RDD[InternalRow] -> spark集群
*/
object sparkSqlDataset{
def main(args: Array[String]): Unit = {
//创建sparkSession
val spark = SparkSession.builder()
.appName("ds_intro_demo_dataset")
.master("local[6]")
.getOrCreate()
//导入隐式转换
import spark.implicits._
val sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三", 18), Person("李四", 20), Person("王五", 25)))
val dataset: Dataset[Person] = sourceRDD.toDS()
//Dataset支持强类型API
// dataset.filter(item => item.age > 18 && item.age < 21).show()
//Dataset支持弱类型
// dataset.filter('age > 10).show()
// dataset.filter($"age" > 10).show()
dataset.filter("age > 18 and age < 21 ").show()
// dataset.explain(true)
/**
*dataset.queryExecution.toRdd查看dataset转换成rdd的详细步骤,这个API可以看到底层执行的RDD,这个RDD中的
* 范型就是InternalRow,InternalRow又称为Catalyst Row,是dataset底层的数据结构,无论dataset的范型是什么,无
* 论是dataset[People]还是其他的,其最底层进行处理的数据结构都是InternalRow,所以,Dataset的范型对象在执行
* 前,都需要通过Encoder转换成Internal Row,在输入之前需要把InternalRow通过Decoder转换成范型对象.
*/
//通过Encode将RDD[People]转换成RDD[InternalRow]
val executionRdd: RDD[InternalRow] = dataset.queryExecution.toRdd
}
}
/**
* Dataset装换成范型相同的RDD
*/
object sparkSqlDatasetCover{
def main(args: Array[String]): Unit = {
//创建sparkSession
val spark = SparkSession.builder()
.appName("ds_intro_demo_dataset")
.master("local[6]")
.getOrCreate()
//导入隐式转换
import spark.implicits._
// val sourceRDD = spark.sparkContext.parallelize(Seq(Person("张三", 18), Person("李四", 20), Person("王五", 25)))
// val dataset = sourceRDD.toDS()
val source = spark.createDataset(Seq(Person("张三", 18), Person("李四", 20), Person("王五", 25)))
//RDD[InternaleRow] 通过decoder转换成RDD[自定义范型]
val rdd: RDD[Person] = source.rdd
rdd.filter(item => item.age > 18)
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)