我有一个 Spark 流应用程序。它需要一批记录并对记录执行多个映射函数。
当少数记录在 .map 阶段失败时,我希望能够知道失败的原始 id/记录,以便我可以将其放在副线中并稍后重试。有人可以建议围绕这个的模式吗?在每个 .map 函数上放置 try catch 听起来不太可行(如果这是唯一的方法,我应该尝试减少 .map 函数吗?)
由于 Storm 和 Flink 等其他流框架都在创纪录的水平上工作,因此在那里实现侧线会更容易吗?
使用 Flink,您将使用 ProcessFunction 而不是映射,并将可能失败的转换包装在 try catch 中。然后,导致异常的记录被发送到侧面输出。
它看起来像这样:
final OutputTag<String> errors = new OutputTag<>("errors") {};
final SingleOutputStreamOperator<Event> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Source")
.process(
new ProcessFunction<>() {
@Override
public void processElement(
String value,
ProcessFunction<String, Event>.Context ctx,
Collector<Event> out) {
final Event transformed;
try {
transformed = myTransformation(value);
} catch (IOException e) {
ctx.output(errors, value);
return;
}
out.collect(transformed);
}
});