为物化状态存储设置 lz4 主题压缩

问题描述 投票:0回答:1

假设 Quarkus Kafka Streams 应用程序通过物化进行某种重新键入和聚合,如下所示:

final KTable<Long, GroupedCommunications> groupedCommunications = communications //
        .groupBy( //
                (key, value) -> KeyValue.pair(value.personId(), value), //
                Grouped.with( //
                        "communications-grouped", //
                        longKeySerde, //
                        communicationSerde)) //
        .aggregate( //
                GroupedCommunications::new, //
                (key, value, aggregate) -> aggregate.add(value), //
                (key, value, aggregate) -> aggregate.remove(value), //
                Named.as("communications-grouped-aggregated"), //
                Materialized //
                        .<Long, GroupedCommunications, KeyValueStore<Bytes, byte[]>>as("communications-grouped-aggregated-materialized") //
                        .withKeySerde(longKeySerde) //
                        .withValueSerde(groupedCommunicationSerde));

这会隐式创建两个 Kafka 主题

communications-grouped-repartition
communications-grouped-aggregated-materialized-changelog
,压缩类型为
producer

如何将其更改为

lz4
?设置
kafka-streams.producer.compression-type=lz4
没有效果。

quarkus apache-kafka-streams
1个回答
0
投票

“没有影响”是什么意思?数据不是生产者用lz4写的吗?或者你只是问主题配置本身?

压缩可以在主题级别配置和生产者级别配置中进行配置,并且这两种配置都可以“交互”。最后,将压缩类型设置为“生产者”的主题配置告诉代理将数据写入主题,仅接受生产者选择的格式 - 如果生产者配置为 lz4,则数据将以 lz4 写入。

如果更改主题级别压缩配置,代理可能需要重新压缩数据以匹配所需的格式,以防生产者配置了不同的压缩格式,因此,将压缩格式保持在“生产者”通常是最好的选择最好避免重新压缩开销。例如,如果生产者压缩已关闭,则代理将在写入数据之前使用 lz4 进行压缩 - 如果生产者压缩设置为 snappy,则代理将首先解压缩数据,然后在写入数据之前重新压缩为 lz4。

使用

producer.
前缀只会更改生产者配置(而不是主题配置),即生产者如何压缩数据并发送给代理。你是说生产者没有按照配置使用lz4?

如果您确实愿意,您可以使用配置名称前缀

topic.
,例如
topic.compression-type=lz4
,将内部主题的特定于主题的配置添加到 Kafka Streams 配置中。这将更改主题级别配置,但是,如果生产者未配置压缩(默认情况下为“无”IIRC),则压缩现在只会发生在代理端,如上所述,很可能不是您想要的。

© www.soinside.com 2019 - 2024. All rights reserved.