Flink Avro 期望类型是 PojoTypeInfo

问题描述 投票:0回答:0

我有 avro 类的实例,我想编写一个自定义文件接收器,它将处理 avro 数据流。

val list = List.fill(1000) {
  Person.newBuilder() // avro class
    .setName("Test")
    .setAge(44)
    .build()
}
    val avroStream: DataStream[Person] = env.fromCollection(list)

    avroStream.addSink(new ParquetFileSink("output/demoFileSink.txt"))
    env.execute()

我尝试将 RichSinkFunction 与参数 avro 类型一起使用

  class ParquetFileSink(path: String) extends RichSinkFunction[Person] {
    var writer: PrintWriter = _

    override def invoke(event: Person, context: SinkFunction.Context): Unit = {
      println(event)
      writer.println(event)
      writer.flush()
    }

但是面对异常:

Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:72)
apache-flink avro
© www.soinside.com 2019 - 2024. All rights reserved.