消费者applyMapping后如何处理数据流?还是按1000个一包加工?
private JavaRDD<TracingOdsProjection> enrichWithExternalData(JavaRDD<ExternalDto<T>> rdd) {
return rdd.flatMap(rec -> {
List<Dto> res = new ArrayList<>();
new Library().applyMapping(dto -> res.add(dto));
return res.iterator();
});
}
我尝试了 rdd.foreach 或 rdd.foreachPartition 收集 1000,然后 SparkContext.parallelize(res)。
SparkSession sparkSession = SparkSession.builder().config(jssc.sparkContext().getConf()).getOrCreate();
try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkSession.sparkContext())) {
final JavaRDD<TracingOdsProjection> packOfMappingsJRdd = sparkContext.parallelize(res);
} catch (Exception e) {
log.error("On create or useing spark context: ", e);
}
但是 JavaSparkContext 的任务不可序列化。当调用 jssc.sparkContext() 时
我无法在 applyMapping(..) 之后收集到一个 Arraylist 中,因为它太大了。
new Library() 必须实现 Iterator、Iterable 接口。 接下来 - 返回新元素。
不要使用“res.add(dto)”!请改用迭代器!