在更改分区之前重新读取由 PyArrow fsspec 缓存的 s3 parquet 分区时出现 FileNotFoundError

问题描述 投票:0回答:1

要复制的事件顺序如下:

  1. 使用 pandas.read_parquet (底层是 pyarrow.dataset)读取 s3 parquet 分区。
  2. 将另一个文件添加到该分区中。
  3. 再次读取相同的 s3 parquet,我们将在这个新文件上遇到 FileNotFoundError。

下面是复制它的代码片段:

import os
import boto3
import pandas as pd

df1 = pd.DataFrame([{'a': i, 'b': i} for i in range(10)])
df1.to_parquet('part1.parquet')
df2 = pd.DataFrame([{'a': i+100, 'b': i+100} for i in range(10)])
df2.to_parquet('part2.parquet')

s3_client = boto3.client('s3')
url = 's3://bucket_name/test01'
s3_client.upload_file('part1.parquet', 'bucket_name', os.path.join(url, 'p=x', 'part1.parquet'))
dfx1 = pd.read_parquet(url) # this is fine

s3_client.upload_file('part2.parquet', 'bucket_name', os.path.join(url, 'p=x', 'part2.parquet'))

dfx2 = pd.read_parquet(url) # this will generate the FileNotFoundError on part2.parquet

异常回溯如下:

  File "<env>/lib/python3.8/site-packages/pandas/io/parquet.py", line 493, in read_parquet
    return impl.read(
  File "<env>/lib/python3.8/site-packages/pandas/io/parquet.py", line 240, in read
    result = self.api.parquet.read_table(
  File "<env>/lib/python3.8/site-packages/pyarrow/parquet.py", line 1996, in read_table
    return dataset.read(columns=columns, use_threads=use_threads,
  File "<env>/lib/python3.8/site-packages/pyarrow/parquet.py", line 1831, in read
    table = self._dataset.to_table(
  File "pyarrow/_dataset.pyx", line 323, in pyarrow._dataset.Dataset.to_table
  File "pyarrow/_dataset.pyx", line 2311, in pyarrow._dataset.Scanner.to_table
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/_fs.pyx", line 1179, in pyarrow._fs._cb_open_input_file
  File "<env>/python3.8/site-packages/pyarrow/fs.py", line 394, in open_input_file
    raise FileNotFoundError(path)
FileNotFoundError: bucket_name/test01/p=x/part2.parquet

请注意,如果我打开一个单独的 python 控制台并尝试读取该镶木地板/分区,则完全不会有问题。

我怀疑 pyarrow 在访问 s3 parquets 时会进行某种缓存,并且当添加新文件时,新文件不在其缓存中,即使它知道需要读入新文件,但它找不到它。但是重新加载 pandas 和 pyarrow 模块并不能帮助重置这样的缓存。

以下是我的软件包版本

  • fsspec== 2022.2.0
  • 熊猫== 1.4.1
  • pyarrow == 7.0.0

[更新] 如果将上述步骤 2 替换为上传part2.parquet 以覆盖现有的part1.parquet,则覆盖part1.parquet 也不起作用。缓存器知道原始的part1.parquet不再存在。这个异常回溯是这样的:

File "pyarrow/_dataset.pyx", line 1680, in pyarrow._dataset.DatasetFactory.finish
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "<env>/lib/python3.8/site-packages/fsspec/spec.py", line 1544, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/<env>/lib/python3.8/site-packages/fsspec/caching.py", line 377, in _fetch
    self.cache = self.fetcher(start, bend)
  File "<env>/lib/python3.8/site-packages/s3fs/core.py", line 1965, in _fetch_range
    raise FileExpired(
s3fs.utils.FileExpired: [Errno 16] The remote file corresponding to filename bucket_name/test01/p=x/part1.parquet and Etag "d64a2de4f9c93dff49ecd3f19c414f61" no longer exists.
amazon-s3 parquet python-3.8 pyarrow fsspec
1个回答
0
投票

简短回答:
规避此缓存的最简单方法是在调用 read_parquet 时使用它:

df = pandas.read_parquet(url, storage_options={'version_aware': True})

这暗示 s3fs 系统其他用户无法更改 s3 对象,这在某种意义上暗示 s3fs 不依赖缓存。

长话短说:
fsspec 很难关闭/清除/重置。 read_parquet 中有一个 storage_options 句柄,它是存储文件系统级别的命令(参见 https://pandas.pydata.org/pandas-docs/version/1.5/reference/api/pandas.read_parquet.html)。
这链接到 storage_options 的可用选项(此处https://pandas.pydata.org/docs/user_guide/io.html#reading-writing-remote-files),但这些选项都没有为我们提供所需的句柄。
单击它提供的 S3FS 文档链接(https://s3fs.readthedocs.io/en/latest/index.html?highlight=host#s3-known-storage),该部分也没有用,但后面的部分(#bucket-version-awareness)指出:

如果您的存储桶启用了对象版本控制,那么您可以添加 对 s3fs 的版本感知支持。这确保了如果文件被打开 在特定时间点该版本将用于阅读。

这缓解了多个用户同时使用的问题 读取和写入同一个对象。

即使没有说明,这也是 storage_options 可接受的参数,可以解决此问题。

© www.soinside.com 2019 - 2024. All rights reserved.