与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
在一个简单的Kafka Stream程序中,当我使用下面的代码时,它可以工作而不会抛出任何错误:KTable result = source.mapValues(textLine - > textLine ....
Kafka Streams with processing.guarantee设置为EXACTLY_ONCE问题
我正在我的系统上使用3个(dockerized)kafka代理开发环境。经纪人将transaction.state.log.replication.factor设置为3.在流应用程序配置中我设置了...
根据文档(https://docs.confluent.io/current/streams/developer-guide/manage-topics.html#internal-topics),内部主题遵循命名约定 -
我有一个流媒体应用程序,连续接收坐标流以及一些自定义元数据,其中还包括一个位串。使用...将该流生成到kafka主题上
Kafka - 根据标题值将消息从“主题A”重定向到“主题B”
我想将kafka消息从名为“all-topic”的主题重定向到名为“headervalue-topic”的主题,其中headervalue是每条消息所具有的自定义标头的值。此刻我...
有没有办法在Apache Kafka 2.0中优先处理消息?
编辑如果其他人处于这种特殊情况,我会在调整消费者配置后获得类似于我所寻找的东西。我创建了一个发送优先级的制作人......
我有一个接收记录的Kafka流,我想根据特定字段连接消息。流中的消息如下所示:密钥:2099 Payload {email:[email protected] ...
Kafka Streams:在标点符号函数中生成的消息的完全一次语义
我想使用Kafka Streams Processor API并在计划的标点符函数中每分钟生成一些消息。 Kafka Streams可以保证将这些消息写入输出中......
无法使用云kafka流将autoCommitOffset设置为true
我使用spring-cloud-starter-stream-kafka-1.3.3和Spring-cloud-stream与Spring Boot连接到Kafka,它包含我想要enable.auto的典型Publisher Subscriber用例....
当我尝试从流访问状态立体声时,我得到以下错误,当我试图从商店访问ReadOnlyKeyValueStore时,状态存储,计数存储,可能已迁移到另一个实例,...
我有业务需求,我需要12小时的窗口,并且需要查询流数据。 12小时内的音量约为100M。另外我需要保持...的顺序
我想调整Kafka Streams的性能,为此我必须使用RocksDb配置值。我看到我可以使用StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG来设置......
我试图控制KStream消耗的消息数量,但我不是很成功。我正在使用:max.poll.interval.ms = 100和max.poll.records = 20来获得每条200条消息...
如何限制kafka-streams中的rocksdb内存使用量
我正在使用kafka-streams,并且off heap内存使用量增长到了机器的物理极限。但是当在docker中运行kafka-streams时,内存使用量会超过容器的限制......
UnsatisfiedLinkError:/tmp/snappy-1.1.4-libsnappyjava.so加载共享库时出错ld-linux-x86-64.so.2:没有这样的文件或目录
我正在尝试在kubernetes中运行Kafka Streams应用程序。当我启动pod时,我得到以下异常:线程中的异常“streams-pipe-e19c2d9a-d403-4944-8d26-0ef27ed5c057-StreamThread -...
Kafka Streams:ConsumerRebalanceListener实现
你能否告诉我们如何在流配置中注册以下类? public class MyConsumerRebalanceListener实现ConsumerRebalanceListener {static final Logger oLogger = ...
是否应该使用GlobalKTable为本地查询实施Kafka事件携带状态转移系统?
事件携带状态转移消除了远程调用以查询来自其他服务的信息的需要。让我们假设一个实际案例:我们有一个发布的客户服务......
我需要聚合一些传感器数据,我正在尝试使用窗口聚合来进行PoC /研究项目。在研究了这里和那里以及很多尝试之后,我想出了以下代码,......
如何在两个Kafka Streams之间使用持久化StateStore
我在尝试通过Kafka Streams实现以下方面遇到了一些麻烦:在应用程序启动时,(压缩的)主题alpha被加载到Key-Value StateStore映射中Kafka Stream消耗...
如何在Kstream DSL .transform()方法中访问所有statestore数据
尝试访问定义的statestore中的所有键值,但是在.transform()方法中,我只能使用一个键(源键)访问KeyValueStore SS =背景....