apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

Spring Cloud Stream Kafka应用程序无法使用正确的Avro架构生成消息

我有一个带有KStream的应用程序(spring-boot-shipping-service),它获取由外部生产者(spring-boot-order-service)生成的OrderCreatedEvent消息。这个制片人使用以下......

回答 1 投票 0

Spring Cloud Stream Kafka Stream与本机Kafka Stream应用程序和生产者之间不兼容的Avro消息

验证这一点的示例应用程序可以在https://github.com/codependent/event-carried-state-transfer/tree/avro kafka-xxx中找到:本机应用程序spring-boot-xxx:Spring Cloud Stream ...

回答 1 投票 0

KStream是否在kafka 2.2中自动创建主题?

KStream / TopicNameExtractor javadocs指示当使用to(TopicNameExctractor提取器)时,不会自动创建主题,而是必须已经存在。但是,我看到......

回答 1 投票 1

我们如何为Spring Cloud Stream Kafka生产者,消费者和KStreams中的模式配置value.subject.name.strategy?

我想在Spring Cloud Stream Producers,Consumers和KStreams中自定义Avro架构主题的命名策略。这将在Kafka中使用属性key.subject.name ....

回答 1 投票 1

如何使用Kafka流DSL功能处理重复的消息

我的要求是跳过或避免使用kafka流DSL API从INPUT主题收到的重复消息(具有相同的密钥)。源系统有可能向INPUT发送重复的消息......

回答 3 投票 0

为什么Kafka经纪人在__consumer_offsets主题中存储消费者群体的重新平衡元数据?

在处理Kafka Streams应用程序时,我们发现组协调器代理将重新平衡元数据存储在__consumer_offsets主题中。因为在我们的例子中我们有1200个流线程,...

回答 1 投票 2

kafka流会话窗口保留期限

我们正在使用Kafka stream的SessionWindows来聚合相关事件的到来。与聚合一起,我们使用until()API指定窗口的保留时间。流信息:...

回答 1 投票 3

如何在加入Kafka Streams时访问原始记录

我有一个工作的Kafka Streams应用程序,目前正在从两个不同的主题创建两个KStream。那部分工作正常。现在,我想加入他们,并获得...的“汇总记录”

回答 1 投票 1

如何使用KStream发送标头

我正在研究一个用例,我在其中创建了将数据从mongo发送到elasticsearch的管道。 Mongo - > Spring Boot - > Kafka - > Transformer(KStream) - > Kafka - > Consumer(发送到Elastic ...

回答 1 投票 0

使用特定avro服务实现的Kafka聚合会产生NullPointerException

我正在尝试窗口化数据流,对于每个窗口,我需要该窗口中的值列表,并且这样做我创建了一个自定义avro架构,其中包含一个字段记录,这是一个Input列表。 ......

回答 1 投票 0

Kafka流合并消息

我有一个数据有效负载,对于一条消息来说太大了。考虑一个avro:记录喜欢{...}记录评论{...}记录帖子{喜欢喜欢;评论评论;弦体;假设,喜欢......

回答 1 投票 0

如何调查Kafka-stream处于ERROR状态的原因

我正在使用Spring Cloud Finchley.RELEASE与RocksDb和Kafka-Streams进行生产应用。有时Kafka-Streams进入ERROR状态,除了...之外无法访问商店

回答 1 投票 0

Java Streams中lambda函数的集合

我有一个流函数KStream [] branch(final Predicate super K,?super V> ...谓词)。我想动态创建一个谓词列表。那可能吗? KStream&...

回答 1 投票 2

已请求默认活页夹,但“org.springframework.cloud.stream.messaging.DirectWithAttributesChannel”没有可用的活页夹

我试图用Spring Cloud + Kafka Streams + Spring Boot 2创建最简单的hello世界。我意识到我错过了基本的概念。基本上,我理解:1 - 我需要定义一个......

回答 1 投票 0

打印Kafka Stream输入到控制台?

我一直在查看我正在研究的Java应用程序的很多Kafka文档。我已经尝试了Java 8中引入的lambda语法,但我对...有点粗略。

回答 1 投票 12

如何在spring-cloud-stream中的kafka流程拓扑中使用交互式查询?

是否可以在Spring Cloud Stream中使用带有@EnableBinding注释的类或使用@StreamListener的方法中的交互式查询(InteractiveQueryService)?我试过实例化......

回答 1 投票 0

在kafka流中使用Redis池是否安全?

所以,问题是关于在流函数中使用外部状态存储的安全性,如过滤器,映射等。可以这样做:JedisPool pool = ...; KStream stream = ...; ...

回答 2 投票 2

自动装配时没有打印值的KStream对象

我正在尝试创建kstreams bean并在我的服务中自动装配它。但即使我得到相同的对象stream.print()没有给出任何值,但在同一个bean内打印正在工作。我觉得我是 ...

回答 1 投票 0

使用24小时的时间窗口聚合时是否可以设置时区

我希望使用24小时的时间窗口汇总一些值,它工作正常,但是时间窗口从00:00到00:59的utc时间,是否可以设置时区以便时间窗口开始和结束我的... 。

回答 1 投票 0

聚合使用错误的序列化程序

我正在使用kafka-streams应用程序来处理日志事件。在这种情况下,我想将WorkflowInput类型聚合到工作流类型中。我在使聚合工作时遇到问题。 ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.