Dataset 将其传递到基于MapFunction接口的类Java Spark时为null

问题描述 投票:2回答:1

概述

我正在与Java Spark合作以计算大量数据。我正在将称为Dataset的许多.DAT文件的内容加载到feeders中。这些.DAT文件除其他字段外还包含时间戳(从1970年1月1日开始,以秒为单位),这些数据开始记录]。在这里,文件中的每一行都代表一秒钟的时间。开始记录数据时的.DAT file 1546297200(2018年12月31日,星期一,格林尼治标准时间晚上11:00:00)的示例如下:


id & deltaRSTs| timestamp| 
              |          |
SQXCXBAXY4P-02,1546297200,825,2065,391
1,0,-8,0      |1546297200|
1,0,-2,0      |1546297201|
1,0,0,0       |1546297202|
1,0,10,0      |1546297203|
1,0,-6,0      |1546297204|
1,0,-4,0      |1546297205|
1,0,0,0       ... 
1,0,6,0       ...
1,0,1,0       ...
1,0,-8,0      ...

[另一方面,我还有另一个Dataset,其中包含有关某些电气设备的信息(以.csv格式)。这里的重要部分是,这样的设备创建了一组带有不同时间戳(从1970年1月1日起经过的秒数)的事件(用EVT来称呼它)。

我想从.DAT的文件中获取满足特定时间条件的所有行:给定一个事件,考虑所有DAT的行,使EVT(timestamp)位于DAT(timestamp)

创建窗口的偏移量,即:
maxEpoch = DAT(timestamp) + rows_of_DAT
DAT(timestamp) + offset <= EVT(timestamp) && EVT(timestamp) <= maxEpoch - offset

不用担心,如果您不完全了解这一点,那是为了提供一些背景知识。但是您必须掌握这个主意。

问题

我将介绍我认为适合解决上述情况的类:

CLASS ReadCSV(主):

public class ReadCSV {

    private static final SparkSession spark = new SparkSession
            .Builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        spark.sparkContext().setLogLevel("ERROR");

        Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs

        Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs

        Dataset<ProcessEvRow> processEvRowDataset = eventCSVRowDataset
            .map(new ProcessEvTransformation(feederFileRowDataset), Encoders.bean(ProcessEvRow.class))

    }
}

在此类中注意,当创建类型为ProcessEvTransformation的对象时,我将所有Dataset行的DAT作为参数传递。

CLASS ProcessEvTransformation

public class ProcessEvTransformation implements MapFunction<EventCSVRow, ProcessEvRow> {

    private Dataset<FeederFile> feederFileDataset;
    private int offset = 40;

    public ProcessEvTransformation(Dataset<FeederFile> feederFileDataset) {
        this.feederFileDataset = feederFileDataset;
        // I did here, this.feederFileDataset.show(); and it was successfull
    }

    public ProcessEvTransformation withOffset(int offset) {
        this.offset = offset;
        return this;
    }

    @Override
    public ProcessEvRow call(EventCSVRow eventCSVRow) throws Exception {
        String stdPattern = ...
        String rejectedFlag = ...
        Dataset<FeederFile> deltaRSTs = this.feederFileDataset
                .filter(feederFileRow -> {
                    final long epochTime = Long.parseLong(eventCSVRow.getEpochTime());
                    final long maxEpoch = Long.parseLong(feederFileRow.getEpoch()) + feederFileRow.getLineCount();
                    return Long.parseLong(feederFileRow.getEpoch()) + offset <= epochTime && epochTime <= maxEpoch - offset;
                });

        String[] rstDistances = getRstDistancesAndMinimum(deltaRSTs, eventCSVRow.getIncrements()); // whatever algorithmic procedure
        ...
    }
}

这里的问题是我得到了NullPointerException,因为有些feederFileDataset属性为空。奇怪的是,我很确定它已达到100%的定义,但是当调用call方法时,它变成空值或Invalid tree; null:(打印时显示的消息)]

问题和结论

  • 有人知道如何成功地将Dataset作为参数传递给基于MapFunction接口的类吗?
  • 为什么Dataset正确传递后变成无效的东西?它与Java Spark的内部过程有关吗?
  • 我希望我已经清楚了。感谢您提供的任何帮助。

    最良好的祝愿,托马斯。

概述我正在使用Java Spark来计算大量数据。我正在将许多名为feeder的.DAT文件的内容加载到数据集中。这些.DAT文件除其他字段外,还包含...

java apache-spark apache-spark-sql
1个回答
0
投票

由于必须保留所有数据,因此我建议使用crossJoin方法。请注意,此方法非常昂贵。

public class ReadCSV {

    private static final SparkSession spark = new SparkSession
            .Builder()
            .master("local[*]")
            .getOrCreate();

    public static void main(String[] args) {

        spark.sparkContext().setLogLevel("ERROR");

        Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs

        Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs

        Dataset<Andamyo> joined = eventCSVRowDataset
            .crossJoin(feederFileRowDataset).as(Encoders.bean(Andamyo.class))
            .filter(andamyo -> {
                final long eventEpoch = Long.parseLong(andamyo.getEventEpoch());
                final long maxEpoch = Long.parseLong(andamyo.getFeederEpoch()) + andamyo.getLineCount();
                return Long.parseLong(andamyo.getFeederEpoch()) <= eventEpoch && eventEpoch <= maxEpoch;
            });

    }
}
© www.soinside.com 2019 - 2024. All rights reserved.