我正在尝试使用 dask 读取 io.bytesIO 中的 csv 存储。那是可能的吗?
blob_service = BlobServiceClient.from_connection_string(self.credentials['connection_string'])
blob_client = blob_service.get_blob_client(container= self.credentials['container_name'],blob=filename)
download_stream = blob_client.download_blob()
download_stream.download_to_stream(stream)
stream.seek(0)
df = dd.read_csv(stream.getvalue().decode().split('\n'))
第一个参数好像应该是url路径。但我不得不从 io.bytesIO(变量流)中获取 csv。有没有办法在不将文件保存在本地的情况下做到这一点? 我正在创建一个 ETL,因此凭证会发生变化。这就是我尝试以这种方式进行操作的原因。
可以利用
delayed
和pandas
(可以从io
读取),大致如下:
from dask import delayed
from dask.dataframe import from_delayed
from pandas import read_csv, DataFrame
@delayed
def io_to_pandas(stream):
... # any necessary logic
df = read_csv(stream.getvalue())
return df
# specify the expected data shape (can also specify dtypes)
meta = DataFrame(columns=['a', 'b', 'c'])
ddf = from_delayed(io_to_pandas(stream), meta=meta)