我们有一个庞大的数据集,在 加速器 其中,我们需要对每一行进行一些非常昂贵的操作.如果我们想一次性处理整个数据集,将需要数周的时间,因此,我们希望一次只取一小部分,比如说1%,并在晚上进行处理。
目前,所有数据都在一个数据集中。设置作业工作比如说100万行到200万行,有什么选择?
我可以看到三种可能的路径。
我的问题是,这个方法需要跳过n^2 2行,而我们有很多行。
迭代主数据集,并将每条记录添加到新的数据集中,每100万行建立新的数据集,并与旧的数据集进行链式连接。
这个应该是比较快的,它需要更多的空间,但它在以后仍然会有用。
就用标准库中的方法把数据集拆成一条链,这个就很好了!
理想情况下,你会把你的数据从一开始就以合理的分块导入,但如果因为某些原因这不切实际,我认为拆分你所拥有的数据集是最好的办法。如你所说,如果n不小的话,n^2并不好用)。
也许应该增加一个标准的方法来做这件事。下面是这样一种方法可能的样子。
options = dict(lines_per_dataset=int)
datasets = ('source', 'previous',)
def prepare(job, slices):
previous = datasets.previous
writers = []
for sliceno, linecnt in enumerate(datasets.source.lines):
writers.append([])
for ix in range(0, linecnt, options.lines_per_dataset):
previous = job.datasetwriter(
columns=datasets.source.columns,
previous=previous,
name='%d_%d' % (sliceno, ix,),
for_single_slice=sliceno,
)
writers[-1].append(previous)
return writers
def analysis(sliceno, prepare_res):
per = options.lines_per_dataset
writers = iter(prepare_res[sliceno])
for ix, data in enumerate(datasets.source.iterate(sliceno)):
if ix % per == 0:
write = next(writers).get_split_write_list()
write(data)
def synthesis(prepare_res):
# We want the last one to be the default dataset
for r in prepare_res:
for dw in r:
ds = dw.finish()
ds.link_to_here()
请注意,我在这里没有传播哈希标签,因为如果源是哈希分区的话,那会让每个数据集只有一个片断的数据。