Flink DataStream如何将自定义POJO合并到另一个DataStream中

问题描述 投票:0回答:1

我想将数据流转换为具有模式信息的数据流

输入

args [0] DataStream

{"fields":["China","Beijing"]}

args [1]模式

message spark_schema {
  optional binary country (UTF8);
  optional binary city (UTF8);
}

期望输出

{"country":"china", "city":"beijing"}

我这样的代码

public DataStream<String> convert(DataStream source, MessageType messageType) {

        SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
            JSONObject data = new JSONObject();
            this.fields = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
            for (int i = 0; i < fields.size(); i++) {
                data.put(fields.get(i), row.getField(i));
            }
            return data.toJSONString();
        });
        return dataWithSchema;
    }

异常错误

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.xxxx.ParquetDataSourceReader$$Lambda$64/1174881426@d78795 is not serializable
    at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:180)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1823)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:188)
    at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:590)

但是下面的代码可以正常工作

public DataStream<String> convert(DataStream source, MessageType messageType) {
        if (this.fields == null) {
            throw new RuntimeException("The schema of AbstractRowStreamReader is null");
        }

        List<String> field = messageType.getFields().stream().map(Type::getName).collect(Collectors.toList());
        SingleOutputStreamOperator<String> dataWithSchema = source.map((MapFunction<Row, String>) row -> {
            JSONObject data = new JSONObject();
            for (int i = 0; i < field.size(); i++) {
                data.put(field.get(i), row.getField(i));
            }
            return data.toJSONString();
        });
        return dataWithSchema;
    }

Flink映射运算符如何组合外部复杂POJO?

java serialization apache-flink converters
1个回答
0
投票

为了使Flink在各个任务之间分配代码,代码必须完全为Serializable。在您的第一个示例中,它不是。在第二个是。特别地,Type::getName将生成不是Serializable的lambda。

[要获得Serializable的lambda,您需要将其显式转换为可序列化的接口(例如Flink MapFunction)或将其与(Serializable & Function)一起使用

由于第二个也节省了计算,因此在任何情况下都更好。在作业编译期间,转换将仅执行一次,而每条记录将调用DataStream#map。如果不清楚,我建议在IDE中执行它并使用断点。

© www.soinside.com 2019 - 2024. All rights reserved.