Flink学习笔记(3) -- Flink API解析
1、Flink DataStreamAPIⅠ、DataStream API 之 Data Sources部分详解 source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。 flink提供了大量的已经实现好的source方法,你也可以自定义source 通过实现s...
1、Flink DataStreamAPI

获取执行环境-Environment
getExecutionEnvironment
创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment – 获取批处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment --获取流处理执行环境
如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1。
createLocalEnvironment
返回本地执行环境,需要在调用时指定默认的并行度。
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
createRemoteEnvironment
返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")
Ⅰ、DataStream API 之 Data Sources部分详解
Source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source;
Flink提供了大量的已经实现好的source方法,你也可以自定义source;
通过实现SourceFunction接口来自定义无并行度的Source,或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source。

(1) 从文件读取数据
val stream2 = env.readTextFile("YOUR_FILE_PATH")
(2) 从集合读取数据
// 定义样例类,传感器id,时间戳,温度
case class SensorReading(id: String, timestamp: Long, temperature: Double)
object Sensor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream1 = env
.fromCollection(List(
SensorReading("sensor_1", 1547718199, 35.8),
SensorReading("sensor_6", 1547718201, 15.4),
SensorReading("sensor_7", 1547718202, 6.7),
SensorReading("sensor_10", 1547718205, 38.1)
))
stream1.print("stream1:").setParallelism(1)
env.execute()
}
}

(3) 以kafka消息队列的数据作为来源
需要引入kafka连接器的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))


具体调用如下:
val stream4 = env.addSource( new MySensorSource() )
class MySensorSource extends SourceFunction[SensorReading]{
// flag: 表示数据源是否还在正常运行
var running: Boolean = true
override def cancel(): Unit = {
running = false
}
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 初始化一个随机数发生器
val rand = new Random()
var curTemp = 1.to(10).map(
i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 )
)
while(running){
// 更新温度值
curTemp = curTemp.map(
t => (t._1, t._2 + rand.nextGaussian() )
)
// 获取当前时间戳
val curTime = System.currentTimeMillis()
curTemp.foreach(
t => ctx.collect(SensorReading(t._1, curTime, t._2))
)
Thread.sleep(100)
}
}
}
Ⅱ、DataStream API 之 Transformations部分详解

Map
val streamMap = stream.map { x => x * 2 }
FlatMap
**flatMap的函数签名**:
def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]
例如: flatMap(List(1,2,3))(i ⇒ List(i,i)),结果是List(1,1,2,2,3,3);
而List("a b", "c d").flatMap(line ⇒ line.split(" ")),结果是List(a, b, c, d)。
val streamFlatMap = stream.flatMap{
x => x.split(" ")
}
Filter
val streamFilter = stream.filter{
x => x == 1
}
KeyBy

DataStream → KeyedStream:
将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。
滚动聚合算子(Rolling Aggregation)
这些算子可以针对KeyedStream的每一个支流做聚合。
1 sum()
2 min()
3 max()
4 minBy()
5 maxBy()
Reduce
KeyedStream → DataStream:
一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
.map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
.keyBy("id")
.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )

Split

DataStream → SplitStream:根据某些特征把一个DataStream拆分成两个或者多个DataStream。
Select

SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。
-- 需求:传感器数据按照温度高低(以30度为界),拆分成两个流。
val splitStream = stream2
.split( sensorData => {
if (sensorData.temperature > 30) Seq("high") else Seq("low")
} )
val high = splitStream.select("high")
val low = splitStream.select("low")
val all = splitStream.select("high", "low")
Connect和 CoMap
(1) Connect

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。
(2) CoMap,CoFlatMap

ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别进行map和flatMap处理。
val warning = high.map( sensorData => (sensorData.id, sensorData.temperature) )
val connected = warning.connect(low)
val coMap = connected.map(
warningData => (warningData._1, warningData._2, "warning"),
lowData => (lowData.id, "healthy")
)
Union

DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。
//合并以后打印
val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
unionStream.print("union:::")
Connect与 Union 区别:
1. Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的。
2. Connect只能操作两个流,Union可以操作多个。
Trigger
trigger定义了何时开始使用窗口计算函数计算窗口。每个窗口分配器都会有一个默认的Trigger。如果,默认的Trigger不能满足你的需求,你可以指定一个自定义的trigger().
Trigger接口有五个方法允许Trigger对不同的事件做出反应:
onElement():进入窗口的每个元素都会调用该方法。
onEventTime():事件时间timer触发的时候被调用。
onProcessingTime():处理时间timer触发的时候会被调用。
onMerge():有状态的触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如使用会话窗口。
clear():该方法主要是执行窗口的删除操作。
关于上述方法需要注意两点:
1).前三方法决定着如何通过返回一个TriggerResult来操作输入事件。
CONTINUE:什么都不做。
FIRE:触发计算。
PURE:清除窗口的元素。
FIRE_AND_PURE:触发计算和清除窗口元素。
2).这些方法中的任何一个都可用于为将来的操作注册处理或事件时间计时器
内置和自定义触发器
Flink内部有一些内置的触发器:
EventTimeTrigger:基于事件时间和watermark机制来对窗口进行触发计算。
ProcessingTimeTrigger:基于处理时间触发。
CountTrigger:窗口元素数超过预先给定的限制值的话会触发计算。
PurgingTrigger作为其它trigger的参数,将其转化为一个purging触发器。
WindowAssigner的默认触发器适用于很多案例。比如,所有基于事件时间的窗口分配器都用EventTimeTrigger作为默认触发器。该触发器会在watermark达到窗口的截止时间时直接触发计算输出。

Ⅲ、DataStream API之Data Sink部分详解
Flink没有类似于Spark中foreach方法,让用户进行迭代的操作。虽有对外的输出操作都要利用Sink完成。最后通过类似如下方式完成整个任务最终输出操作。
stream.addSink(new MySink(xxxx))
官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink。


(1) Kafka
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
主函数中添加sink
val union = high.union(low).map(_.temperature.toString)
union.addSink(new FlinkKafkaProducer011[String]("localhost:9092", "test", new SimpleStringSchema()))
(2) Redis
pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
定义一个redis的mapper类,用于定义保存到redis时调用的命令:
class MyRedisMapper extends RedisMapper[SensorReading]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")
}
override def getValueFromData(t: SensorReading): String = t.temperature.toString
override def getKeyFromData(t: SensorReading): String = t.id
}
在主函数中调用:
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()
dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper) )
(3) Elasticsearch
pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.10.0</version>
</dependency>
在主函数中调用:
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading] {
override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
println("saving data: " + t)
val json = new util.HashMap[String, String]()
json.put("data", t.toString)
val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)
requestIndexer.add(indexRequest)
println("saved successfully")
}
} )
dataStream.addSink( esSinkBuilder.build() )
(4) JDBC 自定义sink
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.44</version>
</dependency>
添加MyJdbcSink
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// open 主要是创建连接
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456")
insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor, temp) VALUES (?, ?)")
updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE sensor = ?")
}
// 调用连接,执行sql
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
if (updateStmt.getUpdateCount == 0) {
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
在main方法中增加,把明细保存到mysql中
dataStream.addSink(new MyJdbcSink())


2、Flink DataSetAPI
Ⅰ、DataSet API之Data Sources部分详解

Ⅱ、DataSet API之Transformations部分详解



Ⅲ、DataSet API之Data sinks部分详解

3、ProcessFunction API(底层API)
普通的transform算子只能获取到当前的数据,或者加上聚合状态;
如果是RichFuction还可以访问生命周期方法、获取运行时上下文、进行状态编程;
但是转换算子和RichFuction是无法访问事件的时间戳信息和水位线信息的。而这在一些应用场景下,极为重要。例如MapFunction这样的map转换算子就无法访问时间戳或者当前事件的事件时间。
基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。Process Function用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。
Flink提供了8个Process Function:
• ProcessFunction
• KeyedProcessFunction
• CoProcessFunction
• ProcessJoinFunction
• BroadcastProcessFunction
• KeyedBroadcastProcessFunction
• ProcessWindowFunction
• ProcessAllWindowFunction
KeyedProcessFunction
KeyedProcessFunction用来操作KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:
• processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这个方法,调用结果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerService时间服务(可以注册定时器)。Context还可以将结果输出到别的流(side outputs)。
• onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出结果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,例如定时器触发的时间信息(事件时间或者处理时间)。
TimerService 和 定时器(Timers)
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
• currentProcessingTime(): Long 返回当前处理时间
• currentWatermark(): Long 返回当前watermark的时间戳
• registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
• registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
• deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
• deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面使用。
下面举个例子说明KeyedProcessFunction如何操作KeyedStream。
需求:监控温度传感器的温度值,如果温度值在一秒钟之内(processing time)连续上升,则报警。
val warnings = readings
.keyBy(_.id)
.process(new TempIncreaseAlertFunction)
看一下TempIncreaseAlertFunction如何实现, 程序中使用了ValueState这样一个状态变量。
class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {
// 保存上一个传感器温度值,因为需要和之前的温度对比
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
)
// 保存注册的定时器的时间戳,为了方便删除定时器
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long])
)
override def processElement(r: SensorReading,
ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
out: Collector[String]): Unit = {
// 取出上一次的温度
val prevTemp = lastTemp.value()
// 将当前温度更新到上一次的温度这个变量中
lastTemp.update(r.temperature)
val curTimerTimestamp = currentTimer.value()
if (prevTemp == 0.0 || r.temperature < prevTemp) {
// 温度下降或者是第一个温度值,删除定时器
ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
// 清空状态变量
currentTimer.clear()
} else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
// 温度上升且我们并没有设置定时器
val timerTs = ctx.timerService().currentProcessingTime() + 1000
ctx.timerService().registerProcessingTimeTimer(timerTs)
currentTimer.update(timerTs)
}
}
override def onTimer(ts: Long,
ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
out: Collector[String]): Unit = {
out.collect("传感器id为: " + ctx.getCurrentKey + "的传感器温度值已经连续1s上升了。")
currentTimer.clear()
}
}
侧输出流(SideOutput)
大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。
下面是一个示例程序:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import beans.SensorReading
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector
object SideoutputTest {
//侧输出流
def main(args: Array[String]): Unit = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.setParallelism(1)
val inputstream = environment.socketTextStream("localhost", 7777)
val datastream = inputstream.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0), dataArray(1).toLong, dataArray(2).toDouble)
}
)
//用侧输出流实现分流操作
val highTempStream:DataStream[SensorReading] = datastream.process(
new splitTempProcessor(50)
)
val lowTempStream = highTempStream.getSideOutput(new OutputTag[(String,Double,Long)]("lowTemp"))
highTempStream.print("high")
lowTempStream.print("low")
environment.execute("SideoutputTest")
}
}
//自定义process function 用于区分高低温数据
class splitTempProcessor(threshold:Int) extends ProcessFunction[SensorReading,SensorReading] {
override def processElement(i: SensorReading, context: ProcessFunction[SensorReading, SensorReading]#Context, collector: Collector[SensorReading]): Unit =
{
//判断当前数据的温度值,如果大于阈值则输出到主流否则输出到侧输出流
if(i.temperature>threshold)
{
collector.collect(i)
}
else
{
context.output(new OutputTag[(String,Double,Long)]("lowTemp"),(i.id,i.temperature,i.timestamp))
}
}
}
CoProcessFunction
对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操作。CoProcessFunction提供了操作每一个输入流的方法: processElement1()和processElement2()。
类似于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问事件数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。
4、Flink 支持的DataType和序列化
Flink流应用程序处理的是以数据对象表示的事件流。所以在Flink内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink需要明确知道应用程序所处理的数据类型。Flink使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
Flink还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。
Flink支持Java和Scala中所有常见数据类型。使用最广泛的类型有以下几种。


更多推荐
所有评论(0)