Spark streaming+Kafka读取数据存hdfs

一、环境准备:

spark 2.3.0下载

kafka 1.1.0下载

二、Kafka代码

1、maven依赖的包以及编译环境(pom.xml)

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

2、创建UserProducer生产者的类

package com.chak

import java.io.File
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.io.Source

object UserProducer {
  def main(args: Array[String]): Unit = {

    val props = new Properties()
    props.setProperty("bootstrap.servers","node1:9092");
    val producter = new KafkaProducer[String,String](props,new StringSerializer(),new StringSerializer());
    /**
      * user_friends.csv文件格式如下
      * user,friends...
        3197468391,1346449342 3873244116 4226080662 1222907620 547730952 1052032722 2138119761
        3537982273,1491560444 395798035 2036380346 899375619 3534826887 3427911581 494959696
        823183725,1484954627 1950387873 1652977611 4185960823 427135664 3046862298 3536710376
      */
    Source.fromFile(new File("E:\\Projects\\kafka_user\\src\\main\\resources\\my_friends.csv")).getLines().foreach(line =>{
      val record = new ProducerRecord[String,String]("user_friend",line.toString);
      println(line)
      Thread.sleep(200)
      producter.send(record);
    })
  }
}

三、spark streaming代码

1、maven依赖的包以及编译环境(pom.xml) 

<dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>

2、创建UserTotal类的Dstream

package com.chak

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserTotal{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("UserTotal")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc,Seconds(5))

    //kafka
    val config = Map[String,String](
      "bootstrap.servers"->"node1:9092",
      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
      "group.id"->"spark_streaming"
    )
    val topics = Set("user_friend")
    //创建Dstream
    val kafka = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](topics,config))
    val lines = kafka.map(_.value())
    val pairs = lines.map(x => {
      var ss = x.split(",")
      var friends = ss(1).split(" ")
      (ss(0),friends.size)
    })
    pairs.foreachRDD(rdd => {
      //将结果存入hdfs
      rdd.saveAsTextFile("hdfs://node2:9000/user/hadoop/spark/data")
    })
    ssc.start()
    ssc.awaitTermination()
  }
}

四、运行

1、分别启动zookeeper、kafka进程(笔者用的kafka自带的zookeeper)

[hadoop@node1 kafka]$ bin/zookeeper-server-start.sh config/zookeeper.properties

[hadoop@node1 kafka]$ bin/kafka-server-start.sh config/server.properties

2、将项目打成jar上传到运行的机器上


3、spark submit提交脚本运行


错误信息:

提交运行spark任务,报错如下:(大概是yarn的异常)


打开MR管理页面,看到错误如下:2.2g的虚拟内存实际值,超过了2.1g的上限。也就是说虚拟内存超限,所以contrainer被干掉了


解决方法:在yarn-site.xml添加如下信息

    <property>
	<name>yarn.nodemanager.vmem-pmem-ratio</name>
	<value>4</value>
    </property>
    <property>
	<name>yarn.nodemanager.vmem-check-enabled</name>
	<value>false</value>
    </property>

五、运行结果:

kafka生产者


hdfs查询结果如下:


至此完成,如过在提交spark任务时运行找不到kafkaUtils,请看笔者的另一篇日志spark streaming + kafka 找不到kafkaUtils类,希望对你有帮助



Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐