flink DataStream API(四)状态和容错-广播状态模式
文章目录广播状态模式提供的APIBroadcastProcessFunction 和 KeyedBroadcastProcessFunction重要的注意事项广播状态模式在本节中,您将了解如何在实践中使用广播状态。请参阅有 Stateful Stream Processing 以了解有状态流处理背后的概念。提供的API为了展示所提供的 API,在展示它们的全部功能之前,我们将从一个示例开始,然后再
广播状态模式
在本节中,您将了解如何在实践中使用广播状态。请参阅有 Stateful Stream Processing 以了解有状态流处理背后的概念。
提供的API
为了展示所提供的 API,在展示它们的全部功能之前,我们将从一个示例开始,然后再展示它们的完整功能。
在此示例中,第一个流将包含具有 Color
和 Shape
属性的 Item
类型的元素。另一个流将包含规则。作为我们的运行示例,我们将使用这样一种场景:我们有一个不同颜色和形状的对象流,我们想要找到遵循特定模式的一组相同颜色的对象,例如一个矩形后跟一个三角形。
从项目流开始,我们只需要按颜色键控它,因为我们需要一组相同颜色的对象。这将确保相同颜色的元素最终出现在同一台物理机器上。
// key the items by color
KeyedStream<Item, Color> colorPartitionedStream = itemStream
.keyBy(new KeySelector<Item, Color>(){...});
广播流应该被广播到下游的所有任务中,并且这些任务将他们存储在本地,以便通过这些规则对传入的item
进行评估
下面的代码片段将
- 使用提供的
MapStateDescriptor
,它将创建将存储规则的广播状态。 - 广播规则流
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
最后,为了对来自Item流的传入元素应用规则,我们需要:
- 连接两个流
- 指定我们的匹配检测逻辑。
通过调用非广播流的connect(BroadcastStream)
方法将一个流与BroadcastStream
连接。这将返回一个BroadcastConnectedStream
,我们可以该流上的process()
。函数将包含我们的匹配逻辑:
- 如果是键控,则该函数是 KeyedBroadcastProcessFunction。
- 如果它是非键控的,则该函数是一个 BroadcastProcessFunction。
鉴于我们的非广播流是键控的,以下代码段包含上述调用:
DataStream<String> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// type arguments in our KeyedBroadcastProcessFunction represent:
// 1. the key of the keyed stream
// 2. the type of elements in the non-broadcast side
// 3. the type of elements in the broadcast side
// 4. the type of the result, here a string
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// my matching logic
}
);
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
与CoProcessFunction
一样,这些函数有两种处理方法要实现;processBroadcastElement()
负责处理广播流中的传入元素,processElement()
用于非广播流。这些方法的完整签名如下所示:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
首先要注意的是,这两个函数都需要实现 processBroadcastElement()
方法来处理广播端的元素,以及 processElement()
来处理非广播端的元素。
这两个方法在提供它们的上下文中有所不同。非广播端有一个ReadOnlyContext
,而广播端有一个Context
。
这两个上下文(ctx在下面的枚举中):
- 访问广播状态:
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- 允许查询元素的时间戳:
ctx.timestamp()
- 获取当前水印:
ctx.currentWatermark()
- 获取当前处理时间:
ctx.currentProcessingTime()
- 向侧输出输出元素:
ctx.output(OutputTag<X> outputTag, X value)。
getBroadcastState()
中的 stateDescriptor
应该与上面的 .broadcast(ruleStateDescriptor)
中的 stateDescriptor
相同。
不同之处在于它们对广播状态的访问类型不同。广播端对其具有读写访问权限,而非广播端具有只读访问权限。原因在于在flink中不能进行跨任务的通信。因此为了保证广播状态中的内容在所有并行的算子实例中都是相同的,非广播端只能对广播有读权限。
注意:processBroadcastElement()
中实现的逻辑必须在所有并行实例中具有相同的确定性行为!
最后,由于KeyedBroadcastProcessFunction
在keyed
流上运行,因此它公开了一些BroadcastProcessFunction
无法使用的功能。即:
-
processElement()
方法中的ReadOnlyContext提供对Flink底层计时器服务的访问,该服务允许注册事件和/或处理时间定时器。当定时器触发时,onTimer()
(如上所示)被OnTimerContext
调用,它暴露了与ReadOnlyContext plus
相同的功能 -
具有询问触发的定时器是事件时间还是处理时间的能力
-
可以查询与定时器关联的键。
-
processBroadcastElement()
方法中的Context
包 含applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)
方法。这个方法允许注册一个KeyedStateFunction
,这个被应用到与所提供的stateDescriptor
关联的所有键的所有状态。
注意:只能在KeyedBroadcastProcessFunction
的processElement()
处注册定时器,并且只能在该位置注册定时器。这在processBroadcastElement()
方法中是不可能的,因为没有与广播元素关联的键。
回到我们最初的例子,我们的 KeyedBroadcastProcessFunction
可能如下所示:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// identical to our ruleStateDescriptor above
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry :
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// there is no else{} to cover if rule.first == rule.second
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要的注意事项
在描述了提供的 API 之后,本节重点介绍使用广播状态时要记住的重要事项。这些是:
-
不能跨任务通信:如前所述,这就是为什么只有
(Keyed)-BroadcastProcessFunction
的广播端才能修改广播状态的内容。此外,用户必须确保所有任务对每个传入的元素都以相同的方式修改广播状态的内容。否则,不同的任务可能有不同的内容,导致结果不一致。 -
广播状态中的事件顺序可能因任务而异:尽管广播流的元素可以保证所有元素(最终)都将进入所有下游任务,但元素到达每个任务的顺序可能不同,因此,每个传入元素状态的更新不能依赖于传入事件的顺序。
-
所有任务都
checkpoint
其广播状态:尽管在检查点发生时所有任务的广播状态中具有相同的元素,但所有任务都checkpoint
其广播状态。这是一个设计方案,以避免在还恢复期间从同一文件中读取所有任务(从而避免热点问题),尽管这样做的代价是将检查点状态的大小增加了 p 倍(等于并行度)。Flink 保证在restoring/rescaling
时不会有重复和丢失数据。如果以相同或更小的并行度进行恢复,每个任务都会读取其检查点状态。 -
没有 RocksDB 状态后端:广播状态在运行时保存在内存中,并应相应地进行内存配置。这适用于所有算子状态。
更多推荐
所有评论(0)