我的项目需要使用 apache flink(版本 1.18.0)的接收器功能将 csv 文件写入 S3 存储桶,其中包含标头。使用的编程语言是java。 Hadoop 文件系统通过名为“flink-s3-fs-hadoop”的 flink 库使用。
CSV 数据示例是:
student_id,exam_id,subject,score,grade
1,1,Math,41,D
1,1,Spanish,51,C
下面的代码可以工作,但它没有将标题写入 S3 中的 csv 文件。 使用的代码片段:
private static final StreamExecutionEnvironment ENV;
private static final StreamTableEnvironment TABLE_ENV;
static {
ENV = StreamExecutionEnvironment.getExecutionEnvironment()
.setRuntimeMode(RuntimeExecutionMode.BATCH)
.setParallelism(1);
TABLE_ENV = StreamTableEnvironment.create(ENV);
}
DataType dataType = DataTypes.ROW(
DataTypes.FIELD("student_id", DataTypes.INT()),
DataTypes.FIELD("exam_id", DataTypes.INT()),
DataTypes.FIELD("subject", DataTypes.STRING()),
DataTypes.FIELD("score", DataTypes.INT()),
DataTypes.FIELD("grade", DataTypes.STRING())
);
RowType rowType = (RowType) dataType.getLogicalType();
CsvRowDataSerializationSchema serSchemaBuilder =
new CsvRowDataSerializationSchema.Builder(rowType).build();
FileSink<RowData> sink = FileSink.forRowFormat(new Path(s3FilePath), new SerializationSchemaAdapter(serSchemaBuilder))
.withOutputFileConfig(new OutputFileConfig("test", ".csv"))
.build();
rowData.sinkTo(sink);
ENV.execute();
S3 中 CSV 文件的预期输出:
student_id,exam_id,subject,score,grade
1,1,Math,41,D
1,1,Spanish,51,C
S3 中 CSV 文件的实际输出:
1,1,Math,41,D
1,1,Spanish,51,C
任何人都可以告诉我是否有一种方法可以使用接收器功能编写带标题的 csv 文件吗?
我为此使用的解决方案是实现我自己的
CsvEncoder
类,该类实现了 Flink 的 Encoder<T>
。然后我可以像这样使用它:
dataStream.sinkTo(
FileSink.forRowFormat(new Path(outputDirectory),
new CsvEncoder<MyType>())
.build())
在 CsvEncoder 内部,我使用了 Apache Commons CSV 支持类,通过:
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;