用于与Apache Kafka使用者API相关的问题
spring-kafka KafkaListener中的并行处理和自动缩放
我正在使用spring-kafka来吸收来自两个Kafka主题的消息,它们发送的消息格式如下。 @KafkaListener(topics = {“ topic_country1”,“ topic_country2”},groupId = KafkaUtils ....
我正在尝试创建一个Kafka拓扑并将其分解为更易读的。我有一个按键分组的流,然后尝试像这样对它进行窗口化:SessionWindowedKStream
在使消费者保持生命力方面,有些事情令我感到困惑。假设我有一个不断向其写入数据的主题。但是,在一天的一个小时内,没有新消息。如果我有...
我正在运行一个测试,kafka消费者正在从一个主题的多个分区读取数据。在进程运行时,我添加了更多分区。使用者线程花了大约5分钟的时间才能完成...
我有以下代码段:groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds))。grace(Duration.ZERO)); KTable mergedTable = ...
我是kafka的初学者,已经开发了kafka消息的消费者,现在看起来不错。尽管在测试消费者时有一个要求可能会有些节制...
我在初始化Kafka使用者时,通过将属性enable.auto.commit设置为false来使用手动kafka提交,并在接收和处理消息后手动调用kafka commit。 ...
如何通过在pyspark上插入子文档将两个文档合并为一个文档?
我有一个大问题,希望在说明要做什么时要明确。我正在尝试在pyspark(Spark结构化流)上获取Stream-Stream结构,并且我想更新相同的内容...
我们服务组中的所有客户端配置的第一要使能。auto.commit = false和auto.offset.reset =最早,以确保未提交的未处理消息不被提交。现在我想...
我是kafka的新手,我想尝试创建主题并将消息从.net应用程序发送到kafka。我正在使用kafka.net dll,并使用以下代码成功创建了主题:Uri uri = new Uri(“ ...
使用发布/订阅如何确认消息?当消息以唯一组的形式发送给某些使用者时。这是否表示所有消费者都确认了消息或所有消息都得到了确认...
我有一个主题加入了Kafka Streams。流分析数据并将结果放入另一个主题“建议”。一项推荐可以分配给一个或多个用户。如何存储...
[当我尝试使用特定的消费者组来消费主题时,它会失败。我可以在新的消费群体中使用同一主题。当在主题上使用describe命令时,没有使用者...
kafka.errors.UnsupportedCodecError:UnsupportedCodecError:在使用kafka时找不到快速压缩编解码器的库
在消费者行代码中的消息中获取以下错误:消费者=消费者中的消息的消费者= KafkaConsumer(topic,group_id = groupid,bootstrap_servers = [主机]):错误:对于消费者中的消息:...
poll()kafka消息以时间间隔为底?因此如何在特定时间内阻止KafkaListenerEndpointRegistry轮询消息
poll()Kafka消息以@kafkaListener为间隔区间?以及如何在特定时间内阻止KafkaListenerEndpointRegistry轮询消息
我有一个使用者,它从一个主题读取数据并产生一个线程进行处理。在单个时间点,服务器中可能正在处理多个消息。应用程序遇到了...
我想获取主题分区中的最后一个记录偏移量。使用者中有endOffsets方法。通常,endOffsets-1可以正常工作。但是在事务性生产者主题的情况下,可以...
我在与Kafka消费者合作时遇到了一个特殊的问题。当我有一个包含多个分区的主题和一个消费者组时,如果消费者...
我需要为我的项目使用什么正确的Kafka结构以及为什么的建议。我的项目Im正在创建投资机器人管理平台。非常高级-您可以编码...
我有一个Kafka基础架构,其主题只有一个分区,将有2个使用者订阅该分区。第二个使用者将运行备份副本应用程序。现在的问题...