将函数的使用者转换为 Stream JavaRdd

问题描述 投票:0回答:1

消费者applyMapping后如何处理数据流?还是按1000个一包加工?

private JavaRDD<TracingOdsProjection> enrichWithExternalData(JavaRDD<ExternalDto<T>> rdd) {
    return rdd.flatMap(rec -> {
        List<Dto> res = new ArrayList<>();
        new Library().applyMapping(dto -> res.add(dto));

        return res.iterator();
    });
}

我尝试了 rdd.foreach 或 rdd.foreachPartition 收集 1000,然后 SparkContext.parallelize(res)。

SparkSession sparkSession = SparkSession.builder().config(jssc.sparkContext().getConf()).getOrCreate();
try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext())) {
 final JavaRDD<TracingOdsProjection> packOfMappingsJRdd = sparkContext.parallelize(res);
   } catch (Exception e) {
 log.error("On create or useing spark context: ", e);
   }

但是 JavaSparkContext 的任务不可序列化。当调用 jssc.sparkContext() 时

我无法在 applyMapping(..) 之后收集到一个 Arraylist 中,因为它太大了。

java apache-spark spark-streaming
1个回答
0
投票

new Library() 必须实现 Iterator、Iterable 接口。 接下来 - 返回新元素。

不要使用“res.add(dto)”!请改用迭代器!

© www.soinside.com 2019 - 2024. All rights reserved.