我有一个应用程序需要将文件写入S3,然后将新写入的S3对象的路径放在Kafka消息上。在 REST API 请求期间调用此功能。
根据我的理解,如果我有像下面这样的解决方案,那就是双重写入的经典案例,就好像在写入 S3 后,如果我对 Kafka 的写入失败,则 S3 文件将不会在下游处理。
func processEvent(event Event) {
String s3Path = writeToS3(event);
writeToKafka(messageWithS3Path);
}
因此,为了避免这种情况,我使用 S3 通知,并且仅将写入 S3 作为 REST 调用的一部分。一旦写入 S3,就会通过 SQS 触发对象创建通知,我会使用该通知,然后写入 Kafka。
现在我的问题是我的解决方案是否变得复杂了?如果写入 Kafka 失败或一般情况下任何操作都失败,我是否不能重试整个processingEvent 逻辑?像下面这样
func retryHelper(Event event) {
boolean isDone = false;
do{
try{
processEvent(event);
isDone = true;
} catch(Exception ex) {
log.error("Write to at least one system failed.. retrying..")
}
} while(!isDone)
}
func processEvent(Event event) {
String s3Path = writeToS3(event);
writeToKafka(messageWithS3Path);
}
如果我不关心是否有一些孤立的 S3 文件(Kafka 写入失败),这是否是避免双重写入的有效解决方案?
我知道使用这个解决方案可能会有无限次重试,但如果我们只考虑暂时性错误(或将重试次数限制为较大的次数,例如 50-100),这将是一个有效的(尽管很hacky)解决方案吗?
双写问题是很常见的场景,对于解决这个问题有不同的看法。
最常见的方法是使用
Outbox
模式来解决此问题,其中首先应将数据写入 S3,然后单独的进程将从 S3 接收事件并将其发布到 Kafka 主题/SQS。使用这种方法,您可以异步调用,将其与事件驱动方法解耦,并使用 SQS 或 Kafka 中的重试逻辑。
然而,如果它全部隐含在这样的代码中,则在更糟糕的情况下可能会陷入无限循环。
func retryHelper(Event event) {
boolean isDone = false;
do{
try{
processEvent(event);
isDone = true;
} catch(Exception ex) {
log.error("Write to at least one system failed.. retrying..")
}
} while(!isDone)
}
func processEvent(Event event) {
String s3Path = writeToS3(event);
writeToKafka(messageWithS3Path);
}
我通常使用
S3:PutObject
或其他 S3 事件触发通知并将其发布到 SQS 以及由 Kafka 或其他要处理的服务进行处理。这是很常见的做法。