文章目录


依赖

    <!--
      Spark Streaming connector for Kafka: artifact spark-streaming-kafka_2.11, version 1.6.3.
      The transitive net.jpountz.lz4:lz4 artifact is excluded.
      NOTE(review): presumably excluded to avoid a clash with another lz4 version on the
      classpath - confirm against the project's dependency tree.
    -->
    <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
            <exclusions>
                <exclusion>
                    <groupId>net.jpountz.lz4</groupId>
                    <artifactId>lz4</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

高阶API

package stream

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

/**
 * Receiver-based ("high-level API") Kafka consumer example.
 *
 * Reads topic `test` through ZooKeeper with consumer group `group_test`, keeps only
 * messages whose value is exactly `"A"`, and prints them.
 *
 * The streaming pipeline is declared in the object body; the context is started from
 * [[main]] so the object is actually runnable and so that the blocking
 * `awaitTermination()` call does not execute inside the object initializer.
 */
object TestStream2 {
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("Chapter8_4_2")
  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))

  /**
   * Stream of (key, message) pairs from Kafka.
   * The `2` in the topic map is the number of consumer threads used for topic `test`.
   */
  val value: ReceiverInputDStream[(String, String)] =
    KafkaUtils.createStream(ssc, "note01:2181,note02:2181,note03:2181", "group_test", Map("test" -> 2))

  // `==` is null-safe, unlike `_.equals("A")`, which throws on a null message value.
  value.map(_._2).filter(_ == "A").foreachRDD { rdd =>
    // This function body runs on the driver, once per batch.
    rdd.foreachPartition { p =>
      // This function body runs on the executors, once per partition.
      p.foreach(result => println(result))
    }
  }

  /**
   * Application entry point: starts the streaming context and blocks until it terminates.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    ssc.start()
    ssc.awaitTermination()
  }
}

高阶API会自动维护偏移量,而低阶API需要开发者手动维护偏移量。

低阶API

在项目中大多使用低阶API读取Kafka中的数据:当数据量较大、集群健康状态不稳定或网络出现波动时,低阶API有助于对消费失败的数据进行精准恢复。
封装ZKUtil

package utils

import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer
import org.apache.hadoop.yarn.lib.ZKClient

/**
 * Factory for ZooKeeper clients that store node data as UTF-8 strings.
 *
 * The client class is `org.I0Itec.zkclient.ZkClient`: it is the one that offers the
 * `(servers, sessionTimeout, connectionTimeout, serializer)` constructor used below.
 * The file-level `org.apache.hadoop.yarn.lib.ZKClient` import is a different class and
 * is not used by this object.
 */
object ZKUtil {
  import java.nio.charset.StandardCharsets
  import org.I0Itec.zkclient.ZkClient

  /**
   * Creates a [[org.I0Itec.zkclient.ZkClient]] whose serializer writes `toString` as
   * UTF-8 bytes and reads UTF-8 bytes back as a `String`.
   *
   * @param zkServers         ZooKeeper connection string, e.g. `host1:2181,host2:2181`
   * @param sessionTimeOut    session timeout passed to the client constructor
   * @param connectionTimeOut connection timeout passed to the client constructor
   * @return the newly constructed client
   */
  def initZKClient(zkServers: String, sessionTimeOut: Int, connectionTimeOut: Int): ZkClient = {
    val utf8Serializer = new ZkSerializer {
      override def serialize(o: Any): Array[Byte] =
        o.toString.getBytes(StandardCharsets.UTF_8)

      // A node without data yields null bytes; map that to null instead of throwing.
      override def deserialize(bytes: Array[Byte]): AnyRef =
        if (bytes == null) null else new String(bytes, StandardCharsets.UTF_8)
    }
    new ZkClient(zkServers, sessionTimeOut, connectionTimeOut, utf8Serializer)
  }
}

消费数据自己维护offset


import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo, TopicMetadataRequest}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, Success}
import utils.ZKUtil

import scala.collection.mutable
import scala.util.{Success, Try}

/**
 * Direct ("low-level API") Kafka consumer example that keeps its own offsets in ZooKeeper.
 *
 * On start-up the job looks for offsets stored under
 * `/consumers/<group>/offsets/<topic>/<partition>`. If any exist, consumption resumes
 * from them (clamped to the earliest offset still available on the broker); otherwise a
 * plain direct stream is created. After every batch the end offset of each partition is
 * written back to ZooKeeper.
 */
object StreamText {
  // One topic / group definition shared by the Kafka subscription, the offset lookup and
  // the ZooKeeper path, so the three can never disagree.
  private val Topic = "spark_streaming_test"
  private val GroupId = "g_spark_test"

  // Bootstrap broker used for metadata requests.
  private val BrokerHost = "note01"
  private val BrokerPort = 9092

  private val SocketTimeoutMs = 10000
  private val BufferSize = 10000

  /** Runs `body` with a fresh [[SimpleConsumer]] and always closes it afterwards. */
  private def withConsumer[T](host: String, port: Int, clientId: String)(body: SimpleConsumer => T): T = {
    val consumer = new SimpleConsumer(host, port, SocketTimeoutMs, BufferSize, clientId)
    try body(consumer) finally consumer.close()
  }

  /** Maps each partition id of `topic` to its leader's (host, port); leaderless partitions are omitted. */
  private def partitionLeaders(topic: String): Map[Int, (String, Int)] =
    withConsumer(BrokerHost, BrokerPort, "StreamingOffsetObserver") { consumer =>
      val response = consumer.send(new TopicMetadataRequest(List(topic), 0))
      response.topicsMetadata.headOption.toSeq
        .flatMap(_.partitionsMetadata)
        .flatMap(pm => pm.leader.map(leader => (pm.partitionId, (leader.host, leader.port))))
        .toMap
    }

  /** Asks the partition leader for the earliest offset it still holds, if it reports one. */
  private def earliestOffset(host: String, port: Int, tp: TopicAndPartition): Option[Long] =
    withConsumer(host, port, "getMinOffset") { consumer =>
      val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
      consumer.getOffsetsBefore(request).partitionErrorAndOffsets.get(tp).flatMap(_.offsets.headOption)
    }

  /**
   * Application entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("StreamText")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "note01:9092",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "session.timeout.ms" -> "30000",
      "enable.auto.commit" -> "false",
      "max.poll.records" -> "100",
      "kafka.topics" -> Topic,
      "group.id" -> GroupId
    )

    val zkHost = "note01:2181,note02:2181,note03:2181"
    val sessionTimeout = 120000
    val connectionTimeout = 60000
    val zkClient = ZKUtil.initZKClient(zkHost, sessionTimeout, connectionTimeout)

    // Offsets live under /consumers/<group>/offsets/<topic>/<partition>.
    val zKGroupTopicDirs = new ZKGroupTopicDirs(GroupId, Topic)
    val zkTopicPath = zKGroupTopicDirs.consumerOffsetDir
    val childrenCount = zkClient.countChildren(zkTopicPath)

    val kafkaStream: InputDStream[(String, String)] =
      if (childrenCount > 0) {
        // Not the first run: resume from the offsets stored in ZooKeeper.
        val leaders = partitionLeaders(Topic)
        val fromOffsets: Map[TopicAndPartition, Long] = (0 until childrenCount).map { partition =>
          val tp = TopicAndPartition(Topic, partition)
          val stored = zkClient.readData[String](s"$zkTopicPath/$partition").toLong
          val (host, port) = leaders.getOrElse(
            partition,
            throw new IllegalStateException(s"No leader available for $Topic partition $partition")
          )
          // If the stored offset has already been purged by retention, start at the
          // earliest offset the broker still has instead of failing with out-of-range.
          val resumeAt = earliestOffset(host, port, tp).fold(stored)(math.max(stored, _))
          tp -> resumeAt
        }.toMap

        val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.key, mam.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffsets, messageHandler)
      } else {
        // First run: nothing stored yet.
        // NOTE(review): without "auto.offset.reset" -> "smallest" the direct stream starts
        // at the latest offset, not the beginning - confirm which start point is intended.
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set(Topic))
      }

    // Offset ranges of the batch currently being processed; set on the driver in
    // `transform` and read on the driver in `foreachRDD`.
    var offsetRanges = Array.empty[OffsetRange]
    val kafkaInputDStream = kafkaStream.transform { rdd =>
      // The cast only works on the RDD produced directly by the direct stream.
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    // Keep comma-separated records with exactly three fields whose third field is an
    // integer greater than 3. (`Success` is not named here because the file imports both
    // org.apache.spark.Success and scala.util.Success, which makes it ambiguous.)
    val results = kafkaInputDStream
      .map(_._2)
      .map(_.split(","))
      .filter(arr => arr.length == 3 && Try(arr(2).toInt).toOption.exists(_ > 3))
      .map(_.mkString(","))

    results.foreachRDD { rdd =>
      // This function body runs on the driver.
      rdd.foreachPartition { p =>
        // Runs on the executors; a database connection would be created here,
        // once per partition, and used inside the loop below.
        p.foreach(result => println(result))
      }
      // ZkUtils is not serializable, so offsets are committed from the driver.
      // Store untilOffset (the first offset NOT yet consumed) so that a restart resumes
      // after this batch rather than replaying it.
      offsetRanges.foreach { o =>
        ZkUtils.updatePersistentPath(zkClient, s"$zkTopicPath/${o.partition}", o.untilOffset.toString)
        println("本次消息消费成功后,偏移量状态:" + o)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐