StreamingFileSink forBulkFormat vs forRowFormat with avro encoder

问题描述 投票:0回答:1

我有 4 版本的 Flink,想更新到 11。我尝试使用

StreamingFileSink
而不是弃用的
BucketingSink
。我的代码看起来像:

val sink = StreamingFileSink
  .forBulkFormat(new Path(s"${conf.output}/$path") , AvroWriters.forReflectRecord[T](clazz) )
  .withBucketCheckInterval(toMillis(config.inactiveBucketThreshold))
  .withBucketAssigner(bucketAssigner)
  .build()

但是我在测试中写入 fs 时遇到问题。

使用

StreamingFileSink.forBulkFormat
代替
StreamingFileSink.forRowFormat
与 avro 编码器有哪些优势?

你能帮忙写一个使用second way的例子吗?

apache-flink
1个回答
0
投票

有关详细信息,请参阅以下“行格式或批量格式”线程

简而言之,至少到

1.11
版本,flink只提供
BulkWriter
avro格式的工厂-
AvroWriterFactory
,您可以开箱即用。

要使用行格式 -

StreamingFileSink.forRowFormat
- 您需要提供自己的
org.apache.flink.api.common.serialization.Encoder
接口实现,该接口将能够逐条记录地编码数据并将数据附加到部分文件。

举个例子,你可以看看

org.apache.flink.formats.json.JsonFileSystemFormatFactory.JsonRowDataEncoder
.

© www.soinside.com 2019 - 2024. All rights reserved.