我有一个csv文件,我逐行读取url以请求每个enpoint。解析每个请求并将数据写入output.csv。这个过程是平行的。
该问题与书面数据有关。部分数据部分遗漏或完全遗漏(空白行)。我想这是因为异步进程之间的冲突或冲突而发生的。你能告诉我如何解决这个问题。
def parse_data(url, line_num):
print line_num, url
r = requests.get(url)
htmltext = r.text.encode("utf-8")
pois = re.findall(re.compile('<pois>(.+?)</pois>'), htmltext)
for poi in pois:
write_data(poi)
def write_data(poi):
with open('output.csv', 'ab') as resfile:
writer = csv.writer(resfile)
writer.writerow([poi])
resfile.close()
def main():
pool = Pool(processes=4)
with open("input.csv", "rb") as f:
reader = csv.reader(f)
for line_num, line in enumerate(reader):
url = line[0]
pool.apply_async(parse_data, args=(url, line_num))
pool.close()
pool.join()
尝试添加文件锁定:
import fcntl
def write_data(poi):
with open('output.csv', 'ab') as resfile:
writer = csv.writer(resfile)
fcntl.flock(resfile, fcntl.LOCK_EX)
writer.writerow([poi])
fcntl.flock(resfile, fcntl.LOCK_UN)
# Note that you dont have to close the file. The 'with' will take care of it
对同一文件的并发写入确实是数据丢失/文件损坏的已知原因。这里的安全解决方案是“map / reduce”模式 - 每个进程都在其自己的结果文件(map)中写入,然后将这些文件连接在一起(reduce)。