CustomDeserializer in PyFlink cannot accept a string as a tuple

Question

I am receiving Kafka data that looks like ("field1", "field2"). So now I am trying to parse this input with PyFlink 1.17.1 using a custom deserializer, following this link, as shown below:

from pyflink.common import DeserializationSchema, Types
from model.exceptions import SystemException


class StringToTupleDeserializationSchema(DeserializationSchema):

    def __init__(self):
        super().__init__()

    def deserialize(self, message):
        # Split the incoming value on the comma and emit a 2-tuple.
        try:
            parts = message.split(',')
            if len(parts) == 2:
                return (parts[0], parts[1])
        except Exception as e:
            raise SystemException(e)

    def get_produced_type(self):
        # Declare the produced type as a tuple of two strings.
        return Types.TUPLE([Types.STRING(), Types.STRING()])

Now, instead of SimpleStringSchema(), I pass that class to the KafkaSource builder, as shown below:

from pyflink.common import WatermarkStrategy
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("test-topic1") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(StringToTupleDeserializationSchema()) \
    .build()
ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
       

But this raises the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o25.fromSource.
: java.lang.NullPointerException
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
    at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:216)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2643)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2015)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

I can get the elements I want using string.strip and string.split, but that is not an efficient way to access them.

What am I missing here?

Thanks in advance.

apache-flink pyflink
1 Answer

Aren't Kafka messages bytes rather than strings, so that they have to be converted to a string first?

When you look at SimpleStringSchema and JsonRowDeserializationSchema, they delegate to the corresponding Java classes, which I suspect convert the bytes to strings internally.
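A quick way to see that delegation (a sketch based on PyFlink 1.17's internals; _j_deserialization_schema is a private attribute, so treat the name as an assumption to verify against your version):

from pyflink.common.serialization import SimpleStringSchema

schema = SimpleStringSchema()
# The Python object is only a thin wrapper: this attribute holds the
# Py4J reference to the underlying Java SimpleStringSchema, which is
# what KafkaSource actually hands to the JVM. A plain Python subclass
# of DeserializationSchema leaves it as None, which would explain the
# NullPointerException raised on the Java side.
print(schema._j_deserialization_schema)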

I would suggest deserializing the value with SimpleStringSchema and then applying a map function to split it into a tuple.
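A minimal sketch of that approach, assuming the value is always a string of the form ("field1", "field2") (broker, topic, and group id are copied from the question; the parsing in to_tuple is a naive placeholder):

from pyflink.common import Types, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()

source = KafkaSource.builder() \
    .set_bootstrap_servers("localhost:9092") \
    .set_topics("test-topic1") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

def to_tuple(message):
    # '("field1", "field2")' -> ('field1', 'field2'); assumes exactly one comma
    left, right = message.strip('()').split(',', 1)
    return left.strip().strip('"'), right.strip().strip('"')

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source") \
    .map(to_tuple, output_type=Types.TUPLE([Types.STRING(), Types.STRING()]))

As with any PyFlink Kafka job, the Kafka connector jar still has to be on the classpath (for example via env.add_jars).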
