当应用程序关闭/进入睡眠状态时,如何使用 kpl 将记录作为批处理发送到运动流?

问题描述 投票:0回答:0
  • 关于申请: 我有一个部署到 AWS Fargate 服务的 spring 应用程序。此应用程序使用来自 SQS 队列的数据。然后,应用一些逻辑并将这些数据发送到 Kinesis Stream。应用程序在一天中的某个时间点收到大量请求。这是我的应用程序的样子。
@Component
public class MessageConsumer {
  private Logger log = LoggerFactory.getLogger(this.getClass());
  // some class fields and autowired beans
  @Autowired
  public KPLService kplService;
  
  @SqsListener(value="sqs-queue")
  public void consumingSqs(SqsMessage message) {
    // applying some logic
    // also doing some mapping here

    // kinesis part
    kplService.addRecord(mappedSqsRecord);
  }
}
@Service
public class KPLService {
  private Logger log = LoggerFactory.getLogger(this.getClass());

  @Autowired
  public KinesisProducer producer;
  
  public void addRecord(MappedSqsRecord record) {
    producer.addUserRecord(
      "kinesis-stream-name",
      record.uuid,
      ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(record))
    );
  }

  public void flush() {
    producer.flushSync();
  }

  @PreDestroy
  public void onShutdown() {
    flush();
  }
}
  • 我想要达到的目标: 在发送到 Kinesis 流之前,我想通过调用

    producer.addUserRecord
    添加一组记录,然后通过调用
    flushSync
    将它们作为批次发送,而不是一条一条地发送记录。我创建了一个名为
    onShutdown
    的函数。因此,例如,在通过调用
    addUserRecord
    添加 500 条记录之后,我想调用
    onShutdown
    一次以将它们作为一批 500 条发送。

  • 我遇到的问题: 我想要实现的是有一个函数(如示例中的 onShutdown),当应用程序在给定时间内没有收到来自 sqs 的请求时调用该函数并将该函数调用

    flush
    以将记录发送到运动。

我尝试使用

@PreDestroy
注释。但我意识到我部署到 AWS Fargate 的应用程序永远不会休眠或摊牌。它一直在运行,似乎
@PreDestroy
永远不会被调用。我该如何解决这种情况。非常感谢任何建议或意见。提前谢谢你。

我看过统计记录条数的例子。如果超过 500(例如)调用

flush
,否则通过调用
addUserRecord
继续添加。但是,如果可能的话,我现在想知道另一种方法。

spring-boot amazon-sqs amazon-kinesis aws-fargate amazon-kinesis-kpl
© www.soinside.com 2019 - 2024. All rights reserved.