我在java 11上使用flink 1.81.1 api,我尝试使用
BroadcastProcessFunction
来过滤产品数据流,并将品牌授权的数据流作为广播。
因此,我的第一个产品 Datastream 包含具有现场品牌的不同产品,而我的第二个品牌 Datastream 仅包含应允许的品牌。
问题是,当我的产品到达
processElement
的 BroadcastProcessFunction
时,brandState 尚未充满品牌数据流记录,例如我的品牌数据流中有 4800 个品牌,但是当产品到达 processElement
时,brandState 仅包含其中的少数几个(例如 200 个品牌),这会引起问题,因为我的产品不会被允许,因为它们的品牌尚未上传到 brandState
这是我的
BroadcastProcessFunction
public class GateCoProcess extends BroadcastProcessFunction<CrawlData, Brand, CrawlData> {
private final MapStateDescriptor<String, Boolean> broadcastStateDescriptor;
public GateCoProcess(MapStateDescriptor<String, Boolean> broadcastStateDescriptor) {
this.broadcastStateDescriptor = broadcastStateDescriptor;
}
@Override
public void processElement(CrawlData value, ReadOnlyContext ctx, Collector<CrawlData> out) throws Exception {
ReadOnlyBroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);
if (brandState.contains(value.data.product.brand)) {
out.collect(value);
}
}
@Override
public void processBroadcastElement(Brand brand, Context ctx, Collector<CrawlData> out) throws Exception {
BroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);
if (brand.active) {
brandState.put(brand.getName(), true);
}
}
}
这是我的数据流和函数调用
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Brand> brands = env.fromSource(KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");
MapStateDescriptor<String, Boolean> broadcastStateDescriptor = new MapStateDescriptor<>(
"broadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO);
BroadcastStream<Brand> broadcastStream = brands.broadcast(broadcastStateDescriptor);
// integration is the products Datastream
DataStream<CrawlData> integration = ExtractData.extractProducts(env);
DataStream<CrawlData> filtered = integration.connect(broadcastStream).process(new GateCoProcess(broadcastStateDescriptor));
env.execute("mon job de products");
我应该怎么做才能解决这个问题?谢谢
我尝试使用水印,但没有结果,我的课程没有时间戳
这是冷启动问题,您无法控制来自不同流的记录到达运算符的顺序。
一种简单的方法是在您的工作流程中添加
--coldstart
标志。这假设您在广播状态下正确保存了 brands
数据。
设置后,您可以为
products
数据流使用虚拟源(不生成数据)。启动工作流程,并等待所有 brands
流存储在状态中。然后使用保存点停止工作流程,并从该保存点重新启动它,而不使用 --coldstart
标志。