6.7 定时器

6.7.1 定时器API

以keyBy()之后的键控流的定时器为例:

//TODO 定时器 API
//1. 注册定时器
ctx.timerService().registerProcessingTimeTimer();
ctx.timerService().registerEventTimeTimer();

//2. 删除定时器
ctx.timerService().deleteProcessingTimeTimer();
ctx.timerService().deleteEventTimeTimer();

//3. 获取当前时间的进展
ctx.timerService().currentProcessingTime();
ctx.timerService().currentWatermark()
6.7.2 定时器的使用

基于处理时间的定时器 & 基于事件时间的定时器

public class Flink23_Process_Timer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] dats = value.split(",");
                        return new WaterSensor(
                                dats[0],
                                Long.valueOf(dats[1]),
                                Integer.valueOf(dats[2])
                        );
                    }
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        return element.getTs() * 1000L;
                                    }
                                })
                );

        sensorDS
                .keyBy(sensor -> sensor.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //获取的是事件时间
//                        Long timestamp = ctx.timestamp();
                        //TODO 定时器 API
                        //1. 注册定时器
//                        ctx.timerService().registerProcessingTimeTimer();
//                        ctx.timerService().registerEventTimeTimer();

                        //2. 删除定时器
//                        ctx.timerService().deleteProcessingTimeTimer();
//                        ctx.timerService().deleteEventTimeTimer();

                        //3. 获取当前时间的进展
//                        ctx.timerService().currentProcessingTime();
//                        ctx.timerService().currentWatermark()

                        //TODO 定时器使用
                        //TODO 1 处理时间
//                        long processTs = ctx.timerService().currentProcessingTime() + 5000L;
//                        System.out.println("注册定时器,ts=" + processTs);
//                        ctx.timerService().registerProcessingTimeTimer(processTs);
                        //TODO 2 事件时间
                        long eventTs = 5000L;
                        System.out.println("注册定时器,ts=" + eventTs);
                        ctx.timerService().registerEventTimeTimer(eventTs);

                    }

                    /*
                     * @Description //TODO 定时器触发:表示时间进展到了 注册的那个时间
                     * @Param [timestamp, ctx, out] 触发的时间 也就是注册的那个时间, 上下文, 采集器
                     * @return void
                     **/
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        System.out.println("定时器触发了,timestamp = " + timestamp);
                    }
                })
                .print();
        
        env.execute();
    }
}
6.7.3 定时器源码分析
1、定时器注册?
    => 每调用一次 register方法,都会 new一个定时器对象,并把定时器对象,放入一个 队列 里
    => 这个 队列 add 对象,会去重 => 如果重复注册同一个时间的定时器,只会放入一个对象
    => 去重,范围是同一分组,组内去重

2、定时器触发?
    watermark >= 注册的时间
    =>多个分组的多个定时器,都会触发。
    => 为什么sensor_2,9,1,能触发sensor_1分组的数据?
    	因为watermark表示是一个时间进展,它不会分组的,达到之后多个分组的定时器都会触发。

"思考:为什么定时器延迟了 1s触发?"
    因为 watermark = 最大事件时间 - 乱序程度 - 1ms, 减去了 1ms
    例如,乱序程度设置3s,注册了一个 10s的定时器,
    当 eventtime = 13s时, watermark = 13s - 3s -1ms = 9999ms,小于 注册的时间 10s,无法触发。
    只有当 eventtime = 14s, watermark = 14s -3 - 1ms = 10999ms, 大于 注册的时间 10s,可以触发。



eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    => 每调用一次 register方法,都会 new一个定时器对象
    => 把定时器对象,放入一个 队列 里
    => 这个 队列 add对象,会去重 => 如果重复注册同一个时间的定时器,只会放入一个对象

timer.getTimestamp() <= time
    => timer.getTimestamp() 就是定时器注册的时间
    => time = watermark.getTimestamp(),就是 watermark
    => 注册的时间 <= watermark
6.7.4 定时器练习题

监控水位传感器的水位值,如果水位值在五分钟之内(processing time)连续上升,则报警。

public class Flink24_Process_TimerPractice {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env
                .socketTextStream("hadoop102", 9999)
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String value) throws Exception {
                        String[] dats = value.split(",");
                        return new WaterSensor(
                                dats[0],
                                Long.valueOf(dats[1]),
                                Integer.valueOf(dats[2])
                        );
                    }
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                return element.getTs() * 1000L;
                            }
                        })
                )
                .keyBy(sensor -> sensor.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    private long timeTs = 0L;
                    private int lastVC = 0;
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        if (timeTs == 0){
                            //第一条数据来的时候注册一个定时器
                            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000L);
                            timeTs = ctx.timestamp() + 5000L;
                        }
                        //之后来的数据,都与上一条数据比较,判断水位值上升还是下降
                        if (value.getVc() < lastVC){
                            //只要遇到下降,就删除掉定时器
                            ctx.timerService().deleteEventTimeTimer(timeTs);
                            //注册新的定时器
                            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000L);
                            timeTs = ctx.timestamp() + 5000L;
                        }
                        //更新水位值
                        lastVC = value.getVc();
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        //触发后,重置保存的ts,避免影响后面的检测
                        timeTs = 0L;
                        System.out.println("传感器=" + ctx.getCurrentKey() + "在ts=" + timestamp + "监测到水位连续5s上升");
                    }
                })
                .print("result");
        
        env.execute();
    }
}
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐