使用Java Spark将对象A的数据集转换为对象B的数据集,我使用
Encoder<B> encoder = Encoders.bean(B.class);
Dataset<B> datasetB = datasetA.map(x -> service.generate(x), encoder)
// where service interface has a function such as "B generate(A a)"
我的想法是将业务逻辑放在一个与 Spark 无关的包中,并将 Spark 的所有大数据代码放在另一个包中(称为业务逻辑包)。
到目前为止,这个想法对我来说相当有效(需要注意的是,让一堆类在业务逻辑中实现可序列化接口),但当我想为我的业务对象生成唯一标识符时,它开始陷入困境。我需要某种同步以避免重复的 ID,并且业务逻辑包不应该知道它实际上在多个工作节点上运行。
我考虑过使用基于纪元的东西,但使用 10M id 代进行测试显示,当使用 System.nanotime() 作为 id 时,即使在单个节点上,也会出现数百个 id 冲突。
是否可以使用 Spark 绘制一个 monotonically_increasing_id() 并将其传递给 Dataset.map() 调用?是在地图转换后添加或填充我的 id 列的唯一解决方案吗?
编辑:我刚刚有了这个想法
Dataset<A> a = b.rdd().zipWithIndex().map(x -> service.generate(x._1(), x._2()), ablEncoder);
但是 x 是一个
scala.Tuple2<a, Object>
但我似乎无法访问它的元素,尽管它们是公共的?
Edit2:如果我使用
toJavaRDD()
而不是 rdd()
,Tuple2 就可用
这看起来是正确的方法吗?
这个问题并不是那么容易解决,为了质量,我们添加了一个雪花id,如保证唯一行id仅依赖于mac地址唯一性,结合分区id,起始时间戳和计数器(160位,一个整数和两个长整型) .
这种方法的一个主要缺点是使用该函数多次重新运行操作会生成新的唯一 ID,因此先写出结果然后使用它们是保证通常预期行为的唯一方法。
monotonocally_incrementing_id 仅为每个分区提供一个计数器,并且仅在数据帧内是唯一的,因此重复运行进程将最终导致冲突。