使用Python多处理写入csv apply_async会导致数据丢失

问题描述 投票:0回答:2

我有一个csv文件,我逐行读取url以请求每个enpoint。解析每个请求并将数据写入output.csv。这个过程是平行的。

该问题与书面数据有关。部分数据部分遗漏或完全遗漏(空白行)。我想这是因为异步进程之间的冲突或冲突而发生的。你能告诉我如何解决这个问题。

def parse_data(url, line_num):
    print line_num, url
    r = requests.get(url)
    htmltext = r.text.encode("utf-8")
    pois = re.findall(re.compile('<pois>(.+?)</pois>'), htmltext)
    for poi in pois:
        write_data(poi)

def write_data(poi):
    with open('output.csv', 'ab') as resfile:
        writer = csv.writer(resfile)
        writer.writerow([poi])
    resfile.close()

def main():
    pool = Pool(processes=4)

    with open("input.csv", "rb") as f:
        reader = csv.reader(f)
        for line_num, line in enumerate(reader):
            url = line[0]
            pool.apply_async(parse_data, args=(url, line_num))

    pool.close()
    pool.join()
python csv multiprocessing
2个回答
0
投票

尝试添加文件锁定:

import fcntl

def write_data(poi):
    with open('output.csv', 'ab') as resfile:
        writer = csv.writer(resfile)
        fcntl.flock(resfile, fcntl.LOCK_EX)
        writer.writerow([poi])
        fcntl.flock(resfile, fcntl.LOCK_UN)
     # Note that you dont have to close the file. The 'with' will take care of it

0
投票

对同一文件的并发写入确实是数据丢失/文件损坏的已知原因。这里的安全解决方案是“map / reduce”模式 - 每个进程都在其自己的结果文件(map)中写入,然后将这些文件连接在一起(reduce)。

© www.soinside.com 2019 - 2024. All rights reserved.