Kafka + Spark Streaming: Reading Data and Writing It to HDFS
I. Environment preparation
As used in the code below: Kafka 1.1.0 (broker on node1:9092), Spark 2.3.0 with Scala 2.11, a Hadoop cluster with HDFS/YARN (NameNode at node2:9000), and JDK 1.8.
II. Kafka code
1. Maven dependencies and build configuration (pom.xml)
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.1.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
2. Create the UserProducer producer class
package com.chak

import java.io.File
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.io.Source

object UserProducer {
  def main(args: Array[String]): Unit = {
    // Producer configuration: only the broker address is needed here,
    // because the key/value serializers are passed to the constructor directly.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node1:9092")
    val producer = new KafkaProducer[String, String](props, new StringSerializer(), new StringSerializer())

    /**
     * user_friends.csv has the following format:
     * user,friends...
     * 3197468391,1346449342 3873244116 4226080662 1222907620 547730952 1052032722 2138119761
     * 3537982273,1491560444 395798035 2036380346 899375619 3534826887 3427911581 494959696
     * 823183725,1484954627 1950387873 1652977611 4185960823 427135664 3046862298 3536710376
     */
    // Read the file line by line and send each line to the "user_friend" topic,
    // pausing briefly between sends so the stream is easy to observe.
    Source.fromFile(new File("E:\\Projects\\kafka_user\\src\\main\\resources\\my_friends.csv")).getLines().foreach(line => {
      val record = new ProducerRecord[String, String]("user_friend", line)
      println(line)
      Thread.sleep(200)
      producer.send(record)
    })

    // Flush any buffered records and release resources before the JVM exits.
    producer.close()
  }
}
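Before running the producer, the user_friend topic should exist (unless the broker auto-creates topics). A possible way to create it, assuming the bundled ZooKeeper is listening on its default port 2181 on node1 and a single-broker setup:
[hadoop@node1 kafka]$ bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 1 --partitions 1 --topic user_friend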
III. Spark Streaming code
1. Maven dependencies and build configuration (pom.xml)
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
2. Create the UserTotal class that builds the DStream
package com.chak

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserTotal {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserTotal")
    val sc = new SparkContext(conf)
    // One micro-batch every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    // Kafka consumer configuration
    val config = Map[String, String](
      "bootstrap.servers" -> "node1:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "spark_streaming"
    )
    val topics = Set("user_friend")

    // Create the direct DStream that subscribes to the user_friend topic
    val kafka = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, config)
    )

    // Each record value is one CSV line: "user,friend1 friend2 ..."
    val lines = kafka.map(_.value())
    val pairs = lines.map(x => {
      val ss = x.split(",")
      val friends = ss(1).split(" ")
      // (user id, number of friends)
      (ss(0), friends.length)
    })

    pairs.foreachRDD((rdd, time) => {
      // Write each batch's result to HDFS. The batch time is appended to the path
      // because saveAsTextFile fails if the output directory already exists.
      rdd.saveAsTextFile(s"hdfs://node2:9000/user/hadoop/spark/data/${time.milliseconds}")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
IV. Running
1. Start the ZooKeeper and Kafka processes (this walkthrough uses the ZooKeeper bundled with Kafka):
[hadoop@node1 kafka]$ bin/zookeeper-server-start.sh config/zookeeper.properties
[hadoop@node1 kafka]$ bin/kafka-server-start.sh config/server.properties
2. Package the project as a jar and upload it to the machine where it will run.
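For reference, a typical build-and-upload sequence might look like this; the jar name and destination directory are placeholders, not values from this project:
mvn clean package
scp target/spark_user-1.0-SNAPSHOT.jar hadoop@node2:~/jobs/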
3. Submit the job with spark-submit.
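A possible spark-submit invocation is sketched below. The class name comes from the code above; the master, deploy mode, and jar path are assumptions for this cluster, and the Kafka integration dependencies (spark-streaming-kafka-0-10 and kafka-clients) must be made available to the job, e.g. via --jars or by building a fat jar:
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.chak.UserTotal \
  ~/jobs/spark_user-1.0-SNAPSHOT.jar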
Error message:
When the Spark job was submitted, it failed with an error (roughly, a YARN exception).
Opening the YARN/MR web UI showed the underlying cause: the container's actual virtual memory usage (2.2 GB) exceeded the 2.1 GB limit, i.e. the virtual memory limit was exceeded, so the container was killed.
Solution: add the following to yarn-site.xml (raising the virtual-to-physical memory ratio and disabling the virtual memory check), then restart the NodeManagers for the change to take effect:
<property>
    <name>yarn.nodemanager.vmem-pmem-ratio</name>
    <value>4</value>
</property>
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
V. Results
Kafka producer output (each line is printed to the console as it is sent).
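To double-check that records actually reached the topic, a console consumer can also be attached (broker address as in the code above):
[hadoop@node1 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic user_friend --from-beginning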
The results written to HDFS can be queried as follows.
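A hypothetical session with the HDFS CLI; the path matches the saveAsTextFile location used in the code (one subdirectory per batch), and the part-file names are whatever Spark generates:
[hadoop@node2 ~]$ hdfs dfs -ls /user/hadoop/spark/data
[hadoop@node2 ~]$ hdfs dfs -cat /user/hadoop/spark/data/*/part-*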
That completes the walkthrough. If spark-submit fails because the KafkaUtils class cannot be found, see my other post, "spark streaming + kafka 找不到kafkaUtils类" (Spark Streaming + Kafka: KafkaUtils class not found); I hope it helps.