我收到了 Kafka 数据,看起来像
("field1", "field2")
。所以,现在我尝试使用 Pyflink 1.17.1
通过 custom deserializer
解析此输入,如下所示,参考 this 链接:
from pyflink.common import DeserializationSchema, Types, TypeInformation
from model.exceptions import SystemException
class StringToTupleDeserializationSchema(DeserializationSchema):
def __init__(self):
super().__init__()
def deserialize(self, message):
parts = message.split(',')
try:
if len(parts) == 2:
return (parts[0], parts[1])
except Exception as e:
raise SystemException(e)
def get_produced_type(self):
return TypeInformation.of((Types.STRING(), Types.STRING()))
现在,我不再是
SimpleStringSchema()
,而是将该类作为 KafkaSource 的输入传递,如下所示:
source = KafkaSource.builder() \
.set_bootstrap_servers("localhost:9092") \
.set_topics("test-topic1") \
.set_group_id("my-group") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(StringToTupleDeserializationSchema()) \
.build()
ds = self.env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
但这会引发错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o25.fromSource.
: java.lang.NullPointerException
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.getProducedType(KafkaValueOnlyDeserializationSchemaWrapper.java:56)
at org.apache.flink.connector.kafka.source.KafkaSource.getProducedType(KafkaSource.java:216)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getTypeInfo(StreamExecutionEnvironment.java:2643)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromSource(StreamExecutionEnvironment.java:2015)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
我可以使用
string.strip
和 string.split
获取我想要的元素,但这不是访问元素的有效方法。
我在这里缺少什么?
蒂亚