假设 Quarkus Kafka Streams 应用程序通过物化进行某种重新键入和聚合,如下所示:
final KTable<Long, GroupedCommunications> groupedCommunications = communications //
.groupBy( //
(key, value) -> KeyValue.pair(value.personId(), value), //
Grouped.with( //
"communications-grouped", //
longKeySerde, //
communicationSerde)) //
.aggregate( //
GroupedCommunications::new, //
(key, value, aggregate) -> aggregate.add(value), //
(key, value, aggregate) -> aggregate.remove(value), //
Named.as("communications-grouped-aggregated"), //
Materialized //
.<Long, GroupedCommunications, KeyValueStore<Bytes, byte[]>>as("communications-grouped-aggregated-materialized") //
.withKeySerde(longKeySerde) //
.withValueSerde(groupedCommunicationSerde));
这会隐式创建两个 Kafka 主题
communications-grouped-repartition
和 communications-grouped-aggregated-materialized-changelog
,压缩类型为 producer
。
如何将其更改为
lz4
?设置 kafka-streams.producer.compression-type=lz4
没有效果。
“没有影响”是什么意思?数据不是生产者用lz4写的吗?或者你只是问主题配置本身?
压缩可以在主题级别配置和生产者级别配置中进行配置,并且这两种配置都可以“交互”。最后,将压缩类型设置为“生产者”的主题配置告诉代理将数据写入主题,仅接受生产者选择的格式 - 如果生产者配置为 lz4,则数据将以 lz4 写入。
如果更改主题级别压缩配置,代理可能需要重新压缩数据以匹配所需的格式,以防生产者配置了不同的压缩格式,因此,将压缩格式保持在“生产者”通常是最好的选择最好避免重新压缩开销。例如,如果生产者压缩已关闭,则代理将在写入数据之前使用 lz4 进行压缩 - 如果生产者压缩设置为 snappy,则代理将首先解压缩数据,然后在写入数据之前重新压缩为 lz4。
使用
producer.
前缀只会更改生产者配置(而不是主题配置),即生产者如何压缩数据并发送给代理。你是说生产者没有按照配置使用lz4?
如果您确实愿意,您可以使用配置名称前缀
topic.
,例如 topic.compression-type=lz4
,将内部主题的特定于主题的配置添加到 Kafka Streams 配置中。这将更改主题级别配置,但是,如果生产者未配置压缩(默认情况下为“无”IIRC),则压缩现在只会发生在代理端,如上所述,很可能不是您想要的。