当deleteAfterRead为假时,幂等Camel AWS2-S3消费者无法从存储桶中检索所有文件

问题描述 投票:0回答:2

我将 Camel AWS2-S3 与基于 MemoryIdempotRepository 的幂等消费者一起使用,并尝试使用 Camel AWS2-S3 版本 3.14.2 读取 AWS/S3 存储桶中的所有文件。目的是将检索到的文件保留在存储桶中,因此,deleteAfterRead 设置为 false。 在调试我的代码时,我看到连续的轮询始终只检索一个文件,该文件始终相同。第二次轮询后,该文件被正确地视为重复文件并被忽略。在我的调试日志中,我看到该文件的“忽略带有 id 的重复消息...”。 然而,尽管我的 AWS/S3 存储桶中有更多文件需要检索,但在启动下一次轮询的轮询延迟到期之前,什么也不会发生。 但接下来的轮询仅检索相同的已标记重复文件,依此类推......

如何让 s3client 在忽略上一个文件后继续检索存储桶中的下一个文件。 从 3.9.0 开始的所有 Camel 版本都显示相同的行为。

有人遇到同样的问题并且知道如何解决吗?

提前非常感谢。

java amazon-s3 apache-camel
2个回答
0
投票

Apache Camel AWS2 S3 组件 应根据需要正常轮询 AWS S3 存储桶文件。

仅轮询一个文件意味着:

  • 您在配置消费者时同时指定了
    bucketName
    fileName
    ,这将解决一次又一次轮询同一文件的问题
  • maxMessagesPerPoll
    设置为 1,同时不删除已处理的消息,这也将解决一遍又一遍轮询同一文件的问题
  • 设置与单个文件匹配的
    prefix
    delimiter
    配置选项

您应该开始检查端点配置,看看是否存在任何这些错误配置选项。

另一方面,要实现幂等性,您有两种方法之一:

  • 将您的消费者端点配置为
    moveAfterRead
    ,以便将处理后的文件复制到目标存储桶,避免它们在下一次迭代时重新轮询(您需要配置
    destination
    存储桶)。此解决方案取决于您的要求,并且仅当您负担得起将文件放在另一个存储桶中时才适用,这意味着没有其他组件/服务依赖于同一源存储桶中的这些文件
  • 配置由您选择的存储库支持的
    IdempotentConsumer
    ,在这种情况下您可以标记(并保存)已处理的消息

0
投票

我一直在努力解决这个问题,对我来说这是最好的解决方案。 设置一个路由来检查文件是否存在,只有当存在时才调用第二个路由

        from("timer:foo?period=5000")
    process(e -> {
    S3Client s3Client = // get your S3 client
    String bucketName = "com.xxx.input";
    String key = "tarjetas.csv";

    try { s3Client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(key).build());
           e.getIn().setBody(true);
        } catch (NoSuchKeyException ex) {
            e.getIn().setBody(false);
        }
    })
    .filter(body().isEqualTo(true))
    .to("direct:startMainRoute");

    from("direct:startMainRoute")
.pollEnrich("aws2-s3://com.xxx.input?amazonS3Client=#s3Client&fileName=tarjetas.csv&moveAfterRead=true&destinationBucket=com.ffresco.processed&delay=5000", 5000)
    .process(new TestProcessor())
    .split().tokenize("\n", 1).streaming()
    .log("Hello ${body}");
© www.soinside.com 2019 - 2024. All rights reserved.