我有一个用例,我想在 Flink 上运行 2 个独立的处理流程。 所以 2 个流程看起来像
源 1 -> 运算符 1 -> 接收器 1
Source2 -> 操作员2 -> Sink2
我想为两个流程重复使用相同的 Flink 集群。我可以考虑通过两种方式做到这一点:
1)在同一个 Flink 应用程序上提交 2 个不同的作业
2) 在同一个作业中设置 2 个管道
我能够设置第一个选项,但不确定如何执行第二个选项。 以前有人尝试过这样的设置吗? 一个比另一个有什么优势?
您可以在 setupJob() 方法中简单地创建多个管道(具有单独或共享的源消费者)。这是一个例子:
private void buildPipeline(StreamExecutionEnvironment env, String sourceName, String sinkName) {
DataStream<T> stream = env
.addSource(getInputs().get(sourceName))
.name(sourceName);
stream = stream.filter(evt -> filter());
....
}
@Override
public void setupJob(AthenaFlinkJobConfiguration jobConfig, StreamExecutionEnvironment env) throws Exception {
...
buildPipeline(env, sourceTopic1, sink1, ...);
buildPipeline(env, sourceTopic2, sink2, ...);
...
}
以下是两种方法的快速对比。使用单独作业的优点/缺点:
在单个作业中使用单独管道的好处:
第二种方法可以通过在同一个
StreamExecutionEnvironment
中定义两个独立的管道并仅调用StreamExecutionEnvironment.execute()
一次来实现。
我会使用第一种方法,因为它可以提供更好的隔离。如果发生故障,Flink 会重新启动整个作业。因此,如果您在同一个作业中实现两个管道,则在发生故障时,两个管道都将重置并重新启动。如果您遵循方法一,您也可以独立获取保存点。
请原谅我在评论中提出问题。
我也面临上述场景,我们需要在两个流之间提供隔离。
您能否指导我如何通过简单的步骤实施以下方法
在同一个 Flink 应用程序上提交 2 个不同的作业
在 flink 的 Main 方法中,我有 2 个流和一个 StreamExecutionEnvironment
管道 1 - 源 --> 运算符 --> 接收器
管道 2 - 源头 --> 操作员 --> 接收器
管道 1 中的故障会影响管道 2 并停止整个 flink 应用程序。
现在,如果我想单独运行它们,我该怎么办?