我在Apache-Beam管道中设置了一个缓慢变化的查找映射。它不断更新查找映射。对于查找映射中的每个键,我使用累积模式检索全局窗口中的最新值。但它始终符合例外:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey
这段代码有什么问题吗?
如果我改用.discardingFiredPanes()
,我将丢失最后一次发射的信息。
pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()))
.accumulatingFiredPanes())
.apply(new ReadSlowChangingTable())
.apply(Latest.perKey())
.apply(View.asMap());
示例输入触发器:
t1 : KV<k1,v1> KV< k2,v2>
t2 : KV<k1,v1>
accumulatingFiredPanes
=> t2 => KV(k1,v1),KV(k2,v2)的预期结果但由于重复的异常而失败
discardingFiredPanes
=>预期结果在t2 => KV(k1,v1)成功
特别关于view.asMap
和评论中的累积窗格讨论:
如果您想使用View.asMap
侧输入(例如,当地图元素的源本身分布时 - 通常是因为您从前一个变换的输出创建侧输入),还有一些其他因素需要考虑:View.asMap
本身就是一个聚合,它将继承触发并累积其输入。在此特定模式中,即使在Latest.perKey
变换之前使用诸如View.asMap
之类的变换,在此变换之前将管道设置为accumulatingPanes模式也将导致重复的键错误。
鉴于读取更新了整个地图,那么我认为使用View.asSingleton
对于这个用例来说是更好的方法。
关于这种模式的一些一般性说明,希望对其他人也有用:
对于这种模式,我们可以使用GenerateSequence
源变换定期发出一个值,例如每天一次。通过激活每个元素的数据驱动触发器将此值传递到全局窗口。在DoFn
中,使用此过程作为触发器从您的SideInput的有限源Create
中提取数据,以用于下游转换。
重要的是要注意,因为此模式使用全局窗口侧输入触发处理时间,所以匹配在事件时间内处理的元素将是不确定的。例如,如果我们有一个在事件时间窗口化的主管道,那些窗口将看到的SideInput视图的版本将取决于在处理时间而不是事件时间中触发的最新触发器。
同样重要的是要注意,一般来说,侧输入应该适合内存。
Java(SDK 2.9.0):
在下面的示例中,侧输入以非常短的间隔更新,这样可以很容易地看到效果。期望是侧输入缓慢更新,例如每隔几小时或每天一次。
在下面的示例代码中,我们使用我们在Map
中创建的DoFn
,它成为View.asSingleton,这是此模式的推荐方法。
下面的示例说明了模式,请注意View.asSingleton
在每次计数器更新时重建。
public static void main(String[] args) {
// Create pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PipelineOptions.class);
// Using View.asSingleton, this pipeline uses a dummy external service as illustration.
// Run in debug mode to see the output
Pipeline p = Pipeline.create(options);
// Create slowly updating sideinput
PCollectionView<Map<String, String>> map = p
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
@ProcessElement public void process(@Element Long input,
OutputReceiver<Map<String, String>> o) {
// Do any external reads needed here...
// We will make use of our dummy external service.
// Every time this triggers, the complete map will be replaced with that read from
// the service.
o.output(DummyExternalService.readDummyData());
}
})).apply(View.asSingleton());
// ---- Consume slowly updating sideinput
// GenerateSequence is only used here to generate dummy data for this illustration.
// You would use your real source for example PubSubIO, KafkaIO etc...
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
@ProcessElement public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));
}
}).withSideInputs(map));
p.run();
}
public static class DummyExternalService {
public static Map<String, String> readDummyData() {
Map<String, String> map = new HashMap<>();
Instant now = Instant.now();
DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
return map;
}
}