kafka-consumer-api 相关问题

用于与Apache Kafka使用者API相关的问题

虽然在Junit测试案例中@Autowired与@Qualifier显示无法加载应用程序上下文和非法状态异常

我在我的应用程序中使用@KafkaListener,这就是为什么我使用@Configuration静态类ContextConfiguration {//创建bean的原因。我的类使用@Autowired @Qualifier(“ ...

回答 1 投票 1

如何使用python在kafka使用者中聚合json数据?

我在KAFKA Transactions中产生的数据如下所示:ConsumerRecord(topic ='Transactions',partition = 0,offset = 3,timestamp = 1591277946735,timestamp_type = 0,key = None,value = {'transaction_id':. ..

回答 1 投票 0

Kafka-消费缓慢时的最佳做法

我有一个用例,其中我有3个Kafka消费者在写一个主题,每个消费者中的消息需要按顺序进行处理。如果某个使用者有滞后,则...

回答 1 投票 0

如何使用C#点网络客户端以编程方式创建主题并将消息发送到Kafka

我是kafka的新手,我想尝试创建主题并将邮件从.net应用程序发送到kafka。我正在使用kafka.net dll,并使用以下代码成功创建了主题:Uri uri = new Uri(“ ...

回答 1 投票 0

将Apache Kafka扩展到Web和移动应用程序

我正在探索Apache Kafka来构建应用程序,它确实符合我们的需求。但是,作为应用程序的一部分,我们还需要将数据流和通知推送到用户的应用程序和Web ...

回答 1 投票 0

同一主题的2个kafka消费者的分区结构

如果我创建两个传递相同属性的kafka消费者实例,就订阅同一主题,这两个消费者实例是否具有相似的分区结构,或者可能不同?我的实际问题...

回答 1 投票 0

将kafka消费者调查保持在MAX_VALUE会延迟组重新平衡吗?

consumer.poll(Long.MAX_VALUE);这是为了允许处理(高延迟)完成。保持这种高价值有副作用吗?它会阻止分区中的其他消息吗...

回答 1 投票 0

无法在Docker容器中使用Kafka代理运行控制台使用者

我在Docker容器中运行Kafka代理时遇到问题。我已经下载并解压缩了Kafka 2.12-2.4.1的tar存档。我可以从命令行运行Zookeeper和Kafka经纪人,...

回答 2 投票 2

仅允许来自某些主机/ IP的消费者访问kafka主题

[我们有一个基于Kafka的系统,以及正在使用该系统的大量开发人员。我们仍处于开发和测试阶段,尚未投入生产。对于大多数本地开发人员...

回答 1 投票 0

Kafka出现火花流问题:无法从具有现有数据的主题中读取数据

我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....

回答 1 投票 0

如何通过kafka控制台生产者发送密钥,价值消息

我有一个用例,需要用Kafka Console Producer发送键值消息。那么如何通过Kafka Console Producer命令实现这一目标?

回答 1 投票 0

我应该如何在Kafka中实现延迟的主题体系结构

如果我的使用者暂时无法处理该消息,我希望将其推送到延迟5分钟的主题,如果无法从那里处理,也希望将其推送到延迟30分钟的主题。 ...

回答 1 投票 1

如何订阅消费者组以仅阅读一个主题?

[我是Kafka的新手,出于测试目的,我使用docker-compose.yml启动了Kafka集群:版本:'3.7'服务:zookeeper:图片:'bitnami / zookeeper:3'container_name:...

回答 1 投票 0

具有自动提交功能的KafkaConsumer CPP API Assign()

我有一个CPP Kafka使用者,它使用assign指定分区。由于我使用assign()分配分区,而不使用我擅长的subscribe()。因此,我的重新...

回答 1 投票 0

Kafka消费者集团合并?

我有4个主题:topicA topicB topicC topicD我有2个组:groupA groupB groupA当前用于topicA,topicB groupB当前用于topicC和topicD是否有可能...

回答 1 投票 0

无法从Java连接到在Docker中运行的Kafka

[尝试使用Debezium将MySql数据库流式传输到Kafka。因此,在Docker容器中,我启动了Zookeeper,Kafka,MySQL数据库,MySQL命令行和Kafka Connect。当我运行任何DML命令时...

回答 1 投票 0

从Kafka消费数据

[我正在通过为以下三个属性指定类来使用来自Kafka Topic的消息:--property key.deserializer = org.apache.kakfa.common.serialization.StringDesrializer --...

回答 1 投票 1

在Spring boot kafka中的ProducerConfigs中将ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG设置为IntegerSerializer时,它给出类强制转换例外

我正在使用带有kafka的Spring引导程序来设置我的项目。但是当我运行它时,它将给出org.apache.kafka.common.errors.SerializationException:无法将类java.lang.Integer的键转换为类...

回答 1 投票 0

spring-kafka KafkaListener中的并行处理和自动缩放

我正在使用spring-kafka来吸收来自两个Kafka主题的消息,它们发送的消息格式如下。 @KafkaListener(topics = {“ topic_country1”,“ topic_country2”},groupId = KafkaUtils ....

回答 1 投票 0

Kafka Streams API:会话窗口异常

我正在尝试创建一个Kafka拓扑并将其分解为更易读的。我有一个按键分组的流,然后尝试像这样对它进行窗口化:SessionWindowedKStream

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.