Spark API

  • SparkContext

    1、连接Driver与Spark Cluster(Workers)
    2、Spark的主入口
    3、每个JVM仅能有一个活跃的SparkContext
import org.apache.spark.{SparkConf, SparkContext}

创建SparkContext对象
val conf=new SparkConf().setMaster("local[2]").setAppName("HelloSpark")
val sc=SparkContext.getOrCreate(conf)
  • SparkSession

    Spark 2.0+应用程序的主入口:包含了SparkContext、SQLContext、HiveContext以及StreamingContext
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.master("local[2]").appName("appName").getOrCreate()
  • RDD的创建

使用集合创建RDD
val rdd=sc.parallelize(List(1,2,3,4,5,6))
val rdd=sc.parallelize(List(1,2,3,4,5,6),5)
#Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定

val rdd=sc.makeRDD(List(1,2,3,4,5,6))

通过加载文件产生RDD
val rdd=sc.textFile("file:///home/hadoop/data/hello.txt")   本地文件
val rdd=sc.textFile("hdfs://hadoop000:8020/hello.txt")
  • 广播变量

val broadcastVar=sc.broadcast(Array(1,2,3))  //定义广播变量
broadcastVar.value 		//访问方式
  • 累加器

    只允许added操作,常用于实现计数
val accum = sc.accumulator(0,"My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
accum.value
  • 案例:使用共享变量实现黑名单过滤并计数

val spark = SparkSession.builder.master("local[2]").appName("appName").getOrCreate()
val sc = spark.sparkContext
val words=sc.textFile("src/data/courses").map(_.split(","))
val blackList=sc.broadcast(List("hadoop","spark")) 
val accum = sc.longAccumulator
words.map(x=>{if(blackList.value.contains(x))
	{
		accum.add(1)
		None			
	}
	else
		Some(word)
}).filter(x=>x!=None).foreach(x=>print(x.get))
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐