Dask client.persist returns an AssertionError when used with HashingVectorizer

Problem description · Votes: 0 · Answers: 1

I am trying to vectorize a dask.dataframe with the dask-ml HashingVectorizer, and I want the vectorized result to stay on the cluster (a distributed system). That is why I call client.persist when transforming the data. But for some reason I get the error below:

Traceback (most recent call last):
  File "/home/dodzilla/my_project/components_with_adapter/vectorizers/base_vectorizer.py", line 112, in hybrid_feature_vectorizer
    CLUSTERING_FEATURES=self.clustering_features)
  File "/home/dodzilla/my_project/components_with_adapter/vectorizers/text_vectorizer.py", line 143, in vectorize
    X = self.client.persist(fitted_vectorizer.transform, combined_data)
  File "/home/dodzilla/.local/lib/python3.6/site-packages/distributed/client.py", line 2860, in persist
    assert all(map(dask.is_dask_collection, collections))
AssertionError

I cannot share the data, but all the necessary information about it is below:

>>> type(combined_data)
<class 'dask.dataframe.core.Series'>
>>> type(combined_data.compute())
<class 'pandas.core.series.Series'>
>>> combined_data.compute().shape
(12,)

Below is a minimal working example. In the snippet below, combined_data holds the merged columns, meaning all columns have been merged into a single column. The data has 12 rows, and every value in a row is a string. This is the code where I get the error:

from stop_words import get_stop_words
from dask_ml.feature_extraction.text import HashingVectorizer as daskHashingVectorizer
import pandas as pd
import dask
import dask.dataframe as dd
from dask.distributed import Client


def convert_dataframe_to_single_text(documents):
    """
    Combine all of the columns into 1 column.
    """
    if type(documents) is dask.dataframe.core.DataFrame:
        cols = documents.columns
        documents['combined'] = documents[cols].apply(func=(lambda row: ' '.join(row.values.astype(str))), axis=1,
                                                      meta=('str'))
        document_texts = documents.drop(cols, axis=1)
    else:
        raise TypeError('Wrong type of data. Expected Pandas DF or Dask DF but received ', type(documents))
    return document_texts

# Init the client.
client = Client('localhost:8786')

# Get stopwords
stopwords = get_stop_words(language="english")

# Create dask dataframe from pandas dataframe
data = {'Name':['Tom', 'nick', 'krish', 'jack'], 'Age':["twenty", "twentyone", "nineteen", "eighteen"]}
df = pd.DataFrame(data)
df = dd.from_pandas(df, npartitions=1)

# Init the vectorizer
vectorizer = daskHashingVectorizer(stop_words=stopwords, alternate_sign=False,
                       norm=None, binary=False,
                       n_features=10000)

# Combine all of the columns into 1 column.
combined_data = convert_dataframe_to_single_text(df)

# Fit the vectorizer.
fitted_vectorizer = client.persist(vectorizer.fit(combined_data))

# Transform the data.
X = client.persist(fitted_vectorizer.transform, combined_data)

I hope this information is enough.

Important note: when I call client.compute I do not get any error, but as far as I understand that does not run on the cluster of machines; it runs on the local machine instead. And it returns a csr matrix rather than a lazily evaluated dask.array.
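
For context, here is a minimal sketch of the lazy-versus-concrete distinction this note touches on, reusing the names from the example above (vectorizer, combined_data, client); the exact calls are an assumption made for illustration, not code from the real project:

# Hypothetical sketch reusing the MWE's names; not the original project code.
# HashingVectorizer is stateless, so transform can be applied to the lazy Series directly.
lazy_X = vectorizer.transform(combined_data)   # lazy dask.array; nothing is computed yet

# client.compute hands the graph to the scheduler and returns a Future;
# .result() pulls the concrete (scipy sparse) matrix back into the local process.
X_local = client.compute(lazy_X).result()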

python-3.x dask dask-distributed
1 Answer

0 votes

client.persist is not what I should have been using here. The functions I was looking for are client.submit and client.map... In my case, client.submit solved my problem.
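
As a rough illustration of the fix described above, here is a sketch of the client.submit pattern, reusing the names from the question's example; the exact call is an assumption, not code taken from the answer:

# Sketch of the client.submit pattern, reusing the question's names.
# client.submit ships a single function call to a worker and returns a Future;
# the result stays in cluster memory until it is explicitly gathered.
future_X = client.submit(fitted_vectorizer.transform, combined_data)

# Gather the result back to the client only if/when it is needed locally.
X = future_X.result()

For what it is worth, the AssertionError in the traceback comes from client.persist expecting a dask collection (or a list of them), whereas it was handed the bound method fitted_vectorizer.transform; client.submit, by contrast, takes a callable followed by the arguments to call it with.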
