如何在broadcastProcessFunction flink中同步数据流?

问题描述 投票:0回答:1

我在java 11上使用flink 1.81.1 api,我尝试使用

BroadcastProcessFunction
来过滤产品数据流,并将品牌授权的数据流作为广播。

因此,我的第一个产品 Datastream 包含具有现场品牌的不同产品,而我的第二个品牌 Datastream 仅包含应允许的品牌。

问题是,当我的产品到达

processElement
BroadcastProcessFunction
时,brandState 尚未充满品牌数据流记录,例如我的品牌数据流中有 4800 个品牌,但是当产品到达
processElement
时,brandState 仅包含其中的少数几个(例如 200 个品牌),这会引起问题,因为我的产品不会被允许,因为它们的品牌尚未上传到
brandState

这是我的

BroadcastProcessFunction

public class GateCoProcess extends BroadcastProcessFunction<CrawlData, Brand, CrawlData> {
    private final MapStateDescriptor<String, Boolean> broadcastStateDescriptor;


    public GateCoProcess(MapStateDescriptor<String, Boolean> broadcastStateDescriptor) {
        this.broadcastStateDescriptor = broadcastStateDescriptor;

    }
    @Override
    public void processElement(CrawlData value, ReadOnlyContext ctx, Collector<CrawlData> out) throws Exception {
        ReadOnlyBroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);

        if (brandState.contains(value.data.product.brand)) {
            out.collect(value);
        }
    }
    @Override
    public void processBroadcastElement(Brand brand, Context ctx, Collector<CrawlData> out) throws Exception {
        BroadcastState<String, Boolean> brandState = ctx.getBroadcastState(broadcastStateDescriptor);
        if (brand.active) {
            brandState.put(brand.getName(), true);
        }
    }
}

这是我的数据流和函数调用

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Brand> brands = env.fromSource(KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");

MapStateDescriptor<String, Boolean> broadcastStateDescriptor = new MapStateDescriptor<>(
                "broadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.BOOLEAN_TYPE_INFO);

BroadcastStream<Brand> broadcastStream = brands.broadcast(broadcastStateDescriptor);

// integration is the products Datastream
DataStream<CrawlData> integration = ExtractData.extractProducts(env);

DataStream<CrawlData> filtered = integration.connect(broadcastStream).process(new  GateCoProcess(broadcastStateDescriptor));

env.execute("mon job de products");


我应该怎么做才能解决这个问题?谢谢

我尝试使用水印,但没有结果,我的课程没有时间戳

java apache-flink data-processing
1个回答
0
投票

这是冷启动问题,您无法控制来自不同流的记录到达运算符的顺序。

一种简单的方法是在您的工作流程中添加

--coldstart
标志。这假设您在广播状态下正确保存了
brands
数据。

设置后,您可以为

products
数据流使用虚拟源(不生成数据)。启动工作流程,并等待所有
brands
流存储在状态中。然后使用保存点停止工作流程,并从该保存点重新启动它,而不使用
--coldstart
标志。

© www.soinside.com 2019 - 2024. All rights reserved.