Flink定时器
6.7 定时器6.7.1 定时器API以keyBy()之后的键控流的定时器为例://TODO 定时器 API//1. 注册定时器ctx.timerService().registerProcessingTimeTimer();ctx.timerService().registerEventTimeTimer();//2. 删除定时器ctx.timerService().deleteProcessi
·
6.7 定时器
6.7.1 定时器API
以keyBy()之后的键控流的定时器为例:
//TODO 定时器 API
//1. 注册定时器
ctx.timerService().registerProcessingTimeTimer();
ctx.timerService().registerEventTimeTimer();
//2. 删除定时器
ctx.timerService().deleteProcessingTimeTimer();
ctx.timerService().deleteEventTimeTimer();
//3. 获取当前时间的进展
ctx.timerService().currentProcessingTime();
ctx.timerService().currentWatermark()
6.7.2 定时器的使用
基于处理时间的定时器 & 基于事件时间的定时器
public class Flink23_Process_Timer {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] dats = value.split(",");
return new WaterSensor(
dats[0],
Long.valueOf(dats[1]),
Integer.valueOf(dats[2])
);
}
})
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000L;
}
})
);
sensorDS
.keyBy(sensor -> sensor.getId())
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//获取的是事件时间
// Long timestamp = ctx.timestamp();
//TODO 定时器 API
//1. 注册定时器
// ctx.timerService().registerProcessingTimeTimer();
// ctx.timerService().registerEventTimeTimer();
//2. 删除定时器
// ctx.timerService().deleteProcessingTimeTimer();
// ctx.timerService().deleteEventTimeTimer();
//3. 获取当前时间的进展
// ctx.timerService().currentProcessingTime();
// ctx.timerService().currentWatermark()
//TODO 定时器使用
//TODO 1 处理时间
// long processTs = ctx.timerService().currentProcessingTime() + 5000L;
// System.out.println("注册定时器,ts=" + processTs);
// ctx.timerService().registerProcessingTimeTimer(processTs);
//TODO 2 事件时间
long eventTs = 5000L;
System.out.println("注册定时器,ts=" + eventTs);
ctx.timerService().registerEventTimeTimer(eventTs);
}
/*
* @Description //TODO 定时器触发:表示时间进展到了 注册的那个时间
* @Param [timestamp, ctx, out] 触发的时间 也就是注册的那个时间, 上下文, 采集器
* @return void
**/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
System.out.println("定时器触发了,timestamp = " + timestamp);
}
})
.print();
env.execute();
}
}
6.7.3 定时器源码分析
1、定时器注册?
=> 每调用一次 register方法,都会 new一个定时器对象,并把定时器对象,放入一个 队列 里
=> 这个 队列 add 对象,会去重 => 如果重复注册同一个时间的定时器,只会放入一个对象
=> 去重,范围是同一分组,组内去重
2、定时器触发?
watermark >= 注册的时间
=>多个分组的多个定时器,都会触发。
=> 为什么sensor_2,9,1,能触发sensor_1分组的数据?
因为watermark表示是一个时间进展,它不会分组的,达到之后多个分组的定时器都会触发。
"思考:为什么定时器延迟了 1s触发?"
因为 watermark = 最大事件时间 - 乱序程度 - 1ms, 减去了 1ms
例如,乱序程度设置3s,注册了一个 10s的定时器,
当 eventtime = 13s时, watermark = 13s - 3s -1ms = 9999ms,小于 注册的时间 10s,无法触发。
只有当 eventtime = 14s, watermark = 14s -3 - 1ms = 10999ms, 大于 注册的时间 10s,可以触发。
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
=> 每调用一次 register方法,都会 new一个定时器对象
=> 把定时器对象,放入一个 队列 里
=> 这个 队列 add对象,会去重 => 如果重复注册同一个时间的定时器,只会放入一个对象
timer.getTimestamp() <= time
=> timer.getTimestamp() 就是定时器注册的时间
=> time = watermark.getTimestamp(),就是 watermark
=> 注册的时间 <= watermark
6.7.4 定时器练习题
监控水位传感器的水位值,如果水位值在五分钟之内(processing time)连续上升,则报警。
public class Flink24_Process_TimerPractice {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("hadoop102", 9999)
.map(new MapFunction<String, WaterSensor>() {
@Override
public WaterSensor map(String value) throws Exception {
String[] dats = value.split(",");
return new WaterSensor(
dats[0],
Long.valueOf(dats[1]),
Integer.valueOf(dats[2])
);
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000L;
}
})
)
.keyBy(sensor -> sensor.getId())
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private long timeTs = 0L;
private int lastVC = 0;
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
if (timeTs == 0){
//第一条数据来的时候注册一个定时器
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000L);
timeTs = ctx.timestamp() + 5000L;
}
//之后来的数据,都与上一条数据比较,判断水位值上升还是下降
if (value.getVc() < lastVC){
//只要遇到下降,就删除掉定时器
ctx.timerService().deleteEventTimeTimer(timeTs);
//注册新的定时器
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000L);
timeTs = ctx.timestamp() + 5000L;
}
//更新水位值
lastVC = value.getVc();
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//触发后,重置保存的ts,避免影响后面的检测
timeTs = 0L;
System.out.println("传感器=" + ctx.getCurrentKey() + "在ts=" + timestamp + "监测到水位连续5s上升");
}
})
.print("result");
env.execute();
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)