I have instances of an Avro-generated class, and I want to write a custom file sink that processes a stream of Avro data.
val list = List.fill(1000) {
  Person.newBuilder() // Avro-generated class
    .setName("Test")
    .setAge(44)
    .build()
}
val avroStream: DataStream[Person] = env.fromCollection(list)
avroStream.addSink(new ParquetFileSink("output/demoFileSink.txt"))
env.execute()
I tried using RichSinkFunction parameterized with the Avro type:
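class ParquetFileSink(path: String) extends RichSinkFunction[Person] {
  var writer: PrintWriter = _
  // Open the output file once per sink instance (otherwise writer stays null).
  override def open(parameters: Configuration): Unit = {
    writer = new PrintWriter(path)
  }
  override def invoke(event: Person, context: SinkFunction.Context): Unit = {
    println(event)
    writer.println(event)
    writer.flush()
  }
  // Release the file handle when the sink shuts down.
  override def close(): Unit = if (writer != null) writer.close()
}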
But I run into this exception:
Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo
at org.apache.flink.formats.avro.typeutils.AvroTypeInfo.generateFieldsFromAvroSchema(AvroTypeInfo.java:72)