flink如何设置以每天零点到第二天零点为区间的window进行计算
环境flink1.11.2JAVAstream APItimewindow背景公司之前的指标是以分钟为单位的滚动窗口进行检查,然后在查询系统里查询的时候,对该天所有的分钟数据进行聚合统计。当前需要在flink中添加以天为单位的Job进行额外指标检查。指标出来之后和发现数据口径不一致,flink中默认是timeWindow按天进行滚动统计的数据是每天八点到第二天八点的数据。导致统计指标的含义对不上,
环境
flink1.11.2
JAVA
stream API
timewindow
背景
公司之前的指标是以分钟为单位的滚动窗口进行检查,然后在查询系统里查询的时候,对该天所有的分钟数据进行聚合统计。
当前需要在flink中添加以天为单位的Job进行额外指标检查。指标出来之后和发现数据口径不一致,flink中默认是timeWindow按天进行滚动统计的数据是每天八点到第二天八点的数据。
导致统计指标的含义对不上,没有参考意义和进行不同数据间的join。
解决方案
使用window配置自定义的窗口分隔TumblingEventTimeWindows对象(因为现在处理数据基本都使用的flink eventTime作为数据时间进行处理,所以例子中需要数据流的时间用的是eventtime, 使用processtime的话可以使用TumblingProcessTimeWindows处理,讲道理应该配置都一样)
话不多说直接上代码吧。
默认情况8点->8点的时间统计的代码:
// 原始数据流
DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));
// 进行数据清洗统计的逻辑
DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null)
.map(StreamTransformCommon::renameAppInfoName)
.filter(x -> x != null)
.keyBy("page")
.timeWindow(Time.days(1)) // 默认情况下 以天为单位的滚动窗口
.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());
每天0点->0点的时间窗口统计代码(实际上可以举一反三搞出任意想要的时间的规则):
// 原始数据流
DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));
// 进行数据清洗统计的逻辑
DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null)
.map(StreamTransformCommon::renameAppInfoName)
.filter(x -> x != null)
.keyBy("page")
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16))) // 改改参数,就可以调整到自己想要的时间窗口统计规则
.aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());
结果
大家可以在操作windowFunction的时候打印一下apply方法参数中的TimeWindow对象的起止时间验证一下。我这边屡试不爽,问题解决了记录一下这个过程。
更多推荐
所有评论(0)