同一 Kinesis 流的多个不同消费者

问题描述 投票:0回答:1

我有一个 Kinesis 生产者,它将单一类型的消息写入流。我想在多个完全不同的消费者应用程序中处理这个流。因此,对于给定的主题/流,具有单个发布者的发布/订阅。我还想利用检查点来确保每个消费者处理写入流的每条消息。

最初,我为所有消费者和生产者使用相同的应用程序名称。然而,一旦我启动了多个消费者,我就开始收到以下错误:

com.amazonaws.services.kinesis.model.InvalidArgumentException:StartingSequenceNumber 49564236296344566565977952725717230439257668853369405442 在流 PackageCreated 上的分片 shardId-000000000000 上使用 GetShardIterator帐户 ************ 下无效,因为它不是来自此流。 (服务:AmazonKinesis;状态代码:400;错误代码:InvalidArgumentException;请求 ID:..)

这似乎是因为消费者使用相同的应用程序名称时与他们的检查点发生冲突。

从阅读文档来看,使用检查点进行发布/订阅的唯一方法似乎是为每个消费者应用程序创建一个流,这要求每个生产者了解所有可能的消费者。这比我想要的更紧密地耦合;这真的只是一个队列。

Kafka 似乎支持我想要的:任意消费给定的主题/分区,因为消费者完全控制自己的检查点。如果我想要带有检查点的发布/订阅,我唯一的选择是迁移到 Kafka 或其他替代方案吗?

我的 RecordProcessor 代码,在每个消费者中都是相同的:

override def processRecords(processRecordsInput: ProcessRecordsInput): Unit = {
  log.trace("Received record(s) from kinesis")
  for {
    record <- processRecordsInput.getRecords
    json   <- jawn.parseByteBuffer(record.getData).toOption
    msg    <- decode[T](json.toString).toOption
  } yield subscriber ! msg
  processRecordsInput.getCheckpointer.checkpoint()
}

代码解析消息并将其发送给订阅者。现在,我只是将所有消息标记为已成功接收。我可以看到在 AWS Kinesis 仪表板上发送的消息,但没有发生读取,大概是因为每个应用程序都有自己的 AppName 并且看不到任何其他消息。

amazon-web-services scala publish-subscribe amazon-kinesis
1个回答
15
投票

支持您想要的模式,即一个发布者到一个 Kinesis 流的多个消费者的模式。您不需要为每个消费者提供单独的流。

你是怎么做到的?您需要为每个消费者提供不同的应用程序名称。这样,一个消费者的检查点信息就不会与另一个消费者的信息发生冲突。

检查对此的第一条回复:https://forums.aws.amazon.com/message.jspa?messageID=554375

© www.soinside.com 2019 - 2024. All rights reserved.