第六章 Flink 中的 Window

6.1 Window

6.1.1 Window 概述

在这里插入图片描述
桶的概念,不是批处理

6.1.2 Window 类型

在这里插入图片描述
时间窗口的设置是左闭右开[ ),及包含开时间进入窗口的数据不包含结束时间进入窗口的数据
在这里插入图片描述

1. 滚动窗口(Tumbling Windows)

在这里插入图片描述

2. 滑动窗口(Sliding Windows)

在这里插入图片描述
在这里插入图片描述

3. 会话窗口(Session Windows)

在这里插入图片描述

在这里插入图片描述

6.2 Window API

在这里插入图片描述

6.2.1 TimeWindow

在这里插入图片描述

        DataStream<Integer> resultStream = dataStream.keyBy("id")
                //滑动窗口的简写方式
                .timeWindow(Time.seconds(15),Time.seconds(10))
                //滚动窗口的简写方式
               // .timeWindow(Time.seconds(15))

在这里插入图片描述
在这里插入图片描述

1. 滚动窗口

在这里插入图片描述

        DataStream<Tuple2<String, Double>> minTempPerWindowStream = dataStream
                .map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws
                            Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow(Time.seconds(15))
                .minBy(1);

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其
中的一个来指定。

2. 滑动窗口(SlidingEventTimeWindows)

在这里插入图片描述


        DataStream<SensorReading> minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId)
                .timeWindow( Time.seconds(15), Time.seconds(5) )
                .minBy("temperature");

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其
中的一个来指定。

6.2.2 CountWindow

在这里插入图片描述

1 滚动窗口

在这里插入图片描述

2 滑动窗口

在这里插入图片描述

6.2.3 window function

之前讲的其他聚合转换入sum,reduce都是有状态的属于增量聚合。
增量聚合是来一个计算一个,对窗口中的计算结果有状态,只是不输出,实时性相对于全窗口函数跟高。
全量聚合函数,更灵活些可以获取更多信息,可以自定义输出的类型
在这里插入图片描述

6.2.4 其它可选 API

在这里插入图片描述
在这里插入图片描述

package com.atguigu.apitest.window;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.apitest.window
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/9 14:37
 */

import com.atguigu.apitest.beans.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.Arrays;
import java.util.Collections;

/**
 * @ClassName: WindowTest1_TimeWindow
 * @Description:
 * @Author: wushengran on 2020/11/9 14:37
 * @Version: 1.0
 */
public class WindowTest1_TimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        // 从文件读取数据
//        DataStream<String> inputStream = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt");

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 开窗测试

        // 1. 增量聚合函数
        DataStream<Integer> resultStream = dataStream.keyBy("id")
//                .countWindow(10, 2);
//                .window(EventTimeSessionWindows.withGap(Time.minutes(1)));
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                //滑动窗口的简写方式
                //.timeWindow(Time.seconds(15),Time.seconds(10))
                //滚动窗口的简写方式
                .timeWindow(Time.seconds(15))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });

        // 2. 全窗口函数
        SingleOutputStreamOperator<Tuple3<String, Long, Integer>> resultStream2 = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
//                .process(new ProcessWindowFunction<SensorReading, Object, Tuple, TimeWindow>() {
//                })
                .apply(new WindowFunction<SensorReading, Tuple3<String, Long, Integer>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<SensorReading> input, Collector<Tuple3<String, Long, Integer>> out) throws Exception {
                        String id = tuple.getField(0);
                        Long windowEnd = window.getEnd();
                        Integer count = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id, windowEnd, count));
                    }
                });

        // 3. 其它可选API
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
        };

        SingleOutputStreamOperator<SensorReading> sumStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
//                .trigger()
//                .evictor()
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(outputTag)
                .sum("temperature");

        sumStream.getSideOutput(outputTag).print("late");

        resultStream2.print();


        env.execute();


    }
}

在这里插入图片描述

第七章 时间语义与 Wartermark

7.1 Flink 中的时间语义

在这里插入图片描述
在这里插入图片描述

7.2 EventTime 的引入

在这里插入图片描述

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给 env 创建的每一个 stream 追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

7.3 Watermark(水位线)

7.3.1 基本概念

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
watermark 的特点

在这里插入图片描述

watermark 的传递
watermark往下游是以广播的方式传递。
以最小的watermark为准。
在这里插入图片描述

在代码中设置 Event Time

在这里插入图片描述
执行环境指定为事件时间语义,及窗口的关闭会根据事件时间关闭
在这里插入图片描述

7.3.2 Watermark 的引入

watermark 的引入很简单,对于乱序数据,最常见的引用方式如下

前提是要指定执行环境指定为事件时间语义
在这里插入图片描述
代码中watermark 的引入,指定每条消息的时间时间
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

package com.atguigu.apitest.window;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.apitest.window
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/10 9:33
 */

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

/**
 * @ClassName: WindowTest3_EventTimeWindow
 * @Description:
 * @Author: wushengran on 2020/11/10 9:33
 * @Version: 1.0
 */
public class WindowTest3_EventTimeWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(100);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型,分配时间戳和watermark
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        })
                // 升序数据设置事件时间和watermark
//                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
//                    @Override
//                    public long extractAscendingTimestamp(SensorReading element) {
//                        return element.getTimestamp() * 1000L;
//                    }
//                })
                // 乱序数据设置时间戳和watermark
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading element) {
                        return element.getTimestamp() * 1000L;
                    }
                });
		//测输出流
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
        };

        // 基于事件时间的开窗聚合,统计15秒内温度的最小值
        SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(outputTag)
                .minBy("temperature");

        minTempStream.print("minTemp");
        minTempStream.getSideOutput(outputTag).print("late");

        env.execute();
    }
}

第九章 Flink 状态管理和容错机制

9.2 状态一致性

在这里插入图片描述

9.2.1 一致性级别

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
幂等写入(Idempotent Writes):
在这里插入图片描述
事务写入(Transactional Writes):
在这里插入图片描述
预写日志(Write-Ahead-Log,WAL)

先把数据当成日志先保存,等checkpoint完成后再一次性写入sink系统,缺点是需要等待相当于一批一批的写入了。
在这里插入图片描述
两阶段提交(Two-Phase-Commit,2PC)
在这里插入图片描述
2PC 对外部 sink 系统的要求
在这里插入图片描述

9.2.2 端到端(end-to-end)状态一致性

Flink 状态管理

在这里插入图片描述

Flink 中的状态:
• 算子状态(Operatior State)
• 键控状态(Keyed State)针对每一个key有一个状态
• 状态后端(State Backends)

要想使用状态,必须先向flink注册状态,来告诉flink我有一个状态需要你帮忙保存。
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

9.3.1 算子状态(operator state)

在这里插入图片描述
在这里插入图片描述

package com.atguigu.apitest.state;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.apitest.state
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/10 15:30
 */

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;
import java.util.List;

/**
 * @ClassName: StateTest1_OperatorState
 * @Description:
 * @Author: wushengran on 2020/11/10 15:30
 * @Version: 1.0
 */
public class StateTest1_OperatorState {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定义一个有状态的map操作,统计当前分区数据个数
        SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());

        resultStream.print();

        env.execute();
    }

    // 自定义MapFunction
    //实现ListCheckpointed接口来保存和重启读取状态值
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer>{
        // 定义一个本地变量,作为算子状态,只是一个本地内存值,如果不保存到状态中故障重启后状态值就不存在了
        private Integer count = 0;

        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }

        //保存状态值
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }

        //发生故障的启后从状态读取之前保存的值
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for( Integer num: state )
                count += num;
        }
    }
}

9.3.2 键控状态(keyed state)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

package com.atguigu.apitest.state;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.apitest.state
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/10 15:49
 */

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName: StateTest2_KeyedState
 * @Description:
 * @Author: wushengran on 2020/11/10 15:49
 * @Version: 1.0
 */
public class StateTest2_KeyedState {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定义一个有状态的map操作,统计当前sensor数据个数
        SingleOutputStreamOperator<Integer> resultStream = dataStream
                .keyBy("id")
                .map( new MyKeyCountMapper() );

        resultStream.print();

        env.execute();
    }

    // 自定义RichMapFunction
    public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer>{
        private ValueState<Integer> keyCountState;

        // 其它类型状态的声明
        private ListState<String> myListState;
        private MapState<String, Double> myMapState;
        private ReducingState<SensorReading> myReducingState;

        @Override
        public void open(Configuration parameters) throws Exception {
            keyCountState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("key-count", Integer.class, 0));

            myListState = getRuntimeContext().getListState(new ListStateDescriptor<String>("my-list", String.class));
            myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>("my-map", String.class, Double.class));
//            myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<SensorReading>())
        }

        @Override
        public Integer map(SensorReading value) throws Exception {
            // 其它状态API调用
            // list state
            for(String str: myListState.get()){
                System.out.println(str);
            }
            myListState.add("hello");

            // map state
            myMapState.get("1");
            myMapState.put("2", 12.3);
            myMapState.remove("2");
            //清空状态
            myMapState.clear();

            // reducing state
            // myReducingState.add(value);


            //Value State
            Integer count = keyCountState.value();
            count++;
            keyCountState.update(count);
            return count;
        }
    }
}

检测温度跳变输出报警实例

我们可以利用 Keyed State,实现这样一个需求:检测传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警。

package com.atguigu.apitest.state;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.apitest.state
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/10 16:33
 */

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @ClassName: StateTest3_KeyedStateApplicationCase
 * @Description:
 * @Author: wushengran on 2020/11/10 16:33
 * @Version: 1.0
 */
public class StateTest3_KeyedStateApplicationCase {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定义一个flatmap操作,检测温度跳变,输出报警
        SingleOutputStreamOperator<Tuple3<String, Double, Double>> resultStream = dataStream.keyBy("id")
                .flatMap(new TempChangeWarning(10.0));

        resultStream.print();

        env.execute();
    }

    // 实现自定义函数类
    public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>>{
        // 私有属性,温度跳变阈值
        private Double threshold;

        public TempChangeWarning(Double threshold) {
            this.threshold = threshold;
        }

        // 定义状态,保存上一次的温度值
        private ValueState<Double> lastTempState;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class));
        }

        @Override
        public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
            // 获取状态
            Double lastTemp = lastTempState.value();

            // 如果状态不为null,那么就判断两次温度差值
            if( lastTemp != null ){
                Double diff = Math.abs( value.getTemperature() - lastTemp );
                if( diff >= threshold )
                    out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));
            }

            // 更新状态
            lastTempState.update(value.getTemperature());
        }

        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}

9.3.3状态后端(State Backends)

保存状态
在这里插入图片描述
状态后端保存方式(state backend):
在这里插入图片描述
设置状态后端为 FsStateBackend,并配置检查点和重启策略:

package com.atguigu.apitest.state;/**
 * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved
 * <p>
 * Project: FlinkTutorial
 * Package: com.atguigu.apitest.state
 * Version: 1.0
 * <p>
 * Created by wushengran on 2020/11/11 9:30
 */

import com.atguigu.apitest.beans.SensorReading;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @ClassName: StateTest4_FaultTolerance
 * @Description:
 * @Author: wushengran on 2020/11/11 9:30
 * @Version: 1.0
 */
public class StateTest4_FaultTolerance {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. 状态后端配置
        env.setStateBackend( new MemoryStateBackend());
        env.setStateBackend( new FsStateBackend(""));
        env.setStateBackend( new RocksDBStateBackend(""));

        // 2. 检查点配置
        env.enableCheckpointing(300);

        // 高级选项
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

        // 3. 重启策略配置
        // 固定延迟重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
        // 失败率重启
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        dataStream.print();
        env.execute();
    }
}

Flink 的容错机制

• 一致性检查点(checkpoint)
• 从检查点恢复状态
• Flink 检查点算法
• 保存点(save points)

9.3 检查点(checkpoint)

在这里插入图片描述
在这里插入图片描述
从检查点恢复状态在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

检查点的实现算法

在这里插入图片描述
Flink 检查点算法:
核心思想和步骤:
checkpoink线程发起checkpoint就是,往进入flink的数据流中插入一个特殊的数据barrier,source处理数据处理到checkpoink线程插入的barrier后就知道从barrier之前的数据做本次的状态的checkpoink保存, ,后面transition处理数据处理到checkpoink线程插入的barrier后就知
道从barrier之前的数据做本次的计算状态的checkpoink保存, sink处理到checkpoink线程barrier也是从barrier之前的数据做本次的计算状态的checkpoink保存。

每次状态checkpoink都是保存到对应的barrier中。
在这里插入图片描述

source
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
transition
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
slink
在这里插入图片描述

9.3.2 Flink+Kafka 如何实现端到端的 exactly-once 语义

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

9.4 保存点(Savepoints)

保存点和checkpoint很类似,如果没有开启checkpoint重启和升级时可以设置保存点,启动从保存点恢复之前计算的状态。
在这里插入图片描述

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐