我有一个Tensorflow模型,我想在Dask Dataframe上运行(而不是训练)。我正在使用map_partitions
。但是,当我查看仪表板以检查进度时,所有工作仅运行1个任务。我希望它可以同时处理分区。我在做什么错?
启动我的本地集群:
cluster = LocalCluster(ip="0.0.0.0")
client=Client(cluster)
ddf = dd.read_csv("data/docs", names=["docs"])
Dataframes ddf
是一堆句子(字符串),具有9个分区。
这是TF模型:
def encode_factory(sess):
output_tensor_names_sorted = ["input_layer/concat:0"]
loader.load(sess, 'serve', export_path)
def encode(sentence):
#encodes string as `Example` protobuff
serialized_examples = make_examples(sentence, "word")
inputs_feed_dict = {"input_example_tensor:0": serialized_examples}
outputs = sess.run(output_tensor_names_sorted,
feed_dict=inputs_feed_dict)
return outputs[0][0]
return encode
函数encode_factory
使用一个Tensorflow Session
对象,并从export_path
(磁盘)加载TF模型。该函数返回一个闭包,该闭包以句子(文本字符串)作为输入并返回句子编码(嵌入/浮点数组)。
我将其注册为未来:
future_fn = client.scatter(encode_factory, broadcast=True)
然后定义我的映射函数:
def map_fn(pdf, encoder):
#create instance of TF model encoder
encode = encoder(tf.Session())
embedded_docs = []
#iterate through items in Pandas Dataframe
for doc in pdf.docs:
doc_embedding = encoder(doc) #pass sentence to TF model
embedded_docs.append(str(doc_embedding))
pdf["encoding"] = embedded_docs
return pdf
并在分区之间应用地图:
ddf.map_partitions(map_fn, future_fn, meta={'docs': str, 'encoding': str}).head()
我如何才能实现一些并发,只有一个工作程序正在运行!