Big Data with Spark (Part 1) --- Spark overview, modules, installation, usage, WordCount in one line, API, Scala programming, submitting jobs to a Spark cluster, script analysis
一、Spark Overview
----------------------------------------------------------
1. Lightning-fast cluster computing.
2. A fast, general-purpose engine for large-scale data processing.
3. Speed: up to 100x faster than Hadoop MapReduce in memory, about 10x faster on disk.
4. Languages: Java / Scala / R / Python.
5. Provides 80+ operators, making it easy to build parallel applications.
6. General: combines SQL, stream processing, and complex analytics.
7. Runs on Hadoop, Mesos, standalone, in the cloud, or locally.
8. DAG //directed acyclic graph (see the sketch below)
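Transformations in Spark are lazy: each one only adds a node to the DAG, and nothing is computed until an action is called. A minimal sketch using the spark-shell introduced below (the input path is just an illustration):
scala> val lines = sc.textFile("/home/ubuntu/downloads/1.txt")   //transformation: nothing is read yet
scala> val upper = lines.map(_.toUpperCase)                      //transformation: the DAG grows
scala> upper.count                                               //action: only now is the DAG executed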
二、Spark Modules
--------------------------------------------------------
Spark Core //core engine
Spark SQL //SQL
Spark Streaming //stream processing
Spark MLlib //machine learning
Spark GraphX //graph processing
三、Installing Spark
--------------------------------------------------------
1. Download spark-2.1.0-bin-hadoop2.7.tgz
..
2. Extract the archive
..
3. Environment variables
[/etc/profile]
export SPARK_HOME=/soft/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
//adds /soft/spark/bin and /soft/spark/sbin to PATH
[source]
$>source /etc/profile
4. Verify Spark
$>cd /soft/spark
$>./bin/spark-shell
5. Web UI (the application UI on port 4040 is only available while a Spark application such as spark-shell is running)
http://s100:4040/
四、Using Spark
--------------------------------------------------------
1. Start the Spark shell
$>spark-shell
2. sc ==> the entry point of a Spark program; it encapsulates the whole Spark runtime environment
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2b5ca574
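As a quick sanity check that sc works, you can parallelize a small local collection right in the shell (this snippet is an illustration, not from the original session):
scala> val nums = sc.parallelize(1 to 5)   //build an RDD from a local collection
scala> nums.map(_ * 2).collect             //result: Array(2, 4, 6, 8, 10)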
五、WordCount in One Line
----------------------------------------------------------
//load the file; returns an RDD of lines
scala> val rdd1 = sc.textFile("/home/ubuntu/downloads/1.txt");
rdd1: org.apache.spark.rdd.RDD[String] = /home/ubuntu/downloads/1.txt MapPartitionsRDD[1] at textFile at <console>:24
//split each line on ',' and flatten into a collection of single words
scala> val wordsRdd = rdd1.flatMap(line => line.split(","))
//map each word to a (word, 1) pair
scala> val pairRdd = wordsRdd.map(word => (word,1))
//aggregate the counts by key
scala> val rdd2 = pairRdd.reduceByKey(_ + _)
//inspect the word-count result
scala> rdd2.collect
res2: Array[(String, Int)] = Array((tom1,1), (4,1), (14,1), (7,1), (15,1), (5,1), (tom2,1), (6,1), (tom6,1), (2,1), (16,1), (3,1), (tom3,1), (tom4,1), (17,1), (12,1), (13,1), (tom5,1), (1,1), (11,1), (tom7,1))
//word count in one line
scala> sc.textFile("/home/ubuntu/downloads/1.txt").flatMap(line => {line.split(",")}).map(word => (word,1)).reduceByKey(_ + _).collect
//add a filter that drops words containing "tom"
scala> sc.textFile("/home/ubuntu/downloads/1.txt")
.flatMap(line => {line.split(",")})
.filter(!_.contains("tom"))
.map(word => (word,1))
.reduceByKey(_ + _)
.collect
res6: Array[(String, Int)] = Array((4,1), (14,1), (7,1), (15,1), (5,1), (6,1), (2,1), (16,1), (3,1), (17,1), (12,1), (13,1), (1,1), (11,1))
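A further variation (not from the session above): sort the pairs by count in descending order and take only the top entries, e.g. the 3 most frequent words:
scala> sc.textFile("/home/ubuntu/downloads/1.txt").flatMap(_.split(",")).map(word => (word,1)).reduceByKey(_ + _).sortBy(_._2, false).take(3)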
六、API
------------------------------------------------------
1.[SparkContext]
The main entry point for Spark functionality. Represents the connection to a Spark cluster and is used to create RDDs, accumulators, and broadcast variables.
Only one SparkContext may be active per JVM; stop() the active one before creating a new sc (see the sketch after this list).
2.[RDD]
Resilient distributed dataset. Conceptually it behaves like a distributed collection. When loaded from a text file it is split into records by newline.
3.[SparkConf]
The Spark configuration object; sets the various parameters of a Spark application as key/value pairs.
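A minimal sketch of how these three classes fit together outside the shell (the app name, the local[2] master, and the explicit stop() are illustrative choices, not from the original):
import org.apache.spark.{SparkConf, SparkContext}

//key/value style configuration
val conf = new SparkConf().setAppName("api-demo").setMaster("local[2]")
//only one active SparkContext per JVM
val sc = new SparkContext(conf)
//an RDD loaded from a text file: one record per line
val rdd = sc.textFile("/home/ubuntu/downloads/1.txt")
println(rdd.count())
//stop the context before creating another SparkContext in the same JVM
sc.stop()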
七、Scala Programming -- add the Spark libraries in IDEA and implement WordCount
-------------------------------------------------------------
1. Create the Spark module
2. Add Maven support
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
3. Add Scala support to the module (Scala SDK / framework support in IDEA)
4. Write the object -- WorldCountDemo
import org.apache.spark.{SparkConf, SparkContext}

object WorldCountDemo {
  def main(args: Array[String]): Unit = {
    //create the Spark configuration object
    val conf = new SparkConf()
    //set the application name
    conf.setAppName("sparkwc")
    //run in local mode
    conf.setMaster("local")
    //create the core object -- the context
    val sc = new SparkContext(conf)
    val rdd1 = sc.textFile("d:\\calllog.log")
    val rdd2 = rdd1.flatMap(line => line.split(","))
    val rdd3 = rdd2.map(word => (word, 1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val r = rdd4.collect()
    r.foreach(e => println(e))
  }
}
5. Run the app and check the results
6. Java implementation
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.*;
/**
 * Word count, Java version
 */
public class WorldCountDemoJava {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wcjava");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        JavaRDD<String> jrdd1 = jsc.textFile("d:\\calllog.log");
        //flatten each line into individual words
        JavaRDD<String> jrdd2 = jrdd1.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) throws Exception {
                List<String> list = new ArrayList<String>();
                String[] strs = s.split(",");
                for (String ss : strs) {
                    list.add(ss);
                }
                return list.iterator();
            }
        });
        //map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> jrdd3 = jrdd2.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        //count by key (an action that returns a Map on the driver)
        Map<String, Long> map = jrdd3.countByKey();
        Set<String> set = map.keySet();
        for (String s : set) {
            System.out.println(s + ":" + map.get(s));
        }
    }
}
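Note that the Java version ends with countByKey, an action that returns a Map on the driver, whereas the Scala version used the reduceByKey transformation followed by collect. The same shortcut in Scala would look roughly like this (a sketch based on rdd3 from WorldCountDemo above):
val counts: scala.collection.Map[String, Long] = rdd3.countByKey()
counts.foreach { case (word, n) => println(word + ":" + n) }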
八、Submitting a Job to Run on Spark
------------------------------------------------
1. Export the jar
2. Copy it to a shared directory
3. Run the jar with spark-submit
$> spark-submit --master local --name wc --class com.spark.demo.java.WorldCountDemoJava TestSpark-1.0-SNAPSHOT.jar /home/ubuntu/downloads/1.txt
$> spark-submit --master local --name wc --class com.spark.demo.scala.WorldCountDemoScala TestSpark-1.0-SNAPSHOT.jar /home/ubuntu/downloads/1.txt
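For the path argument on these command lines to take effect, the program has to read it from args instead of a hard-coded Windows path, and it should not call setMaster so that --master from the command line wins. A minimal adjusted sketch of the Scala version (the args handling is an assumption about the intended wiring, not part of the original code):
import org.apache.spark.{SparkConf, SparkContext}

object WorldCountDemoScala {
  def main(args: Array[String]): Unit = {
    //app name only; the master is taken from spark-submit --master
    val conf = new SparkConf().setAppName("sparkwc")
    val sc = new SparkContext(conf)
    //the input path comes from the first command-line argument
    val counts = sc.textFile(args(0))
      .flatMap(line => line.split(","))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.collect().foreach(println)
    sc.stop()
  }
}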
九、Deploying a Spark Cluster
-----------------------------------------------
1.local
nothing to set up
spark-shell --master local //the default
2. standalone
Spark's own standalone cluster manager.
a) Copy the Spark directory to the other hosts
b) Configure all the environment variables on the other hosts
[/etc/profile]
SPARK_HOME
PATH
c) Configure the slaves file on the master node s100 and distribute it to all nodes
[/soft/spark/conf/slaves]
s200
s300
s400
d) Start the Spark cluster on s100
/soft/spark/sbin/start-all.sh [run the script from Spark's sbin directory (or with its full path) so it is not confused with Hadoop's start-all.sh]
e) Check the processes
$>xcall.jps jps
master //s100
worker //s200
worker //s300
worker //s400
f) Web UI
http://s100:8080/
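Once the workers show up on the master's 8080 UI, a shell can be attached to the standalone cluster, for example (illustrative command, not part of the original steps):
$>spark-shell --master spark://s100:7077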
十、Submitting the Jar to the Fully Distributed Spark Cluster
--------------------------------------------------------
1. Start the Hadoop HDFS cluster
$> start-dfs.sh
2. Put the file to be word-counted into HDFS, for example:
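//assuming the local copy is at /home/ubuntu/downloads/1.txt and the target directory matches the HDFS paths used in step 3
$>hdfs dfs -mkdir -p /data/spark
$>hdfs dfs -put /home/ubuntu/downloads/1.txt /data/spark/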
3. Run spark-submit
$> spark-submit --master spark://s100:7077 --name wc --class com.spark.demo.java.WorldCountDemoJava TestSpark-1.0-SNAPSHOT.jar hdfs://s500:8020/data/spark/1.txt
$> spark-submit --master spark://s100:7077 --name wc --class com.spark.demo.scala.WorldCountDemoScala TestSpark-1.0-SNAPSHOT.jar hdfs://s500:8020/data/spark/1.txt
十一、Script Analysis
--------------------------------------------------------
[start-all.sh]
sbin/spark-config.sh
sbin/start-master.sh //starts the Master process
sbin/start-slaves.sh //starts the Worker processes
[start-master.sh]
sbin/spark-config.sh
org.apache.spark.deploy.master.Master
spark-daemon.sh start org.apache.spark.deploy.master.Master --host --port --webui-port ...
[start-slaves.sh]
sbin/spark-config.sh
slaves.sh //conf/slaves
[slaves.sh]
for each host in conf/slaves {
    ssh host start-slave.sh ...
}
[start-slave.sh]
CLASS="org.apache.spark.deploy.worker.Worker"
sbin/spark-config.sh
for (( .. )) ; do
start_instance $(( 1 + $i )) "$@"
done
$>cd /soft/spark/sbin
$>./stop-all.sh //stop the whole Spark cluster
$>./start-all.sh //start the whole Spark cluster
$>./start-master.sh //start the master node
$>./start-slaves.sh //start all worker nodes
$s400>./start-slave.sh spark://s100:7077 //start a single worker on s400