我有 4 版本的 Flink,想更新到 11。我尝试使用
StreamingFileSink
而不是弃用的 BucketingSink
。我的代码看起来像:
val sink = StreamingFileSink
.forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](clazz) )
.withBucketCheckInterval(toMillis(config.inactiveBucketThreshold))
.withBucketAssigner(bucketAssigner)
.build()
但是我在测试中写入 fs 时遇到问题。
使用
StreamingFileSink.forBulkFormat
代替 StreamingFileSink.forRowFormat
与 avro 编码器有哪些优势?
你能帮忙写一个使用second way的例子吗?
有关详细信息,请参阅以下“行格式或批量格式”线程。
简而言之,至少到
1.11
版本,flink只提供BulkWriter
avro格式的工厂-AvroWriterFactory
,您可以开箱即用。
要使用行格式 -
StreamingFileSink.forRowFormat
- 您需要提供自己的 org.apache.flink.api.common.serialization.Encoder
接口实现,该接口将能够逐条记录地编码数据并将数据附加到部分文件。
举个例子,你可以看看
org.apache.flink.formats.json.JsonFileSystemFormatFactory.JsonRowDataEncoder
.