apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

Kafka流:NUM_STREAM_THREADS_CONFIG > 1是否会破坏分区的总排序?

我们开始吧。我有一个相当复杂的拓扑结构,包括各种连接,聚合,过滤器,地图等等。默认情况下,NUM_STREAM_THREADS_CONFIG参数等于1,这完全是决定性的。

回答 1 投票 1

是否使用Kafka Streams和or KSQL来对数据库中的数据流进行去正常化处理。

在网上做了很多阅读之后,我终于接触到了这个论坛。我的挑战是将数据库中通过CDC来源的事务性数据去正常化到Kafka中,然后再写出来成 ...

回答 1 投票 0

KafkaStreams Scala对2个字段进行分组以获得不同的计数。

在我的Kafka主题0中,我有以下Key-Value数据 -> {"UserId": "123", "DateAndTime": "2020-06-10T10:30:03.000Z"}。0 -> {"UserId": "123", "DateAndTime": "2020-05-10T10:30:03.000Z"}。1 -> {"...

回答 1 投票 -1

npm库kafka-node和kafka-streams之间的区别。

在一个项目中,Node.js应用连接到Kafka消息队列,从队列中获取所有消息。我搜索了一下,发现有两个包kafka-node和kafka-streams。请问哪个包适合这里?

回答 1 投票 0

KT表suppress(suppressed.untilTimeLimit())不包含指定时间的记录。

我已经实现了一个流处理应用程序,它可以进行一些计算和转换,并将结果发送到一个输出主题。之后,我从该主题中读取,我想抑制 ...

回答 1 投票 0

Kafka Streams 2.3到2.5的升级打破了Scala的编译功能

当从KafkaStreams库从2.3升级到2.5,保持相同的Scala版本时,运行时出现以下错误。[info] com.mypackage.dp.streams.applications...。

回答 1 投票 0

升级到kafka-streams:5.5.0-css(Apache Kafka 2.5.0)后,GlobalKTable的getting store崩溃了 [已解决] 。

我有一个使用GlobalKTable的Spring Boot App。直到更新到kafka-streams-5.5.0-css(Confluent Platform版本与Apache Kafka 2.5.0兼容),从5.3.2-css(Apache Kafka 2......)更新到kafka-streams-5.5.0-css之前,它工作得很好。

回答 1 投票 0

使用SendToDlqAndContinue spring kafka stream活页夹的序列化异常

我正在尝试在处理异常时将消息发送到DLQ,但是当我从Spring-boot-kafka-streams-binder @EnableBinding(...]使用SendToDlqAndContinue时,我不断收到序列化异常,这是由于<>

回答 1 投票 0

Kafka POM依赖关系问题-ClassNotFoundException:org.apache.kafka.test.TestCondition

我正在查看-java.lang.NoClassDefFoundError:集成测试中的org / apache / kafka / test / TestCondition。我认为这与存储库导入与kafka相关的软件包的方式有关。我是...

回答 1 投票 0

Kafka:与多个使用者的sendOffsetsToTransaction

对于Kafka项目,我使用消费/加工/生产模型,但有两个消费者。所以我想知道,是否有可能对具有唯一生产者的两个使用者使用sendOffsetsToTransaction()函数?...

回答 1 投票 0

访问Kafka流中聚合器内部的TimeWindow属性

我想在一个时间窗口内使用Kafka-Streams流式传输一个主题的最新记录,并且我希望将输出记录的时间戳设置为等于该记录的时间窗口的结尾...

回答 1 投票 2

Kafka StateStore共享最佳实践

创建处理器API拓扑时,我注意到Topology#addStateStore(StoreBuilder,String ...)接受多个处理器,这意味着一个状态存储可以由多个处理器共享。是...

回答 1 投票 -1

Kafka Stream随身携带输入标题

我想保留输入流记录头,就像它在输出主题中一样。我该如何实现? Kafka Stream版本:2.3.1使用ProcessorContext.forward()方法。如果有人可以给样品...

回答 1 投票 0

ProcessorContext#header()为空

我们有kafka流应用程序。生产者将标题发送到Kafka Streaming应用程序之前,先在kafka消息中添加标题。在Kafka流媒体应用程序中,我们使用AbstractProcessor和上下文。...

回答 1 投票 0

Kafka-流与主题

Kafka主题和流之间有什么区别?我以为两者都一样。这个文档说是从引起混乱的主题创建流。 https://docs.ksqldb.io/en/latest / ...

回答 1 投票 0

如何在Spring Cloud Kafka Streams应用程序中执行flatTransform?

我正在尝试在Spring Cloud Kafka Streams应用程序中执行flatTransform。但我不确定将KafkaStreamsStateStore批注确切放置在何处。目前,我收到错误消息:Invalid ...

回答 1 投票 1

Spring Cloud Stream Kafka Streams:下游消息数与发送给该主题的消息总数不匹配

我有一个基于Spring Boot的Spring Cloud Stream Kafka Streams Binder应用程序。它定义了一个拓扑,其中包含以下内容:绿色的数字表示通过...

回答 1 投票 1

如何处理Spring cloud流kafka流活页夹中的序列化错误?

我正在使用Spring Cloud Stream Kafka Streams绑定器编写Kafka Streams应用程序。使用者将消息发布到输出主题时,可能会出现诸如序列化错误或...

回答 1 投票 0

Kafka流无法在StreamTask内部解码时间戳元数据

[在启动应用程序java.lang.IllegalArgumentException时,在Kafka Streams上出现了奇怪的错误:java.base / java.util.Base64 $ Decoder.decode0(Base64.java:743)处的base64字符非法7b ...

回答 1 投票 2

如果在处理步骤期间发生故障,如何使Spring云流Kafka流绑定程序重试处理消息?

我正在使用Spring Cloud Stream开发Kafka Streams。在消息处理应用程序中,可能会产生错误。因此,不应提交和重试该消息...

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.