@Component
public class MessageConsumer {
private Logger log = LoggerFactory.getLogger(this.getClass());
// some class fields and autowired beans
@Autowired
public KPLService kplService;
@SqsListener(value="sqs-queue")
public void consumingSqs(SqsMessage message) {
// applying some logic
// also doing some mapping here
// kinesis part
kplService.addRecord(mappedSqsRecord);
}
}
@Service
public class KPLService {
private Logger log = LoggerFactory.getLogger(this.getClass());
@Autowired
public KinesisProducer producer;
public void addRecord(MappedSqsRecord record) {
producer.addUserRecord(
"kinesis-stream-name",
record.uuid,
ByteBuffer.wrap(new ObjectMapper().writeValueAsBytes(record))
);
}
public void flush() {
producer.flushSync();
}
@PreDestroy
public void onShutdown() {
flush();
}
}
我想要达到的目标: 在发送到 Kinesis 流之前,我想通过调用
producer.addUserRecord
添加一组记录,然后通过调用 flushSync
将它们作为批次发送,而不是一条一条地发送记录。我创建了一个名为 onShutdown
的函数。因此,例如,在通过调用 addUserRecord
添加 500 条记录之后,我想调用 onShutdown
一次以将它们作为一批 500 条发送。
我遇到的问题: 我想要实现的是有一个函数(如示例中的 onShutdown),当应用程序在给定时间内没有收到来自 sqs 的请求时调用该函数并将该函数调用
flush
以将记录发送到运动。
我尝试使用
@PreDestroy
注释。但我意识到我部署到 AWS Fargate 的应用程序永远不会休眠或摊牌。它一直在运行,似乎 @PreDestroy
永远不会被调用。我该如何解决这种情况。非常感谢任何建议或意见。提前谢谢你。
我看过统计记录条数的例子。如果超过 500(例如)调用
flush
,否则通过调用 addUserRecord
继续添加。但是,如果可能的话,我现在想知道另一种方法。