AutoFaiss with Parquet data on HDFS - OSError: only valid on seekable files


I am trying to use autofaiss to build an ANN index from a dataset stored in HDFS in Parquet format. When I run the build_index function, I get the error "OSError: only valid on seekable files". I have tested and verified that the file is in fact seekable, yet the problem persists.

Here is my code:

from autofaiss import build_index
build_index(
    embeddings="hdfs://[HOST]:[PORT]/image_embeddings/",
    file_format="parquet",
    embedding_column_name="embedding",
    temporary_indices_folder="autofaiss_indices",
    index_path="knn.index",
    index_infos_path="infos.json",
    max_index_memory_usage="2G",
    current_memory_available="10G",
)

The error log is as follows:

2023-05-04 18:55:52,802 [INFO]: Using 128 omp threads (processes), consider increasing --nb_cores if you have more
2023-05-04 18:55:52,803 [INFO]: Launching the whole pipeline 05/04/2023, 18:55:52
2023-05-04 18:55:52,804 [INFO]: Reading total number of vectors and dimension 05/04/2023, 18:55:52
2023-05-04 18:55:52,887 [INFO]: >>> Finished "Reading total number of vectors and dimension" in 0.0827 secs
2023-05-04 18:55:52,888 [INFO]: >>> Finished "Launching the whole pipeline" in 0.0842 secs
---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
/tmp/ipykernel_5643/567248455.py in <module>
      8     index_infos_path="infos.json",
      9     max_index_memory_usage="2G",
---> 10     current_memory_available="10G",
     11 )

/opt/conda/lib/python3.7/site-packages/autofaiss/external/quantize.py in build_index(embeddings, index_path, index_infos_path, ids_path, save_on_disk, file_format, embedding_column_name, id_columns, index_key, index_param, max_index_query_time_ms, max_index_memory_usage, min_nearest_neighbors_to_retrieve, current_memory_available, use_gpu, metric_type, nb_cores, make_direct_map, should_be_memory_mappable, distributed, temporary_indices_folder, verbose, nb_indices_to_keep)
    207                 file_format=file_format,
    208                 embedding_column=embedding_column_name,
--> 209                 meta_columns=id_columns,
    210             )
    211             nb_vectors = embedding_reader.count

/opt/conda/lib/python3.7/site-packages/embedding_reader/embedding_reader.py in __init__(self, embeddings_folder, file_format, embedding_column, meta_columns, metadata_folder)
     21         elif file_format == "parquet":
     22             self.reader = ParquetReader(
---> 23                 embeddings_folder, embedding_column_name=embedding_column, metadata_column_names=meta_columns
     24             )
     25         elif file_format == "parquet_npy":

/opt/conda/lib/python3.7/site-packages/embedding_reader/parquet_reader.py in __init__(self, embeddings_folder, embedding_column_name, metadata_column_names)
     53         for filename in embeddings_file_paths:
     54             with self.fs.open(filename, "rb") as f:
---> 55                 parquet_file = pq.ParquetFile(f, memory_map=True)
     56                 batches = parquet_file.iter_batches(batch_size=1, columns=[embedding_column_name])
     57                 try:

/opt/conda/lib/python3.7/site-packages/pyarrow/parquet.py in __init__(self, source, metadata, common_metadata, read_dictionary, memory_map, buffer_size, pre_buffer, coerce_int96_timestamp_unit)
    232             buffer_size=buffer_size, pre_buffer=pre_buffer,
    233             read_dictionary=read_dictionary, metadata=metadata,
--> 234             coerce_int96_timestamp_unit=coerce_int96_timestamp_unit
    235         )
    236         self.common_metadata = common_metadata

/opt/conda/lib/python3.7/site-packages/pyarrow/_parquet.pyx in pyarrow._parquet.ParquetReader.open()

/opt/conda/lib/python3.7/site-packages/pyarrow/io.pxi in pyarrow.lib.NativeFile.tell()

/opt/conda/lib/python3.7/site-packages/pyarrow/io.pxi in pyarrow.lib.NativeFile.get_random_access_file()

/opt/conda/lib/python3.7/site-packages/pyarrow/io.pxi in pyarrow.lib.NativeFile._assert_seekable()

OSError: only valid on seekable files

I connect to HDFS with the following code:

import os
import subprocess
from pyarrow import fs

# Connect to HDFS filesystem
hdfs_classpath = subprocess.run(['hadoop', 'classpath', '--glob'], stdout=subprocess.PIPE)
hdfs_host = '[HOST]'
hdfs_port = [PORT]
os.environ['CLASSPATH'] = hdfs_classpath.stdout.decode('utf-8').strip()
hdfs = fs.HadoopFileSystem(host=hdfs_host, port=hdfs_port)
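
To double-check that this connection works, here is a minimal sketch (reusing the hdfs object created above and the same placeholder path) that lists the Parquet parts in the embeddings directory:

from pyarrow import fs

# Diagnostic sketch: list the Parquet parts under the embeddings directory
# to confirm the HadoopFileSystem connection and the path are valid.
selector = fs.FileSelector("/image_embeddings/", recursive=False)
for info in hdfs.get_file_info(selector):
    print(info.path, info.size)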

When I open the file with PyArrow's HadoopFileSystem (via pa.hdfs.connect), the seekable() method returns True:

import pyarrow as pa
import pyarrow.parquet as pq

fs1 = pa.hdfs.connect('[HOST]', [PORT])
hdfs_path = 'hdfs://[HOST]:[PORT]/image_embeddings/part-x.snappy.parquet'
with fs1.open(hdfs_path, "rb") as f1:
    parquet_file = pq.ParquetFile(f1, memory_map=True)
    print(f1.seekable())
> True

However, when I do the same thing with the filesystem returned by embedding_reader's get_file_list, the seekable() method returns False:

from embedding_reader.get_file_list import get_file_list

embeddings_folder = "hdfs://[HOST]:[PORT]/image_embeddings/"
fs2, embeddings_file_paths = get_file_list(embeddings_folder, "parquet")
with fs2.open(hdfs_path, "rb") as f2:
    print(f2.seekable())
    parquet_file = pq.ParquetFile(f2, memory_map=True)
> False

and the ParquetFile call then fails with the same error as above:

OSError: only valid on seekable files
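
One way to check whether the stream's seekability is the only problem is to read the file fully into memory and hand PyArrow a seekable in-memory buffer. This is a diagnostic sketch only, not a fix for embedding_reader:

import pyarrow as pa
import pyarrow.parquet as pq

# Diagnostic sketch: read the whole file into memory so PyArrow gets a
# seekable BufferReader instead of the non-seekable HDFS stream.
with fs2.open(hdfs_path, "rb") as f2:
    buf = pa.BufferReader(f2.read())
parquet_file = pq.ParquetFile(buf)
print(parquet_file.metadata.num_rows)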

What changes should I make so that I can run autofaiss against Parquet data stored in HDFS?
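
For context, one possible workaround sketch bypasses embedding_reader by loading the embeddings into memory first and passing a numpy array to build_index. It assumes the embeddings fit in RAM and reuses the hdfs filesystem and the "embedding" column from above:

import numpy as np
import pyarrow.dataset as ds
from autofaiss import build_index

# Workaround sketch (assumes the embeddings fit in memory): read the Parquet
# dataset through the HadoopFileSystem connection created above and pass the
# embeddings to build_index as an in-memory numpy array instead of an HDFS path.
dataset = ds.dataset("/image_embeddings/", filesystem=hdfs, format="parquet")
table = dataset.to_table(columns=["embedding"])
embeddings = np.stack(table.column("embedding").to_pylist()).astype("float32")

build_index(
    embeddings=embeddings,
    index_path="knn.index",
    index_infos_path="infos.json",
    max_index_memory_usage="2G",
    current_memory_available="10G",
)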

python hadoop hdfs parquet pyarrow