KStream-KTable join with CloudEvent values is not working


I want to do a join between a Kafka stream and a KTable. The PoC works fine with plain stream data, but as soon as I use CloudEvent values I keep running into one serialization-related problem or another.

Here is my code sample -

Map<String, Object> ceSerializerConfigs = new HashMap<>();
ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);

// configured as value (de)serializers, hence isKey = false
CloudEventSerializer serializer = new CloudEventSerializer();
serializer.configure(ceSerializerConfigs, false);

CloudEventDeserializer deserializer = new CloudEventDeserializer();
deserializer.configure(ceSerializerConfigs, false);
Serde<CloudEvent> cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);

KStream<String, CloudEvent> kStream = builder.stream("stream-topic", Consumed.with(Serdes.String(), cloudEventSerde));
KTable<String, CloudEvent> kTable = builder.table("ktable-topic", Consumed.with(Serdes.String(), cloudEventSerde));

KStream<String, CloudEvent> joined = kStream
    .join(kTable, (left, right) -> CloudEventBuilder.v1().withId(left.getId().concat(right.getId())).build());
joined.to(output, Produced.with(Serdes.String(), cloudEventSerde));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamProps);
kafkaStreams.start();

I also tried using a WrapperSerde, along the lines of "Problem with configuring Serdes for Kafka Streams".
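That attempt presumably looks roughly like the sketch below (a reconstruction of the usual WrapperSerde pattern, not the asker's actual code; the class name CloudEventSerde is made up). It still funnels into the same header-less serialize call and therefore fails in the same way:

// Sketch of the WrapperSerde pattern (reconstructed, not the exact code that was tried):
// internally it still calls CloudEventSerializer.serialize(String, CloudEvent),
// so Kafka Streams' state stores hit the same UnsupportedOperationException.
public class CloudEventSerde extends Serdes.WrapperSerde<CloudEvent> {
    public CloudEventSerde() {
        super(new CloudEventSerializer(), new CloudEventDeserializer());
    }
}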

But I keep running into this exception -

18:12:08.691 [basic-stream-update-0630c691-0080-4e02-8c85-7bff650f34e9-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [basic-stream-update-0630c691-0080-4e02-8c85-7bff650f34e9] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000002, topic=cloudevent-ktable, partition=0, offset=80, stacktrace=java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)

Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:73) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:30) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$4(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:200) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:120) ~[kafka-streams-2.8.0.jar:?]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:122) ~[kafka-streams-2.8.0.jar:?]

Has anyone successfully used CloudEvent with a KTable?

apache-kafka apache-kafka-streams cloudevents
1 Answer

Your stack trace says this:

Caused by: java.lang.UnsupportedOperationException: CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:84) ~[cloudevents-kafka-2.5.0.jar:?]
    at io.cloudevents.kafka.CloudEventSerializer.serialize(CloudEventSerializer.java:38) ~[cloudevents-kafka-2.5.0.jar:?]
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:82) ~[kafka-streams-2.8.0.jar:?]

which essentially is this code in ValueAndTimestampSerializer:

    final byte[] rawValue = valueSerializer.serialize(topic, data);

So the CloudEventSerializer you are using is not suitable for Kafka Streams:

public byte[] serialize(String topic, CloudEvent data) {
    throw new UnsupportedOperationException("CloudEventSerializer supports only the signature serialize(String, Headers, CloudEvent)");
}

That's because Kafka Streams does not support headers in (de)serialization: the state store backing the KTable calls the header-less serialize(String, CloudEvent) variant.

I'd suggest you extend that CloudEventSerializer and override its serialize(String topic, CloudEvent data) to delegate to serialize(String topic, Headers headers, CloudEvent data) with an empty new RecordHeaders().
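A minimal sketch of that idea follows. The subclass names are made up here, and the deserializer part goes beyond the suggestion above: since state-store reads also happen without record headers, and with STRUCTURED encoding the CloudEvents deserializer picks the format from the content-type header, the sketch supplies that header itself. Treat it as an assumption to verify against your cloudevents-kafka version, not a drop-in solution.

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventDeserializer;
import io.cloudevents.kafka.CloudEventSerializer;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch only: delegate the header-less signature (which Kafka Streams' state stores call)
// to the Headers-based signature with empty headers.
class StreamsCloudEventSerializer extends CloudEventSerializer {
    @Override
    public byte[] serialize(String topic, CloudEvent data) {
        return serialize(topic, new RecordHeaders(), data);
    }
}

// Sketch only: same delegation on the read side; the content-type header is an assumption
// so that the deserializer treats the payload as structured JSON.
class StreamsCloudEventDeserializer extends CloudEventDeserializer {
    @Override
    public CloudEvent deserialize(String topic, byte[] data) {
        RecordHeaders headers = new RecordHeaders();
        headers.add("content-type", JsonFormat.CONTENT_TYPE.getBytes(StandardCharsets.UTF_8));
        return deserialize(topic, headers, data);
    }
}

// Helper that wires both into a Serde, configured the same way as in the question (isKey = false).
final class StreamsCloudEventSerdes {
    static Serde<CloudEvent> cloudEventSerde(Map<String, Object> config) {
        StreamsCloudEventSerializer serializer = new StreamsCloudEventSerializer();
        serializer.configure(config, false);
        StreamsCloudEventDeserializer deserializer = new StreamsCloudEventDeserializer();
        deserializer.configure(config, false);
        return Serdes.serdeFrom(serializer, deserializer);
    }
}

You would then build the Serde from these subclasses instead of the stock ones and pass it to Consumed.with(...) / Produced.with(...) exactly as in the question.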
