跨分区的Das Map Tensorflow

问题描述 投票:0回答:1

我有一个Tensorflow模型,我想在Dask Dataframe上运行(而不是训练)。我正在使用map_partitions。但是,当我查看仪表板以检查进度时,所有工作仅运行1个任务。我希望它可以同时处理分区。我在做什么错?

启动我的本地集群:

cluster = LocalCluster(ip="0.0.0.0")
client=Client(cluster)

ddf = dd.read_csv("data/docs", names=["docs"])

Dataframes ddf是一堆句子(字符串),具有9个分区。

这是TF模型:

def encode_factory(sess):

    output_tensor_names_sorted = ["input_layer/concat:0"]

    loader.load(sess, 'serve', export_path)

    def encode(sentence):
        #encodes string as `Example` protobuff
        serialized_examples = make_examples(sentence, "word")

        inputs_feed_dict = {"input_example_tensor:0": serialized_examples}

        outputs = sess.run(output_tensor_names_sorted,
                       feed_dict=inputs_feed_dict)
        return outputs[0][0]

    return encode

函数encode_factory使用一个Tensorflow Session对象,并从export_path(磁盘)加载TF模型。该函数返回一个闭包,该闭包以句子(文本字符串)作为输入并返回句子编码(嵌入/浮点数组)。

我将其注册为未来:

future_fn = client.scatter(encode_factory, broadcast=True) 

然后定义我的映射函数:

def map_fn(pdf, encoder):
    #create instance of TF model encoder
    encode = encoder(tf.Session())

    embedded_docs = []

    #iterate through items in Pandas Dataframe
    for doc in pdf.docs:
        doc_embedding = encoder(doc) #pass sentence to TF model
        embedded_docs.append(str(doc_embedding))

    pdf["encoding"] = embedded_docs
    return pdf

并在分区之间应用地图:

ddf.map_partitions(map_fn, future_fn, meta={'docs': str, 'encoding': str}).head()

我如何才能实现一些并发,只有一个工作程序正在运行!

Dask Dashboard

tensorflow dask dask-distributed
1个回答
0
投票
© www.soinside.com 2019 - 2024. All rights reserved.