我有kafka生产者配置,直到这次我将密钥作为String类型发送,并配置了SERIALIZER密钥,如下所示,
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ObjectSerializer.class.getName());
但是我们有一个要求,即某个时间键也可能很长,因此,如果不创建另一个生产者配置,我们仍然可以像ObjectSerializer一样配置上面的东西吗?像这样的东西存在吗?
版本2.3引入了DelegatingSerializer和DelegatingDeserializer,它们允许产生和使用具有不同键和/或值类型的记录。生产者必须将标头DelegatingSerializer.SERIALIZATION_SELECTOR设置为选择器值,该选择器值用于选择要使用的序列化器。如果找不到匹配项,则抛出IllegalStateException。
[需要一个标头来告诉它要使用哪个序列化程序,但是您可以编写一个简单的版本,仅查看密钥类型。
((我想我会增强DelegatingSerializer
的功能)。
[另一种选择是为每个使用不同的生产者。
使用版本2.5,您可以在KafkaTemplate
上覆盖生产者工厂属性:
/**
* Create an instance using the supplied producer factory and properties, with
* autoFlush false. If the configOverrides is not null or empty, a new
* {@link DefaultKafkaProducerFactory} will be created with merged producer properties
* with the overrides being applied after the supplied factory's properties.
* @param producerFactory the producer factory.
* @param configOverrides producer configuration properties to override.
* @since 2.5
*/
public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
因此,只需为每种类型定义不同的KafkaTemplate
。
使用旧版本,您将需要两个具有不同配置的生产者工厂。