我将 Camel AWS2-S3 与基于 MemoryIdempotRepository 的幂等消费者一起使用,并尝试使用 Camel AWS2-S3 版本 3.14.2 读取 AWS/S3 存储桶中的所有文件。目的是将检索到的文件保留在存储桶中,因此,deleteAfterRead 设置为 false。 在调试我的代码时,我看到连续的轮询始终只检索一个文件,该文件始终相同。第二次轮询后,该文件被正确地视为重复文件并被忽略。在我的调试日志中,我看到该文件的“忽略带有 id 的重复消息...”。 然而,尽管我的 AWS/S3 存储桶中有更多文件需要检索,但在启动下一次轮询的轮询延迟到期之前,什么也不会发生。 但接下来的轮询仅检索相同的已标记重复文件,依此类推......
如何让 s3client 在忽略上一个文件后继续检索存储桶中的下一个文件。 从 3.9.0 开始的所有 Camel 版本都显示相同的行为。
有人遇到同样的问题并且知道如何解决吗?
提前非常感谢。
Apache Camel AWS2 S3 组件 应根据需要正常轮询 AWS S3 存储桶文件。
仅轮询一个文件意味着:
bucketName
和fileName
,这将解决一次又一次轮询同一文件的问题maxMessagesPerPoll
设置为 1,同时不删除已处理的消息,这也将解决一遍又一遍轮询同一文件的问题prefix
或 delimiter
配置选项您应该开始检查端点配置,看看是否存在任何这些错误配置选项。
另一方面,要实现幂等性,您有两种方法之一:
moveAfterRead
,以便将处理后的文件复制到目标存储桶,避免它们在下一次迭代时重新轮询(您需要配置 destination
存储桶)。此解决方案取决于您的要求,并且仅当您负担得起将文件放在另一个存储桶中时才适用,这意味着没有其他组件/服务依赖于同一源存储桶中的这些文件IdempotentConsumer
,在这种情况下您可以标记(并保存)已处理的消息我一直在努力解决这个问题,对我来说这是最好的解决方案。 设置一个路由来检查文件是否存在,只有当存在时才调用第二个路由
from("timer:foo?period=5000")
process(e -> {
S3Client s3Client = // get your S3 client
String bucketName = "com.xxx.input";
String key = "tarjetas.csv";
try { s3Client.headObject(HeadObjectRequest.builder().bucket(bucketName).key(key).build());
e.getIn().setBody(true);
} catch (NoSuchKeyException ex) {
e.getIn().setBody(false);
}
})
.filter(body().isEqualTo(true))
.to("direct:startMainRoute");
from("direct:startMainRoute")
.pollEnrich("aws2-s3://com.xxx.input?amazonS3Client=#s3Client&fileName=tarjetas.csv&moveAfterRead=true&destinationBucket=com.ffresco.processed&delay=5000", 5000)
.process(new TestProcessor())
.split().tokenize("\n", 1).streaming()
.log("Hello ${body}");