我正在尝试编写用于将Java RDD中的数据转换为直方图的代码,以便我可以以某种方式对数据进行分区。例如,对于我想要创建大小直方图的数据,我可以找出哪个bin包含特定大小范围的条目数。我能够在不同的RDD中获得价值,但我不确定我在这里缺少什么。
有更简单的方法吗?
0 - 1 GB - 2 entries
1 - 5GB - 4 entries
and so on
EntryWithSize {
long size;
String entryId;
String groupId;
}
JavaRDD<EntryWithSize> entries = getEntries();
JavaRDD<HistoSize> histoSizeJavaRDD = entryJavaRDD.keyBy(EntryWithSize::getGroupId)
.combineByKey(
HistoSize::new,
(HistoSize h, EntryWithSize y) -> h.mergeWith(new HistoSize(y)),
HistoSize::mergeWith
).values();
@Data
@AllArgsConstructor
static class HistoSize implements Serializable {
int oneGB;
int fiveGB;
public HistoSize(EntryWithSize entry) {
addSize(entry);
}
private void addSize(EntryWithSize entry) {
long size = entry.getSize();
if (size <= ONE_GB) {
oneGB++;
} else {
fiveGB++;
}
}
public HistoSize mergeWith(HistoSize other) {
oneGB += other.oneGB;
fiveGB += other.fiveGB;
return this;
}
}
我能够通过在最终对rdd上使用reduce来使其工作。我的测试数据错误,导致输出中的红鲱鱼。
Function2<HistoSize, HistoSize, HistoSize> reduceSumFunc = (a, b) -> (new HistoSize(
a.oneGB + b.oneGB,
a.fiveGB + b.fiveGB,
));
HistoSize finalSize = histoSizeJavaRDD.reduce(reduceSumFunc);