嗨,flink 用户/贡献者,
我在这里发帖,看看有没有更聪明的想法。
我正在使用 Flink 以 Avro 通用记录格式使用 kafka 主题,使用通用记录反序列化器,然后在其上创建一个 Flink 表。
由于Generic类型的数据流,为了避免RAW数据类型(像
f0
RAW('org.apache.avro.generic.GenericRecord', '...') 在我的表中,我终于找到了一种使用 returns(TypeInformation) 来推断底层模式的方法,感谢这个 post.
基于这个数据流的表模式正是我想要的:
(
`common` ROW<`eventType` STRING, `orderNumber` STRING, `orderLineId` STRING>,
`rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
)
然而,当我试图打印出消耗的数据流时,它没有输出任何东西,也没有错误。我以前看到没有
returns
的输出。
有人知道原因吗?提前谢谢你!
这是我的代码:
var kafkaReader =
KafkaReader.\<GenericRecord\>builder()
.config(config)
.topics(sourceTopic)
.deserializationSchema(
new ConfluentRegistryGenericAvro(
schema,
config.getSchemaRegistryOptions().registryUrl,
config.getSchemaRegistryOptions().toMap()))
.build();
TypeInformation\<GenericRecord\> datatype =
AvroSchemaConverter.convertToTypeInfo(schema.toString());
var stream =
streamExecutionEnvironment
.fromSource(
kafkaReader.kafkaSource(),
WatermarkStrategy.noWatermarks(),
sourceTopic)
.returns(datatype);
//print nothing
stream.printToErr();
returns
添加到数据流中,以便 flink 推断通用记录流的底层模式。