Spark API
SparkContext
1. Connects the Driver to the Spark cluster (Workers)
2. The main entry point of a Spark application
3. Only one SparkContext may be active per JVM
import org.apache.spark.{SparkConf, SparkContext}
Create a SparkContext object
val conf=new SparkConf().setMaster("local[2]").setAppName("HelloSpark")
val sc=SparkContext.getOrCreate(conf)
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.master("local[2]").appName("appName").getOrCreate()
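Because only one SparkContext can be active per JVM, stop it once the job is finished before creating another; a minimal sketch using the standard stop() calls:
sc.stop()      // stop the context created above before creating a new one in the same JVM
spark.stop()   // or, when working through SparkSession, stopping the session stops its underlying context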
Create an RDD from a collection
val rdd=sc.parallelize(List(1,2,3,4,5,6))
val rdd=sc.parallelize(List(1,2,3,4,5,6),5)
// By default Spark sets the number of partitions according to the cluster; it can also be specified explicitly via the second parameter of parallelize()
val rdd=sc.makeRDD(List(1,2,3,4,5,6))
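The resulting partitioning can be inspected with getNumPartitions (standard RDD API); a small sketch, assuming the local[2] master above:
sc.parallelize(List(1,2,3,4,5,6)).getNumPartitions     // default, derived from the master setting (e.g. 2 under local[2])
sc.parallelize(List(1,2,3,4,5,6),5).getNumPartitions   // 5, taken from the second argument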
Create an RDD by loading a file
val rdd=sc.textFile("file:///home/hadoop/data/hello.txt") 本地文件
val rdd=sc.textFile("hdfs://hadoop000:8020/hello.txt")
val broadcastVar=sc.broadcast(Array(1,2,3))   // define a broadcast variable
broadcastVar.value                            // access its value
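A short sketch of how the broadcast value is typically read inside a transformation (the input RDD here is purely illustrative):
sc.parallelize(List(0,1,2)).map(i=>broadcastVar.value(i)).collect()   // Array(1, 2, 3): each task reads the shared, read-only copy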
val accum = sc.accumulator(0,"My Accumulator")   // legacy accumulator API, deprecated since Spark 2.0
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
accum.value
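On Spark 2.x and later the same counter is normally created with longAccumulator; a minimal equivalent sketch:
val accum2 = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum2.add(x))
accum2.value   // 10
The snippets below combine a broadcast blacklist with an accumulator to filter out blacklisted words and count them: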
val spark = SparkSession.builder.master("local[2]").appName("appName").getOrCreate()
val sc = spark.sparkContext
val words=sc.textFile("src/data/courses").map(_.split(","))
val blackList=sc.broadcast(List("hadoop","spark"))
val accum = sc.longAccumulator
words.map(x => {
  if (blackList.value.contains(x)) {
    accum.add(1)        // count every blacklisted word
    None                // and drop it
  } else {
    Some(x)             // keep non-blacklisted words
  }
}).filter(_.isDefined).foreach(x => print(x.get))
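The accumulator can then be read back on the driver after the action has run; note that updates made inside a transformation (the map above) may be re-applied if a task is retried, so the count there is best treated as approximate:
println("blacklisted words seen: " + accum.value)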