概述
我正在与Java Spark合作以计算大量数据。我正在将称为Dataset
的许多.DAT
文件的内容加载到feeders
中。这些.DAT
文件除其他字段外还包含时间戳(从1970年1月1日开始,以秒为单位),这些数据开始记录]。在这里,文件中的每一行都代表一秒钟的时间。开始记录数据时的.DAT
file 1546297200(2018年12月31日,星期一,格林尼治标准时间晚上11:00:00)的示例如下:
创建窗口的偏移量,即:id & deltaRSTs| timestamp| | | SQXCXBAXY4P-02,1546297200,825,2065,391 1,0,-8,0 |1546297200| 1,0,-2,0 |1546297201| 1,0,0,0 |1546297202| 1,0,10,0 |1546297203| 1,0,-6,0 |1546297204| 1,0,-4,0 |1546297205| 1,0,0,0 ... 1,0,6,0 ... 1,0,1,0 ... 1,0,-8,0 ...
[另一方面,我还有另一个
Dataset
,其中包含有关某些电气设备的信息(以.csv
格式)。这里的重要部分是,这样的设备创建了一组带有不同时间戳(从1970年1月1日起经过的秒数)的事件(用EVT
来称呼它)。我想从
.DAT
的文件中获取满足特定时间条件的所有行:给定一个事件,考虑所有DAT
的行,使EVT(timestamp)
位于DAT(timestamp)
中
maxEpoch = DAT(timestamp) + rows_of_DAT DAT(timestamp) + offset <= EVT(timestamp) && EVT(timestamp) <= maxEpoch - offset
不用担心,如果您不完全了解这一点,那是为了提供一些背景知识。但是您必须掌握这个主意。
问题
我将介绍我认为适合解决上述情况的类:
CLASS ReadCSV
(主):
public class ReadCSV { private static final SparkSession spark = new SparkSession .Builder() .master("local[*]") .getOrCreate(); public static void main(String[] args) { spark.sparkContext().setLogLevel("ERROR"); Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs Dataset<ProcessEvRow> processEvRowDataset = eventCSVRowDataset .map(new ProcessEvTransformation(feederFileRowDataset), Encoders.bean(ProcessEvRow.class)) } }
在此类中注意,当创建类型为
ProcessEvTransformation
的对象时,我将所有Dataset
行的DAT
作为参数传递。CLASS
ProcessEvTransformation
:
public class ProcessEvTransformation implements MapFunction<EventCSVRow, ProcessEvRow> { private Dataset<FeederFile> feederFileDataset; private int offset = 40; public ProcessEvTransformation(Dataset<FeederFile> feederFileDataset) { this.feederFileDataset = feederFileDataset; // I did here, this.feederFileDataset.show(); and it was successfull } public ProcessEvTransformation withOffset(int offset) { this.offset = offset; return this; } @Override public ProcessEvRow call(EventCSVRow eventCSVRow) throws Exception { String stdPattern = ... String rejectedFlag = ... Dataset<FeederFile> deltaRSTs = this.feederFileDataset .filter(feederFileRow -> { final long epochTime = Long.parseLong(eventCSVRow.getEpochTime()); final long maxEpoch = Long.parseLong(feederFileRow.getEpoch()) + feederFileRow.getLineCount(); return Long.parseLong(feederFileRow.getEpoch()) + offset <= epochTime && epochTime <= maxEpoch - offset; }); String[] rstDistances = getRstDistancesAndMinimum(deltaRSTs, eventCSVRow.getIncrements()); // whatever algorithmic procedure ... } }
这里的问题是我得到了
NullPointerException
,因为有些feederFileDataset
属性为空。奇怪的是,我很确定它已达到100%的定义,但是当调用call
方法时,它变成空值或Invalid tree; null:
(打印时显示的消息)]问题和结论
Dataset
作为参数传递给基于MapFunction
接口的类吗?Dataset
正确传递后变成无效的东西?它与Java Spark的内部过程有关吗?我希望我已经清楚了。感谢您提供的任何帮助。
最良好的祝愿,托马斯。
概述我正在使用Java Spark来计算大量数据。我正在将许多名为feeder的.DAT文件的内容加载到数据集中。这些.DAT文件除其他字段外,还包含...
由于必须保留所有数据,因此我建议使用crossJoin
方法。请注意,此方法非常昂贵。
public class ReadCSV {
private static final SparkSession spark = new SparkSession
.Builder()
.master("local[*]")
.getOrCreate();
public static void main(String[] args) {
spark.sparkContext().setLogLevel("ERROR");
Dataset<EventCSVRow> eventCSVRowDataset = getEventCSVRow(".\\eventLog\\*.csv"); //events from the CSVs
Dataset<FeederFileRow> feederFileRowDataset = getFeederFileDataset(".\\feeder\\*.DAT"); // all rows from all .DATs
Dataset<Andamyo> joined = eventCSVRowDataset
.crossJoin(feederFileRowDataset).as(Encoders.bean(Andamyo.class))
.filter(andamyo -> {
final long eventEpoch = Long.parseLong(andamyo.getEventEpoch());
final long maxEpoch = Long.parseLong(andamyo.getFeederEpoch()) + andamyo.getLineCount();
return Long.parseLong(andamyo.getFeederEpoch()) <= eventEpoch && eventEpoch <= maxEpoch;
});
}
}