在将 returns(TypeInformation) 应用到我的 avro 通用 DataStreamSource 之后,它什么都不打印,但没有错误

问题描述 投票:0回答:0

嗨,flink 用户/贡献者,

我在这里发帖,看看有没有更聪明的想法。

  • 我正在使用 Flink 以 Avro 通用记录格式使用 kafka 主题,使用通用记录反序列化器,然后在其上创建一个 Flink 表。

  • 由于Generic类型的数据流,为了避免RAW数据类型(像

    f0
    RAW('org.apache.avro.generic.GenericRecord', '...') 在我的表中,我终于找到了一种使用 returns(TypeInformation) 来推断底层模式的方法,感谢这个 post.

  • 基于这个数据流的表模式正是我想要的:

(
  `common` ROW<`eventType` STRING, `orderNumber` STRING, `orderLineId` STRING>,
  `rowtime` TIMESTAMP_LTZ(3) *ROWTIME* METADATA,
  WATERMARK FOR `rowtime`: TIMESTAMP_LTZ(3) AS SOURCE_WATERMARK()
)

然而,当我试图打印出消耗的数据流时,它没有输出任何东西,也没有错误。我以前看到没有

returns
的输出。 有人知道原因吗?提前谢谢你!

这是我的代码:


var kafkaReader =
KafkaReader.\<GenericRecord\>builder()
           .config(config)
           .topics(sourceTopic)
           .deserializationSchema(
               new ConfluentRegistryGenericAvro(
                    schema,
                    config.getSchemaRegistryOptions().registryUrl,
                    config.getSchemaRegistryOptions().toMap()))
               .build();
TypeInformation\<GenericRecord\> datatype =
AvroSchemaConverter.convertToTypeInfo(schema.toString());

var stream =
streamExecutionEnvironment
       .fromSource(
           kafkaReader.kafkaSource(),
           WatermarkStrategy.noWatermarks(),
           sourceTopic)
       .returns(datatype);
//print nothing
stream.printToErr();
  1. 试图将
    returns
    添加到数据流中,以便 flink 推断通用记录流的底层模式。
  2. 在我消费主题时期待一些输出
  3. 但它什么也没打印出来
apache-flink avro
© www.soinside.com 2019 - 2024. All rights reserved.