Our use case is analyzing user behavior in a mobile app, described as follows:
1. The mobile client collects user behavior events (we use click events as the example) and sends the data to a data server; here we assume the data goes straight into a Kafka message queue.
2. A real-time backend service consumes the data from Kafka and analyzes it on the fly; we use Spark Streaming for this.
3. The Spark Streaming job computes the results and writes them to Redis.
In this example, a simulated Kafka producer generates JSON data, Spark consumes it, and the results are saved to Redis. Spark Streaming offers two consumption approaches: receiver-based and receiver-less (direct).
Before starting this example, make sure of the following:
1. Redis has authentication enabled and the password is admin (changeable):
// admin is the password
/usr/local/redis/bin/redis-cli -a admin
2. ZooKeeper is running on its default port, or adjust the port in the example code.
3. Kafka is running on its default port, or adjust the port in the example code.
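To sanity-check items 2 and 3 above, commands along these lines can be used. This is a sketch: the host name, `$KAFKA_HOME`, and the partition/replica counts are assumptions for this particular setup, and `nc` must be installed.

```shell
# ZooKeeper four-letter-word health check; a healthy server replies "imok"
echo ruok | nc hadoop2 2181

# List topics to confirm the Kafka broker and its ZooKeeper registration are up
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper hadoop2:2181 --list

# Create the topic used throughout this example (counts are illustrative)
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper hadoop2:2181 \
  --replication-factor 1 --partitions 3 --topic kafka_spark_redis_T
```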
Runtime environment:
CentOS 7
JDK 1.8
Kafka: kafka_2.11-0.8.2.2
Spark: spark-2.2.0-bin-hadoop2.7
Scala: 2.11.8
Redis: redis-4.0.1
The main content follows.
I. Producing data to Kafka
/**
 * Created by Administrator on 2017/9/13.
 * Kafka producer that simulates writing user behavior events to Kafka in real time; the data is JSON.
 * Each event has 4 fields:
 *   1. uid: user ID
 *   2. event_time: event timestamp
 *   3. os_type: mobile OS type
 *   4. click_count: number of clicks
 */
import java.util.Properties
import scala.util.Random
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.codehaus.jettison.json.JSONObject

object KafkaEventProducer {

  // Simulated user pool; events cycle through these IDs
  private val users = Array(
    "user_01", "user_02", "user_03", "user_04", "user_05",
    "user_06", "user_07", "user_08", "user_09", "user_10")

  private val random = new Random()
  private var pointer = -1

  // Round-robin over the user array
  def getUserID(): String = {
    pointer = pointer + 1
    if (pointer >= users.length) pointer = 0
    users(pointer)
  }

  // Random click count in [0, 10)
  def click(): Double = random.nextInt(10)

  def main(args: Array[String]): Unit = {
    val topic = "kafka_spark_redis_T"
    // Kafka broker list (adjust to your cluster)
    val brokers = "hadoop2:9092,hadoop3:9092,hadoop4:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // group.id is a consumer property and can be omitted for a producer
    props.put("group.id", "sparkTest")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      // prepare event data
      val event = new JSONObject()
      event
        .put("uid", getUserID())
        .put("event_time", System.currentTimeMillis.toString)
        .put("os_type", "ios")
        .put("click_count", click())
      // produce event message
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      println("Message sent: " + event)
      // control produce rate
      Thread.sleep(200)
    }
  }
}

II. Building a Redis connection pool

import redis.clients.jedis.{JedisPool, JedisPoolConfig}

object RedisClient extends Serializable {
  val redisHost = "hadoop4" // Redis server
  val redisPort = 6379
  val redisTimeout = 30000
  val MAX_ACTIVE: Int = 1024
  val MAX_IDLE: Int = 200
  val MAX_WAIT: Int = 10000
  val TEST_ON_BORROW: Boolean = true
  val AUTH = "admin" // Redis password

  val config: JedisPoolConfig = new JedisPoolConfig
  config.setMaxTotal(MAX_ACTIVE)
  config.setMaxIdle(MAX_IDLE)
  config.setMaxWaitMillis(MAX_WAIT)
  config.setTestOnBorrow(TEST_ON_BORROW)

  lazy val pool = new JedisPool(config, redisHost, redisPort, redisTimeout, AUTH)

  // Destroy the pool on JVM shutdown
  lazy val hook = new Thread {
    override def run = {
      println("Execute hook thread: " + this)
      pool.destroy()
    }
  }
  sys.addShutdownHook(hook.run)
}
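Before wiring the pool into Spark, it can be smoke-tested on its own. A minimal sketch (the hash key and field below are illustrative; with Jedis 2.9, calling `close()` on a pooled connection returns it to the pool rather than closing the socket):

```scala
// Borrow a connection, bump a hash field, read it back, and return the connection
val jedis = RedisClient.pool.getResource
try {
  jedis.hincrBy("app_users_click", "user_01", 3L)
  println(jedis.hget("app_users_click", "user_01"))
} finally {
  jedis.close() // returns the connection to the pool
}
```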
III. Consuming the data with Spark Streaming and storing it in Redis
1. The receiver-based approach: KafkaUtils.createStream
import net.sf.json.JSONObject
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Ka_spark_redis {

  def main(args: Array[String]): Unit = {
    val topics = "kafka_spark_redis_T" // must match the producer's topic
    val numThreads = 3

    val zkQuorum = "hadoop2:2181" // ZooKeeper address; may be a comma-separated quorum
    val group = "sparkTest" // consumer group
    val sparkConf = new SparkConf().setAppName("Ka_spark_redis_T").setMaster("local[2]")
    Logger.getLogger("spark").setLevel(Level.WARN)
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val clickHashKey = "app_users_click" // name of the Redis hash, stored as <field, value> pairs

    val topicMap = topics.split(",").map((_, numThreads)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    // Each message value is JSON such as:
    // {"uid":"user_02",
    //  "event_time":"1505270531256",
    //  "os_type":"Android",
    //  "click_count":4}
    val events = data.flatMap(line => {
      val json = JSONObject.fromObject(line._2)
      Some(json)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = RedisClient.pool.getResource
          jedis.hincrBy(clickHashKey, uid, clickCount)
          jedis.close() // returns the connection to the pool (returnResource is deprecated)
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
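Once the job has processed a few batches, the accumulated per-user totals can be inspected straight from the Redis hash (password and key name as configured above):

```shell
# Dump all <uid, total_click_count> pairs from the hash
/usr/local/redis/bin/redis-cli -a admin HGETALL app_users_click
```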

2. The receiver-less (direct) approach: KafkaUtils.createDirectStream

import kafka.serializer.StringDecoder
import net.sf.json.JSONObject
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UserClickCountAnalytics {
  def main(args: Array[String]): Unit = {
    val masterUrl = "local[1]"
    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("UserClickCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("kafka_spark_redis_T")
    val brokers = "hadoop2:9092,hadoop3:9092,hadoop4:9092"
    val groupId = "sparkTest"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "serializer.class" -> "kafka.serializer.StringEncoder"
    )
    val clickHashKey = "app_users_click"

    // Create a direct stream (no receiver; offsets are tracked by the stream itself)
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val events = kafkaStream.flatMap(line => {
      val json = JSONObject.fromObject(line._2)
      Some(json)
    })

    // Compute user click times
    val userClicks = events.map(x => (x.getString("uid"), x.getInt("click_count"))).reduceByKey(_ + _)
    userClicks.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        partitionOfRecords.foreach(pair => {
          val uid = pair._1
          val clickCount = pair._2
          val jedis = RedisClient.pool.getResource
          jedis.hincrBy(clickHashKey, uid, clickCount)
          jedis.close() // returns the connection to the pool
        })
      })
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
IV. Dependencies
This example depends on Spark and Scala; the versions are listed above. Below is a partial list of the Maven dependencies.
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<!-- not used by this example; listed in the original project -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.0</version>
</dependency>

<dependency>
    <groupId>org.codehaus.jettison</groupId>
    <artifactId>jettison</artifactId>
    <version>1.3.8</version>
</dependency>

<!-- not used by this example; listed in the original project -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.43</version>
</dependency>

<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.3</version>
    <!-- json-lib is published with jdk13/jdk15 classifiers only -->
    <classifier>jdk15</classifier>
</dependency>

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.2</version>
</dependency>
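For readers building with sbt instead of Maven, the equivalent declarations would look roughly like this (a sketch; versions match the Maven list above, and the jdk15 classifier is what json-lib publishes on Maven Central):

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0",
  "org.codehaus.jettison" % "jettison" % "1.3.8",
  "net.sf.json-lib" % "json-lib" % "2.3" classifier "jdk15",
  "redis.clients" % "jedis" % "2.9.0"
)
```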
