在子进程之间共享已保存的张量流模型

问题描述 投票:1回答:1

我已经构建了一个预测器(Python)类,它使用tensorflow来预测类

class IndustryPredictor:
    def __init__(self):
        self.predictor = self.load_predictor()

    def load_predictor(self):
        import tensorflow as tf
        confi_obj = ConfigParser()
        classifier_dir = confi_obj.classifier_directory
        predictor_model_dir_name = confi_obj.predictor_directory
        model_path = os.path.join(classifier_dir, predictor_model_dir_name)
        return tf.contrib.predictor.from_saved_model(model_path)

    def _create_float(v):
        return tf.train.Feature(float_list=tf.train.FloatList(value=[v]))

    def _create_str(v):
        return tf.train.Feature(bytes_list=tf.train.BytesList(value=[bytes(v, 'utf-8')]))

    def predict(description):
        doc_text = preprocess(description)

        text = _create_str(doc_text)
        dlen = _create_float(len(doc_text.split()))

        predicate = {'clean_text': text, 'len': dlen}

        example = tf.train.Example(features=tf.train.Features(feature=predicate))
        inputs = example.SerializeToString()

        preds = self.predictor({"inputs": [inputs]})
        return preds

这在单个流程环境中运行良好。我正在尝试使用multiprocessing模块来加速我的处理。我可以在子进程中创建这个对象并且运行正常,但由于我的模型本身大小为1GB,所以我只能启动子进程达到一定的限制。

我想的是在父进程中加载​​已保存的模型,并以某种方式将它传递给子进程,这样我只需加载模型一次。我试过这样做,但过程挂了。

def main():
    workers = 8
    predictor = load_predictor()
    pool = Pool(processes=workers)
    for i in range(0, workers):
        pool.apply_async(consume, args=(predictor,), error_callback=handle_error)

    # Stay alive
    try:
        while True:
            continue
    except KeyboardInterrupt:
        logger.error(' [*] Exiting...')
        pool.terminate()
        pool.join()

有没有办法在子进程中共享张量流的tf.contrib.predictor对象。在这个预测器上写一个keras包装器可以帮我解决这个问题。

python-3.x tensorflow keras multiprocessing python-3.4
1个回答
0
投票

多处理会分叉您的流程,这就是您拥有副本的原因。通过例如共享存储空间是可能的。 joblib。实质上,您的对象将成为磁盘上的内存映射对象。如果你有一个像样的SSD(或更好),那就是很好的解决方案

来自多处理的Manager可能是一个选择,但我不打赌它。

© www.soinside.com 2019 - 2024. All rights reserved.