我的用例真的是双写问题吗?或者我的解决方案过于复杂化了

问题描述 投票:0回答:1

我有一个应用程序需要将文件写入S3,然后将新写入的S3对象的路径放在Kafka消息上。在 REST API 请求期间调用此功能。

根据我的理解,如果我有像下面这样的解决方案,那就是双重写入的经典案例,就好像在写入 S3 后,如果我对 Kafka 的写入失败,则 S3 文件将不会在下游处理。

func processEvent(event Event) {
    String s3Path = writeToS3(event);
    writeToKafka(messageWithS3Path);
}

因此,为了避免这种情况,我使用 S3 通知,并且仅将写入 S3 作为 REST 调用的一部分。一旦写入 S3,就会通过 SQS 触发对象创建通知,我会使用该通知,然后写入 Kafka。

现在我的问题是我的解决方案是否变得复杂了?如果写入 Kafka 失败或一般情况下任何操作都失败,我是否不能重试整个processingEvent 逻辑?像下面这样

func retryHelper(Event event) {
  boolean isDone = false;
  do{
    try{
      processEvent(event);
      isDone = true;
    } catch(Exception ex) {
      log.error("Write to at least one system failed.. retrying..")
    }
  } while(!isDone)
}


func processEvent(Event event) {
    String s3Path = writeToS3(event);
    writeToKafka(messageWithS3Path);
}

如果我不关心是否有一些孤立的 S3 文件(Kafka 写入失败),这是否是避免双重写入的有效解决方案?

我知道使用这个解决方案可能会有无限次重试,但如果我们只考虑暂时性错误(或将重试次数限制为较大的次数,例如 50-100),这将是一个有效的(尽管很hacky)解决方案吗?

amazon-s3 apache-kafka design-patterns anti-patterns
1个回答
0
投票

双写问题是很常见的场景,对于解决这个问题有不同的看法。

最常见的方法是使用

Outbox
模式来解决此问题,其中首先应将数据写入 S3,然后单独的进程将从 S3 接收事件并将其发布到 Kafka 主题/SQS。使用这种方法,您可以异步调用,将其与事件驱动方法解耦,并使用 SQS 或 Kafka 中的重试逻辑。

然而,如果它全部隐含在这样的代码中,则在更糟糕的情况下可能会陷入无限循环。

func retryHelper(Event event) {
  boolean isDone = false;
  do{
    try{
      processEvent(event);
      isDone = true;
    } catch(Exception ex) {
      log.error("Write to at least one system failed.. retrying..")
    }
  } while(!isDone)
}

func processEvent(Event event) {
    String s3Path = writeToS3(event);
    writeToKafka(messageWithS3Path);
}

我通常使用

S3:PutObject
或其他 S3 事件触发通知并将其发布到 SQS 以及由 Kafka 或其他要处理的服务进行处理。这是很常见的做法。

© www.soinside.com 2019 - 2024. All rights reserved.