来自 io.bytesIO 的 Dask read_csv

问题描述 投票:0回答:1

我正在尝试使用 dask 读取 io.bytesIO 中的 csv 存储。那是可能的吗?

    blob_service = BlobServiceClient.from_connection_string(self.credentials['connection_string'])
    blob_client = blob_service.get_blob_client(container=      self.credentials['container_name'],blob=filename)
    download_stream = blob_client.download_blob()
    download_stream.download_to_stream(stream)
    stream.seek(0)
    df = dd.read_csv(stream.getvalue().decode().split('\n'))

第一个参数好像应该是url路径。但我不得不从 io.bytesIO(变量流)中获取 csv。有没有办法在不将文件保存在本地的情况下做到这一点? 我正在创建一个 ETL,因此凭证会发生变化。这就是我尝试以这种方式进行操作的原因。

python azure-blob-storage dask
1个回答
0
投票

可以利用

delayed
pandas
(可以从
io
读取),大致如下:

from dask import delayed
from dask.dataframe import from_delayed
from pandas import read_csv, DataFrame

@delayed
def io_to_pandas(stream):
   ... # any necessary logic
   df = read_csv(stream.getvalue())
   return df

# specify the expected data shape (can also specify dtypes)
meta = DataFrame(columns=['a', 'b', 'c'])

ddf = from_delayed(io_to_pandas(stream), meta=meta)
© www.soinside.com 2019 - 2024. All rights reserved.