kafka-producer-api 相关问题

用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。

如何将Kafka Connect中的SourceRecord转换为AVRO?

我有一个用例,我需要将SourceRecord转换为GenericRecord。谁能帮助我如何做?基本上,我正在编写一个自定义的Kafka生产者,并将其嵌入到Connect运行时......

回答 1 投票 0

Kafka Producer在eclipse中不发送消息到主题。

我无法从Windows(Host OS)上的eclipse使用java从KafkaProducer发送消息到Hortonworks Sandbox上运行的kafka主题。我的java代码如下 import java.util.Properties; import ...

回答 1 投票 0

Kakfa Producer - Spring Boot应用程序 - 无法生成消息

我也是刚刚学习spring boot和kakfa。我探索了一下,并配置了一个示例生产者应用,如下图。但是我无法发布消息。如果我得到的是...

回答 1 投票 0

我如何创建一个既是kafka消费者又是kafka生产者的组件?

我试图创建一个组件,从一个主题消耗数据,处理数据并发送到另一个主题,也就是说,我需要让我的组件既是消费者又是生产者。我如何配置这个...

回答 1 投票 0

kafka服务器在windows环境下运行一段时间后会崩溃

我目前正在尝试一些测试,其中包括Kafka来发送和接收消息。在我的应用中,我有一个简单的Kafka生产者,它定期产生消息,而Kafka接收器......

回答 1 投票 0

Kafka极高延迟的C#

我正在Apache Kafka上做一些性能测试,以便与RabbitMQ和ActiveMQ等其他软件进行比较。我的想法是将它用于代理通信的消息系统。我正在测试多个...

回答 1 投票 0

当数据从生产者推送时,Kafka Node间歇性崩溃

我们有一个3节点的Kafka集群(版本5.2.1,apache kafka版本:2.2.0)。有一段时间,我们观察到一个异常,每当我们尝试推送 ...

回答 1 投票 0

如何从Kafka全局状态存储中删除记录?

在恢复过程中,全局状态存储将从源主题中转储数据(这被认为是全局存储的变更日志主题)。对于删除记录,我做了如下的操作 kvStore.put("key-...

回答 1 投票 0

Kakfa消费者在我自己的偏移ID上的提交没有工作 - commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)

我正在使用poll()从一个主题中获取一堆消息(比如100条),我在配置中设置了auto-commit为false,max.poll.records为100。我在配置中设置了auto-commit为false,max.poll.records为100。我将从100条消息中消耗10条消息 ...

回答 1 投票 0

在Alpakka中使用Transactional.Sink无法向Kafka主题生成消息,但我看到idempotent producer被启用了。

你好,我想使用Alpakka文档中的Producer api。使用Transactional source可以消费记录,制作人也创建了,但是不能把消息放到主题中。

回答 1 投票 0

如何用Kafka在生产者端有容错率?

我是Kafa和数据摄取的新手。我知道Kafka是容错的,因为它把数据冗余地保存在多个节点上。但是,我不明白的是,我们如何才能实现容错,在 ...

回答 1 投票 1

多个Kafka生产者对同一主题进行编写--如何平衡负载消耗?

所以我有一个设计,我有多个生产者P1,P2,P3,P4...。PN写到一个主题T1,有32个分区。在另一边,我有多达32个消费者在一个消费者组。...

回答 1 投票 0


将csv文件写入kafka主题

我有一个大的csv,我想写到一个kafka主题。 def producer(): producer = KafkaProducer(bootstrap_servers='mykafka-broker') with open('homeantonisrepostestfile.csv') as file: ...

回答 1 投票 0

Kafka streams Exception: org.apache.kafka.streams.errors.StreamsException - Deserialization异常处理程序。

我正试图用kafka流实现一个简单的计数器。它接受一个键值,如果相同的键值来了,它就会添加新的值。这是我目前写的代码 package exercises;import org.apache...。

回答 1 投票 0

spark writeStream into kafka - awaitTermination()与 awaitAnyTermination()之间的区别。

根据官方文档,我使用下面的代码段写入kafka主题,但它没有写入kafka。 finalStream = final.writeStream \ .format("kafka") \ .option("kafka......")。

回答 1 投票 0

如何在可配置的时间轴中以JSON格式获取ActiveInactive主题。

谁能解释一下,或者提供一些有用的链接,在Kafka中用Java获取activeinactive主题?

回答 1 投票 0

Kafka消息 从整数到字符串的值转换。

我在一个kafka主题里有一个kafka消息。这个消息的一个键key=ID,这个键的值是value=12345678910111213141。这个值的类型是整数。我想把类型转换为......

回答 1 投票 1

Spark Kafka Producer抛出太多打开的文件 Exception

我正在尝试运行一个用Java编写的Spark Kafka Job,以产生大约10K记录,每批到一个Kafka Topic。这是一个Spark批处理作业,它读取100个(共100万条记录)hdfs部分文件... ...

回答 1 投票 0

KafkaTimeoutError: 60.0秒后更新元数据失败。

我有一个高吞吐量kafka生产者的用例,我想每秒钟推送成千上万的json消息。我有一个3个节点的kafka集群,我使用最新的kafka-python库,并且有 ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.