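I have been working with Kafka Streams for quite some time. I am stuck on a problem I cannot solve, and I hope this platform can help. Here is the scenario: there are many customers, and many employees working for those customers. One topic stores all of this data. I want to stream that topic and build a materialized state store that groups all employees of the same customer together. I am using the open-source ListSerializer and ListDeserializer classes to do this.
The topic looks like this, where c stands for customer and e for employee.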
key value
c1-e1 e1
c1-e2 e2
c1-e3 e3
c2-e1 e4
c2-e2 e5
c2-e3 e6
I want this:
c1 - [e1, e2, e3]
c2 - [e4, e5, e6]
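To make the target shape concrete, here is a minimal plain-Java sketch of the same grouping, with no Kafka involved. The `Employee` class below is a hypothetical stand-in (only `getCustomer()` appears in the question), and a `LinkedHashMap` plays the role of the state store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupingSketch {
    // Hypothetical stand-in for the Employee value type.
    static final class Employee {
        final String id;
        final String customer;

        Employee(String id, String customer) {
            this.id = id;
            this.customer = customer;
        }

        String getCustomer() {
            return customer;
        }
    }

    // Same shape as an aggregate step: start with an empty list per customer
    // and append each employee that belongs to it.
    static Map<String, List<String>> groupByCustomer(List<Employee> records) {
        Map<String, List<String>> store = new LinkedHashMap<>();
        for (Employee e : records) {
            store.computeIfAbsent(e.getCustomer(), k -> new ArrayList<>()).add(e.id);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Employee> records = Arrays.asList(
            new Employee("e1", "c1"), new Employee("e2", "c1"), new Employee("e3", "c1"),
            new Employee("e4", "c2"), new Employee("e5", "c2"), new Employee("e6", "c2"));
        // prints {c1=[e1, e2, e3], c2=[e4, e5, e6]}
        System.out.println(groupByCustomer(records));
    }
}
```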
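I have implemented an ArrayListtSerializer class, which implements Serializer<ArrayList<T>>, where T is the Employee type, and an ArrayListDeserializer, which implements Deserializer<ArrayList<T>>.
I wrote the code below to achieve this, and it works fine. It reads the data from the topic and stores it in the state store, with the customer ID as the key and the list of employees as the value for that key.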
// getSerdeForObject is a helper that returns a Serde for the given class.
KStream<String, Employee> source = builder.stream(topicName, Consumed.with(Serdes.String(), getSerdeForObject(Employee.class)));
Serde<Employee> employeeSerde = getSerdeForObject(Employee.class);
Serde<ArrayList<Employee>> employeeArrayListSerde = Serdes.serdeFrom(
    new ArrayListtSerializer<>(employeeSerde.serializer()),
    new ArrayListDeserializer<>(employeeSerde.deserializer()));
source.groupBy((k, v) -> v.getCustomer())
    .aggregate(() -> new ArrayList<>(), (key, value, aggregate) -> {
        aggregate.add(value);
        return aggregate;
    }, Materialized.<String, ArrayList<Employee>, KeyValueStore<Bytes, byte[]>>as("NewStore")
        .withValueSerde(employeeArrayListSerde));
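Now I am trying to make the list serializer and deserializer classes generic, so that ArrayListtSerializer becomes ListtSerializer, implementing Serializer<List<T>> instead of Serializer<ArrayList<T>>, and likewise for the deserializer. I converted both ArrayListtSerializer and ArrayListDeserializer into ListtSerializer and ListDeserializer, and changed the code to this: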
// getSerdeForObject is a helper that returns a Serde for the given class.
KStream<String, Employee> source = builder.stream(topicName, Consumed.with(Serdes.String(), getSerdeForObject(Employee.class)));
Serde<Employee> employeeSerde = getSerdeForObject(Employee.class);
Serde<List<Employee>> employeeListSerde = Serdes.serdeFrom(
    new ListtSerializer<>(employeeSerde.serializer()),
    new ListDeserializer<>(employeeSerde.deserializer()));
source.groupBy((k, v) -> v.getCustomer())
    .aggregate(() -> new ArrayList<>(), (key, value, aggregate) -> {
        aggregate.add(value);
        return aggregate;
    }, Materialized.<String, List<Employee>, KeyValueStore<Bytes, byte[]>>as("NewStore")
        .withValueSerde(employeeListSerde));
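For readers who have not seen such classes, a generic list serializer and deserializer of the kind described above might look roughly like the sketch below. This is not the open-source implementation the question uses. To stay self-contained it declares local `Serializer` and `Deserializer` stand-ins with the same method shape as Kafka's interfaces, and the length-prefixed wire format is an assumption chosen for simplicity. It also assumes the list contains no null elements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ListSerdeSketch {
    // Local stand-ins mirroring the method shape of Kafka's Serializer / Deserializer.
    interface Serializer<T> { byte[] serialize(String topic, T data); }
    interface Deserializer<T> { T deserialize(String topic, byte[] data); }

    // Writes the element count, then each element as (length, bytes).
    static final class ListSerializer<T> implements Serializer<List<T>> {
        private final Serializer<T> inner;

        ListSerializer(Serializer<T> inner) { this.inner = inner; }

        @Override
        public byte[] serialize(String topic, List<T> data) {
            if (data == null) return null;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(data.size());
                for (T element : data) {
                    byte[] encoded = inner.serialize(topic, element);
                    out.writeInt(encoded.length);
                    out.write(encoded);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }
    }

    // Reads back exactly what ListSerializer wrote.
    static final class ListDeserializer<T> implements Deserializer<List<T>> {
        private final Deserializer<T> inner;

        ListDeserializer(Deserializer<T> inner) { this.inner = inner; }

        @Override
        public List<T> deserialize(String topic, byte[] data) {
            if (data == null) return null;
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                int size = in.readInt();
                List<T> result = new ArrayList<>(size);
                for (int i = 0; i < size; i++) {
                    byte[] encoded = new byte[in.readInt()];
                    in.readFully(encoded);
                    result.add(inner.deserialize(topic, encoded));
                }
                return result;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Round trip with String elements standing in for Employee.
    static List<String> roundTrip(List<String> input) {
        Serializer<String> s = (topic, v) -> v.getBytes(StandardCharsets.UTF_8);
        Deserializer<String> d = (topic, b) -> new String(b, StandardCharsets.UTF_8);
        byte[] wire = new ListSerializer<>(s).serialize("topic", input);
        return new ListDeserializer<>(d).deserialize("topic", wire);
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("e1", "e2", "e3"))); // prints [e1, e2, e3]
    }
}
```

Because the element type is delegated to an inner serializer, the same pair works for any `List<T>`, which is the point of moving from `ArrayList<T>` to `List<T>`.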
After this change, the stream keeps going into the ERROR state and the application does not start. I get the error below.
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: Employee). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
... 6 more
Caused by: java.lang.ClassCastException: class Employee cannot be cast to class [B (Employee is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 25 more
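I have spent a lot of time trying to understand this problem, but I still cannot solve it. Can someone help me? Any help would be much appreciated. Thanks.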
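Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer
(key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer)
is not compatible to the actual key or value type
(key type: java.lang.String / value type: Employee).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.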
The error says that you are trying to serialize the Employee class with ByteArraySerializer. ByteArraySerializer simply takes a byte[] and sends it as-is (there is nothing to serialize).
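The `ClassCastException` at the bottom of the trace follows from this. A small sketch, using a local `Serializer` stand-in rather than Kafka's interface, shows the mechanism: a pass-through serializer typed for `byte[]` is called through an erased reference, and the cast fails as soon as the value is anything other than a `byte[]`. The class and method names here are illustrative only.

```java
class ByteArrayPassThroughSketch {
    // Local stand-in with the same method shape as Kafka's Serializer interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // A pass-through serializer: it expects bytes and returns them unchanged.
    static final class PassThroughSerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Calls the serializer through an erased reference, the way a framework would
    // if it only knew the configured serializer and not the actual value type.
    @SuppressWarnings("unchecked")
    static boolean failsOnNonByteArray(Object value) {
        Serializer<Object> erased = (Serializer<Object>) (Serializer<?>) new PassThroughSerializer();
        try {
            erased.serialize("some-topic", value);
            return false;
        } catch (ClassCastException e) {
            return true; // the value was not a byte[]
        }
    }

    public static void main(String[] args) {
        System.out.println(failsOnNonByteArray(new byte[] {1, 2, 3}));      // prints false
        System.out.println(failsOnNonByteArray("an Employee-like object")); // prints true
    }
}
```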
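So you either need to convert Employee to a byte[] yourself and send that, or use a proper serializer for your Employee class, i.e. something that takes an Employee object and turns it into a byte[].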
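As a rough illustration of "something that takes an Employee and turns it into a byte[]", here is a hand-rolled sketch. The `Employee` fields (`id`, `customer`) and the binary layout are assumptions made for the example, and the `Serializer` and `Deserializer` interfaces are local stand-ins with the same method shape as Kafka's. A real application would more likely use JSON or Avro.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class EmployeeSerdeSketch {
    // Local stand-ins mirroring the method shape of Kafka's Serializer / Deserializer.
    interface Serializer<T> { byte[] serialize(String topic, T data); }
    interface Deserializer<T> { T deserialize(String topic, byte[] data); }

    // Hypothetical Employee with two fields; the real class is not shown in the question.
    static final class Employee {
        final String id;
        final String customer;

        Employee(String id, String customer) {
            this.id = id;
            this.customer = customer;
        }
    }

    // Writes both fields as length-prefixed UTF strings.
    static final class EmployeeSerializer implements Serializer<Employee> {
        @Override
        public byte[] serialize(String topic, Employee e) {
            if (e == null) return null;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(e.id);
                out.writeUTF(e.customer);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
            return bytes.toByteArray();
        }
    }

    // Reads the fields back in the order they were written.
    static final class EmployeeDeserializer implements Deserializer<Employee> {
        @Override
        public Employee deserialize(String topic, byte[] data) {
            if (data == null) return null;
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                String id = in.readUTF();
                String customer = in.readUTF();
                return new Employee(id, customer);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        }
    }

    static Employee roundTrip(Employee e) {
        byte[] wire = new EmployeeSerializer().serialize("topic", e);
        return new EmployeeDeserializer().deserialize("topic", wire);
    }

    public static void main(String[] args) {
        Employee copy = roundTrip(new Employee("e1", "c1"));
        System.out.println(copy.id + "@" + copy.customer); // prints e1@c1
    }
}
```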
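You can serialize/deserialize the Employee object as JSON or Avro (to name a few).
For example, you could write a generic JSON serializer and deserializer for all of your custom objects.
See JsonSerializer and JsonDeserializer in Spring (even if you do not use Spring, you can look at the code).
Then create a Serde from the serializer and deserializer.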
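A possible way to wire this in, offered as a sketch rather than a confirmed fix: the stack trace fails in a SinkNode reached from the map and filter steps, which is consistent with the write to the internal repartition topic that `groupBy` creates when it changes the key. If no serdes are passed to `groupBy`, that write uses the configured defaults. The code below assumes kafka-streams 2.1 or later on the classpath (`Grouped`; versions 1.0 to 2.0 use `Serialized.with` in the same position). The `Employee` interface, the class name, and the method parameters are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

class GroupBySerdeSketch {
    // Hypothetical stand-in; only getCustomer() is known from the question.
    interface Employee {
        String getCustomer();
    }

    static void build(StreamsBuilder builder,
                      String topicName,
                      Serializer<Employee> employeeSerializer,
                      Deserializer<Employee> employeeDeserializer,
                      Serde<List<Employee>> employeeListSerde) {
        // Create the Serde from the serializer and deserializer.
        Serde<Employee> employeeSerde = Serdes.serdeFrom(employeeSerializer, employeeDeserializer);

        KStream<String, Employee> source =
            builder.stream(topicName, Consumed.with(Serdes.String(), employeeSerde));

        // Pass the serdes to groupBy so the repartition topic does not fall back
        // to the default (byte array) value serde.
        source.groupBy((k, v) -> v.getCustomer(), Grouped.with(Serdes.String(), employeeSerde))
            .aggregate(() -> new ArrayList<>(), (key, value, aggregate) -> {
                aggregate.add(value);
                return aggregate;
            }, Materialized.<String, List<Employee>, KeyValueStore<Bytes, byte[]>>as("NewStore")
                .withKeySerde(Serdes.String())
                .withValueSerde(employeeListSerde));
    }
}
```

Setting default serdes in the streams configuration is the other route the error message mentions, but passing them per operation keeps the value type explicit at each step.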