环境

flink1.11.2
JAVA
stream API

timewindow

背景

公司之前的指标是以分钟为单位的滚动窗口进行检查,然后在查询系统里查询的时候,对该天所有的分钟数据进行聚合统计。

 当前需要在flink中添加以天为单位的Job进行额外指标检查。指标出来之后和发现数据口径不一致,flink中默认是timeWindow按天进行滚动统计的数据是每天八点到第二天八点的数据

导致统计指标的含义对不上,没有参考意义和进行不同数据间的join。

解决方案

使用window配置自定义的窗口分隔TumblingEventTimeWindows对象(因为现在处理数据基本都使用的flink  eventTime作为数据时间进行处理,所以例子中需要数据流的时间用的是eventtime, 使用processtime的话可以使用TumblingProcessTimeWindows处理,讲道理应该配置都一样)

话不多说直接上代码吧。

默认情况8点->8点的时间统计的代码:

// 原始数据流
DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));


// 进行数据清洗统计的逻辑
DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null)
                .map(StreamTransformCommon::renameAppInfoName)
                .filter(x -> x != null)
                .keyBy("page")
                .timeWindow(Time.days(1))    // 默认情况下 以天为单位的滚动窗口
                .aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());

每天0点->0点的时间窗口统计代码(实际上可以举一反三搞出任意想要的时间的规则):

// 原始数据流
DataStream<RawDataEvent> gyhUserRegisterStream = StreamTransformCommon.preprocessingLogData(rawDataStreamMap.get("gyhUserRegister"));


// 进行数据清洗统计的逻辑
DataStream<Object> targetData = rawWebLogData.filter(x -> x.userId != null)
                .map(StreamTransformCommon::renameAppInfoName)
                .filter(x -> x != null)
                .keyBy("page")
                .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))  // 改改参数,就可以调整到自己想要的时间窗口统计规则
                .aggregate(new StreamTransformCommon.CountAgg(), new StreamTransformCommon.WindowResultFuction());

结果

大家可以在操作windowFunction的时候打印一下apply方法参数中的TimeWindow对象的起止时间验证一下。我这边屡试不爽,问题解决了记录一下这个过程。

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐