kafka-consumer-api 相关问题

用于与Apache Kafka使用者API相关的问题

Kakfa消费者在我自己的偏移ID上的提交没有工作 - commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)

我正在使用poll()从一个主题中获取一堆消息(比如100条),我在配置中设置了auto-commit为false,max.poll.records为100。我在配置中设置了auto-commit为false,max.poll.records为100。我将从100条消息中消耗10条消息 ...

回答 1 投票 0

Kafka监听器:轮询间隔:如何用15分钟的间隔来安排kafka消费者轮询()。

**如何在Kafka监听器中安排15分钟的poll()间隔。我的5分钟轮询间隔的示例代码工作正常,但我有一个15分钟不同的schedule poll()间隔的要求。

回答 1 投票 0

每个键的动态窗口聚合

我有一个特定的用例,在这个用例中,我从一个单一的主题中消耗数据。该主题接收包含特定类型的消息。我的服务在这些类型和时间窗口之间有一个映射 ...

回答 1 投票 0

Kafka无法跟踪最后提交的偏移量。

kakfa-broker在管理偏移量方面有什么已知的问题吗?Bcz,我们面临的问题是,当我们尝试重启kafka-consumer(即应用程序重启)时,有时所有的偏移量都被重置为......

回答 1 投票 0

Kafka流转为CSV

我在java中使用Kafka,将JSON消息以字符串的形式消耗,速率是每分钟100万条消息,我需要将字符串拆分,只取一些值,并保存到CSV中加载到数据库中,如何才能做出这样的 ...

回答 1 投票 0

KSQL窗口式聚合流

我试图通过它的一个属性对事件进行分组,并随着时间的推移使用KSQL窗口化聚合,特别是会话窗口。我有一个由kafka主题制作的STREAM,其TIMESTAMP ...

回答 1 投票 0

使用轮询模式消耗Kafka消息

我是新手,使用的是Kafka 1.0。我使用拉模式读取Kafka消息,也就是说,我周期性地poll()ing Kafka主题以获取新消息,但我没有把偏移量写回Kafka......。

回答 1 投票 1

如何在Kafka中进行话题分类?

比如用户可以订阅特定类别的电影。当一个新的电影出现在Kafka中时,我必须将该信息发送给订阅该电影类别的消费者。如何...

回答 1 投票 1

kafka消费者什么时候会被逐出群组?

我正在使用spring kafka,想知道kafka消费者什么时候从组中被驱逐。当处理时间超过投票间隔时,它是否会被驱逐?如果是,那么是不是 ...

回答 1 投票 -1

多个Kafka生产者对同一主题进行编写--如何平衡负载消耗?

所以我有一个设计,我有多个生产者P1,P2,P3,P4...。PN写到一个主题T1,有32个分区。在另一边,我有多达32个消费者在一个消费者组。...

回答 1 投票 0

Kafka消费者投票行为和秩序

让我们说,我的消费者是从一个经纪人有多个主题和每个主题有多个分区的投票。我有5个消费者在同一个消费者组。如果我的每个消费者...

回答 1 投票 1

Kafka消费者--待取的记录永远不会被删除,投票一直返回0条记录。

我们写了一个Kafka消费者,它根据配置进行数据轮询,每次轮询返回大约400条avro记录,我们对其进行缓冲。缓冲后,我们做了一个寻找端偏移。当缓冲区大小...

回答 1 投票 1

SSL与默认的KafkaConsumer

默认的KafkaConsumer实例是否使用支持SSL的消费?我认为没有,但是我还没有找到使用默认属性的文档。

回答 1 投票 0

IllegalStateException订阅主题、分区和模式是相互排斥的。

需要从一个Kafka主题中获取消息,从一个特定的偏移量中获取。 如果我不使用assign(),那么消费者不会执行seek,因为......

回答 1 投票 0

在服务器之间发送数据 Kafka Apache on Python

producer = KafkaProducer(bootstrap_servers='kf-p1l-node3:9092', value_serializer=lambda x: dumps(x).encode('utf-8')) # utf-8 consumer = KafkaConsumer(...)

回答 1 投票 0

Kafka取最大字节数不能如期工作

我有一个价值1GB的消息的主题。A. Kafka消费者决定消耗这些消息。我怎么做才能禁止消费者一次消耗所有消息?我尝试设置fetch.max....

回答 1 投票 1

如何在可配置的时间轴中以JSON格式获取ActiveInactive主题。

谁能解释一下,或者提供一些有用的链接,在Kafka中用Java获取activeinactive主题?

回答 1 投票 0

Kafka消息 从整数到字符串的值转换。

我在一个kafka主题里有一个kafka消息。这个消息的一个键key=ID,这个键的值是value=12345678910111213141。这个值的类型是整数。我想把类型转换为......

回答 1 投票 1

区分librdkafka中不存在的和未授权的主题。

我怎样才能确定一个主题是否被授权?我需要这个,因为在我的消费者中,我得到了所有已知主题的元数据,然后进行分配调用。元数据调用并没有给出未...

回答 1 投票 0

为什么要用AVRO做Kafka?

如果Java在网络上发送时可以序列化任何东西。为什么他们为Kafka(AVRO)创建了一个全新的框架,而不仅仅是序列化普通的JSON?

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.