apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

KafkaStreams,错误注册Avro模式

免责声明:我对KafkaStreams的体验非常有限。我不太明白为什么会收到org.apache.kafka.common.errors.SerializationException:错误注册Avro模式:和...

回答 1 投票 2

具有替代方法的重载方法构造函数FlinkKafkaConsumer010

我有此scala文件,此导入文件和此代码。我不知道如何解决此重载构造函数问题:包uimp import org.apache.flink.streaming.api.scala._ import org.apache ....

回答 1 投票 0

用于大量数据处理的Spring Boot应用程序的Spring Kafka或Kafka流

我正在使用Spring boot,Kafka和QuickFIX / J为银行创建大量JSON数据处理应用程序。这是我第一次使用Kafka,QuickFIX / J和...

回答 1 投票 0

Kafka AMQ流-主题命令

我从未使用过AMQ Streams(Kafka)。如何通过路由连接到主题(读/创建/删除)?对于Kafka,例如在本地计算机上,我将CLI与基本命令配合使用。但是使用...

回答 1 投票 0

如何比较两个kafka流或数据库表之间的数据(十亿条记录)

我们正在通过CDC从DB2(表1)向Kafka主题(主题1)发送数据。我们需要在DB2数据和Kafka主题之间进行协调。我们有两个选择-a)删除所有kafka主题数据...

回答 1 投票 2

GlobalKTable刷新逻辑

当对GlobalKTable的基础主题进行更新时,KStream应用程序的所有实例获取最新数据的逻辑是什么?以下是我的后续问题:...

回答 1 投票 0

KStream中给定密钥的最新值

我有如下KTable的KTable KTable KTable 然后我有KStream ,...

回答 2 投票 0

Kafka使用者以不可读的格式显示数字

我正在尝试kafka流。我正在从一个主题中读取消息,并执行groupByKey,然后进行组计数。但是问题是消息计数以“ ...

回答 1 投票 0

kafka流如何尝试为每个kafka主题获得相同的分区?

如果我有多个应用程序实例,并且我要加入两个主题,则每个实例必须具有相同的分区才能加入数据。同时,它必须均匀分配...

回答 1 投票 0

窗口组之后的Kafka流总和数据

我有一个仓库应用程序,在其中我需要按小时计算总库存。所有项目移动数据都发送到kafka流(添加/删除)。这意味着,我可以每小时获取一次...

回答 1 投票 1

ClickHouse:分布式和复制表中哈希和internal_replication的用法

我已经在Distributed Engine文档中阅读了有关internal_replication设置的内容。如果此参数设置为“ true”,则写入操作将选择第一个正常副本,然后写入...

回答 1 投票 0

如何使用静态记录列表创建KStream以进行单元测试

[我正在尝试为使用Kafka流的类编写单元测试,如下所示:KStream stream = streamBuilder.stream(topic)stream.foreach((key,value)-> {//做一些数据操作.. 。

回答 1 投票 0

[Kafka流的两个字段聚合

我正在使用kafka流为仓库中的物料创建汇总(总和)。可以添加(例如,从供应商处购买)或移除(例如,已出售的项目)。在该应用程序中,一个仓库可以服务...

回答 1 投票 0

Kafka Streams API:会话窗口异常

我正在尝试创建一个Kafka拓扑并将其分解为更易读的。我有一个按键分组的流,然后尝试像这样对它进行窗口化:SessionWindowedKStream

回答 1 投票 0

如何在哈希图中存储KStream的对象类型的值?

我有一个KStream对象,例如KStream 。我想在HashMap中获取Object的值并将其存储为其他KStream中处理后的值吗?例如,...

回答 1 投票 0

Kafka流转发方法抛出NullPointerException,因为ProcessorNode currentNode对象为空

我们已经编写了从源主题读取的Kafka流应用程序,它在2个处理器中执行了一些业务逻辑,然后将输出写入接收器主题。下面是创建拓扑的代码,添加...

回答 1 投票 1

Kafka流-GroupBy-晚事件-persistentWindowStore-具有宽限期和禁止的WindowBy

我的目的是每秒计算从源到目的地的成功和失败消息,并将其结果汇总为每日基准。我有两种选择可以做到这一点;流事件,然后将它们分组time#source#...

回答 1 投票 0


从Spark Streaming获取异常

package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 {def main(assdf:Array [String]){val sc = new SparkContext(“ local”, “ Stream”)val ...

回答 1 投票 0

在Spring Cloud Stream上使用自定义Serde序列化聚合状态存储时出错

[我正在尝试使用Spring Cloud Stream创建一个简单的功能bean,该bean处理来自KStream和GlobalKTable的消息,将它们加入,聚合它们,并将结果输出到新的流中...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.