kafka-producer-api 相关问题

用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。

如何列出kafka集群的所有生产者?

我能够使用KafkaAdminClient列出所有Kafka Consumer:AdminClient client = AdminClient.create(conf); KafkaFuture >名称= ltr.names(); ArrayList

回答 1 投票 0

多线程事务性kafka生产者-我应该在关闭之前刷新吗?

让我们考虑多线程跨国kafka生产者。我应该在关闭之前冲洗()生产者吗?换句话说,事务性生产者是否在发送批量数据之前先对它们进行缓冲?

回答 1 投票 1

带有键的键值,消息类型为“ Spring cloud stream SOURCE

我正在尝试使用Spring Cloud Stream发送带有键,值对的消息。我找不到为此的任何API。 org.springframework.messaging.MessageChannel仅将有效负载作为发送的一部分...

回答 1 投票 0

生产者配置的request.timeout.ms和代理配置的request.timeout.ms如何一起发挥

我正在尝试从生产者端了解请求超时机制。假设我在生产者配置下配置了以下配置:delivery.timeout.ms = 15000 request.timeout.ms = ...

回答 2 投票 2

什么是卡夫卡最好的同步制作人响应时间

我们已经建立了一个三节点的kafka集群,并创建了一个具有复制因子3的主题。我们正在向该集群生成数据,并且看到一些消息花费的时间超过50ms。在分析...

回答 1 投票 0

使用Kafka滚动窗口查询时返回空数据

我正在尝试查询状态存储以在5分钟的窗口中获取数据。为此,我正在使用滚动窗口。添加了REST查询数据。我已经流了A,它消耗了topic1和...

回答 1 投票 1

无法运行Kafka控制台生产者(NoSuchMethodError)

运行kafka producer时出错。/kafka-console-producer.sh--broker-list localhost:9092 --topic testing线程“ main”中的异常java.lang.NoSuchMethodError:kafka.utils.CommandLineUtils $ ....

回答 2 投票 0

如何一起对Kafka Streams和Producer API进行单元测试

[目前,我有一个基本的Kafka流应用程序,该应用程序涉及仅具有源和处理器但没有接收器的拓扑。本质上,拓扑仅处理消息的使用。至于...

回答 1 投票 0

Kafka可验证的生产者和消费者问题

我正在尝试使用卡夫卡。我已经启动了kafka-console-producer和kafka-console-consumer。我通过kafka-producer发送消息,并在kafka-console -...中成功接收。

回答 1 投票 0

Kafka生产者超时异常随机出现

我正在以下kafka配置中使用我的生产者之一,功能正常。 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,“ hostaddress:9092”); props.put(ProducerConfig.CLIENT_ID_CONFIG,“ ...

回答 1 投票 1

Spring Kafka生产者无法正常工作

我正在使用非阻塞(异步)发送消息到Kafka,方法是:ListenableFuture > future = template.send(record); future.addCallback(new ...

回答 1 投票 0

Python KafkaConsumer未连接

设置:我有3个docker容器1)对于Kafka 2)对于Zookeeper 3)对于JupyterLab,我在这些容器之间建立了网络连接,我看到kafka生产者能够运行并产生数据。 ...

回答 1 投票 2

Kafka Producer API:bootstrap.servers属性值

我正在学习Kafka Producer API,并且在教程中,他们提到“ bootstrap.servers”是强制性属性,用于指定当前正在运行的代理(以逗号分隔的值)。我怀疑为什么...

回答 1 投票 0

Kafka:分区数量多于代理数量

关于Kafka,我有以下问题:如果我创建一个主题,并且指定的分区数不超过代理数,那么单个代理将处理多个分区?如果我创建一个主题,然后...

回答 2 投票 1

当脚本给出分区和副本因子的详细信息时,将在代理中创建主题的位置和位置

当我们创建主题时,我们在其中决定分区数和复制因子。是否在所有经纪人中创建了此主题?它特定于任何一个经纪人吗?

回答 2 投票 0

主题在60000毫秒后不存在于元数据中

我已经在MSK(Kafka)中创建了一个主题。而且我已经注册了avro模式。现在,我试图生成有关该主题的消息,但是当我运行生产者时,出现以下错误java.util.concurrent ....

回答 1 投票 0

模式注册表中的向后兼容性问题和不确定性

我有一个用例,其中我有一个JSON,我想生成模式并从JSON中记录并发布记录。我已经配置了值序列化程序,并且架构设置是向后兼容的。 ...

回答 1 投票 1

消费端的卡夫卡压缩失败

我已通过以下更改在生产者端启用了快速压缩:props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,“ snappy”);现在是否需要在消费者方面进行任何更改? ...

回答 1 投票 0

Confent Kafka Python生产者未使用ACKS =所有配置进行生产

我有一些将在kafka主题中产生的python代码,在默认设置acks = 1时可以正常工作,但是当我更改为acks = all或acks = 2时,消息不会出现在主题中。 ...

回答 2 投票 0

仅在ProducerStream中只对单个分区进行生产的高级生产者

我正在尝试向具有2个分区的单个主题生成一些消息。由于我使用的是高级生成器,因此我希望所有消息都将平均分配到这2个...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.