我们正在使用Kafka来存储由我们集群中的节点生成的消息,并将其分发到集群中的所有节点,我主要使用它来处理akka-streams,但是我有几个问题需要解决这个问题。这有一些限制。
首先,消息必须由集群中的每个节点使用,但仅由一个节点生成。我知道我可以为每个节点分配一个可能是其节点ID的组ID,这意味着每个节点都将获得该消息。那分类。但这里有问题。
数据非常短暂且相当大(仅低于1亿)并且无法进一步压缩或分解。如果有关于该主题的新消息,则旧消息几乎是垃圾。如何将主题限制为基本上只有一条消息当前最大?
鉴于数据是节点启动所必需的,我需要使用关于主题的最新消息,无论我之前是否已经消耗它,并且希望每次启动服务器时都不创建唯一的组ID。这是可能的,如果是这样,怎么办呢。
最后,数据通常是关于主题的,但有时它不在那里,理想情况下,我需要能够检查那里是否有消息,如果没有请求生产者创建消息。这可能吗?
这是我目前用于启动消费者的代码:
private Control startMatrixConsumer() {
final ConsumerSettings<Long, byte[]> matrixConsumerSettings = ConsumerSettings
.create(services.actorSystem(), new LongDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers(services.config().getString("kafka.bootstrapServers"))
.withGroupId("group1") // todo put in the conf ??
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
final String topicName = Matrix.class.getSimpleName() + '-' + eventId;
final AutoSubscription subscription = Subscriptions.topics(topicName);
return Consumer.plainSource(MatrixConsumerSettings, subscription)
.named(Matrix.class.getSimpleName() + "-Kafka-Consumer-" + eventId)
.map(data -> {
final Matrix matrix = services.kryoDeserialize(data.value(), Matrix.class);
log.debug(format("Received %s for event %d from Kafka", Matrix.class.getSimpleName(), matrix.getEventId()));
return matrix;
})
.filter(Objects::nonNull)
.to(Sink.actorRef(getSelf(), NotUsed.getInstance()))
.run(ActorMaterializer.create(getContext()));
}
谢谢一堆。
所有消息都必须由集群中的每个节点使用,但只由一个节点生成。
您是对的,您可以通过为每个节点提供唯一的组ID来实现此目的。
如何将主题限制为基本上只有一条消息当前最大?
卡夫卡提供compacted topics。压缩主题仅维护给定键的最新消息。例如,Kafka消费者将他们的抵消存储在压缩的主题中。在您的情况下,使用相同的密钥生成每条消息,Kafka Log Cleaner将删除旧消息。请注意,压缩是定期执行的,因此您可以在短时间内使用相同的密钥最终得到两条(或更多条)消息(取决于您的Log Cleaner configuration。
无论我之前是否已经消耗了该主题,我都需要使用该主题的最新消息。
您可以通过不提交消费者偏移(enable.auto.commit
设置为false
)并将auto.offset.reset
设置为earliest
来实现此目的。通过在主题开头的压缩主题和使用者中添加一条消息,节点启动后始终会消耗该消息。
我需要能够检查那里是否有消息,如果没有请求生产者创建消息。
不幸的是,我不知道任何可以帮助你的Kafka功能。大多数时候,卡夫卡被用来解耦生产者和消费者。