与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
KStream left = builder.stream(“source1”); left.toStream(“soure2”)想要在发送到source2之前序列化
使用Spring访问Kafka Stream State Store
引用此文档的4.2.6 https://docs.spring.io/spring-kafka/reference/htmlsingle/#kafka-streams如何使用kafka流弹簧支持访问州商店?没有春天,你......
为什么我偶尔会收到一个InvalidStateStoreException PARTITIONS_REVOKED,而不是在检索商店进行查询时运行?
我正在访问一个状态存储来查询它,并且必须用try / catch块包装store()语句来重试它,因为有时我得到这个异常:org.apache.kafka.streams.errors ....
为什么此KStream / KTable拓扑会传播未通过过滤器的记录?
我有以下拓扑:创建状态存储过滤基于SOME_CONDITION的记录,将其值映射到新实体,最后将这些记录发布到另一个主题...
我试图从Kotlin调用以下Java方法:KStream [] branch(final Predicate super K,?super V> ...谓词);此方法是Kafka Streams API的一部分。 ......
Kafka Streams - 来自保留政策主题的KTable
我正在尝试kafka流,我有以下设置:我有一个现有的kafka主题,其密钥空间是无限的(但可预测且众所周知)。我的话题有保留......
我在Kafka Streaming中进行性能测试。我用Transformer创建了一个简单的Streams API。 //从输入主题builder.stream(Serdes.String(),Serdes.String(),inTopic)中流式传输数据...
我有一个经常打勾的计时器KStream(想想秒数),我想在24小时的窗口内计算各种统计数据。例如,24小时更改,给定的价格差异...
也许这是一个初学者的问题,但是读取KSQL中生成的数据的推荐方法是什么?我们假设我做了一些流处理并将数据写入KSQL表。现在我想访问这个...
Kafka Stream Rebalancing:从REBALANCING到ERROR的状态转变
我有4个主题,包括单个分区和3个应用程序实例。我尝试通过编写自定义PartitionGrouper来实现可伸缩性,它将创建3个任务,如下所示:1st instance -...
Kafka Streams:使用相同的`application.id`来使用多个主题
我有一个需要收听多个不同主题的应用程序;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个人使用相同的kafka属性......
我刚刚开始使用Kafka和Kafka Streaming Applications。我写了一个Kafka Stream应用程序,它消耗一个主题,处理这些消息,并将它们发送到另一个主题。尽我所能......
流运行一段时间后会出现以下错误?我无法找到谁负责创建.sst文件?环境:Kafka版本0.10.0-cp1 scala 2.11.8 org.apache ....
我一直在监控4台机器的Kafka集群上的指标。我有一个输入应用程序写入消息到Kafka和几个Kafka Streams应用程序处理这些消息和...
我创建了一个Kafka主题并向其推送了一条消息。所以bin / kafka-console-consumer --bootstrap-server abc.xyz.com:9092-topic myTopic --from-beginning --property print.key = true --property key ....
max.poll.intervals.ms默认设置为int.Max
Apache Kafka文档说明:内部Kafka Streams使用者max.poll.interval.ms默认值已从300000更改为Integer.MAX_VALUE因为此值用于检测何时...
我使用Processor API创建kafka流应用程序。这是我如何创建一个主题,将时间戳附加到所有传入的消息kafka-topics.sh --create --zookeeper localhost:2181 --...
我需要知道如何在我的kafka KStreams行中使用'for'循环...下面是我的'for'循环,需要包含在KStreams中(int i = 0; i <6; i ++){try { ...
KafkaStream createTopic不尊重Kafka服务器的auto.create.topics.enable设置
我们有一个生产Kafka集群,最近受到了一系列新主题的污染。 Kafka群集具有以下设置:auto.create.topics.enable = false delete.topic.enable = false ...
如今,我正在开发一个使用Kafka Streams处理消息的项目。我们的消息由两个标识符组成,其中一个用于用户标识符,第二个用于消息标识符...