将数据从多个进程推送到单一的MongoDB集合。

问题描述 投票:0回答:1

我在做什么?

我发现自己被困在了为视频中的每一帧插入一个文档的问题上。然后,将部分帧扔进一个使用机器学习模型的标签函数中。最后,我想 更新 的空白文档,并将标签分配给图像。

我尝试过什么。

经过一些实验,我决定使用 concurrent.futures's ProcessPoolExecutor

我看了一下 这个 的文章,该文章使用了一个名为 consumers. 这套方案非常类似于 ProcessPoolExecutor所以我发现它很有用。

我在实施他建议的解决方案时遇到了麻烦。"国家的持久性: "状态持续: 理智"ProcessPoolExecutor

于是,我转而实现了他的 "黑客 "解决方案,它使用了一个全局。"State Persistence: "State Persistence: The Hack"

结果。

将他的逻辑应用到我的脚本中后, 多重处理开始工作了!

但是当试图 find_and_update 当前帧的文档(由该函数的 position 争论)。) 实际上没有任何东西被保存到MongoDB集合中。

我犯了什么错误,该如何解决?

我怀疑我误解了一些关于 pymongo 客户端以及他们在多个流程中的表现。

如果有人能指出我的错误,我会非常感激。

我的代码。

我的函数

log = None
config = open('config.json')
config = json.load(config)

# Connect to database
client = pymongo.MongoClient("mongodb://localhost:27017/")

# Create log database
db = client[config["settings"]["video_file"].replace('.', '_').replace('/', '_')]
collection = "log_0"

log = db[collection]
log.insert_one({ "type": "__init" })

# Database initialization
def init(config, collection):
    print("Initializing worker ")

    # Initializing global
    global log

    # Connect to database
    client = pymongo.MongoClient("mongodb://localhost:27017/")

    # Create log database
    db = client[config["settings"]["video_file"].replace('.', '_').replace('/', '_')]
    log = db[collection]

def predict_icon(data):
    global log

    # Unpack data
    image_data, position, operators = data

    # Identify icon with TensorFlow model
    image_data = cv2.resize(image_data, (224, 224), interpolation=cv2.INTER_AREA)
    interpreter.set_tensor(input_details[0]['index'], [image_data])
    interpreter.invoke()
    output_data = interpreter.get_tensor(output_details[0]['index'])

    # Write to database
    print(type(log))
    log.find_one_and_update({"frame_index": position}, {"$push": { "roster": operators[pd.Series(output_data[0]).idxmax()] }} )
    #print(operators[pd.Series(output_data[0]).idxmax()] + " was predicted on process: {}".format(os.getpid()))
    return None

池子:

    with ProcessPoolExecutor(max_workers=3, initializer=init, initargs=(config, collection)) as executor:

对于视频中的每一帧,我首先插入一个文档。

        while cap:
        prev = time.time()

        # Do something with your image here.
        _, frame = cap.read()
        _, frame2 = cap.read()
        _, frame3 = cap.read()

        frame_position = cap.get(cv2.CAP_PROP_POS_FRAMES)
        log.insert_one({ "type": "frame", "frame_index": frame_position, "roster": []})

在代码的后面,我把工作提交给执行者。

                    # Predict each icon label using multiple threads
                    for icon in crops:
                        executor.submit(predict_icon, (icon, frame_position, config["operators"]))

我做了一些测试,我已经确认:

  1. frame_position 我做了一些测试,我已经确认:
  2. 模型输出标签。我可以在控制台中打印数据库中我想要的数据。
  3. 在预测功能中。log 似乎是一个pymongo客户端。当我打印出 type()log: <class 'pymongo.collection.Collection'>

我在这个问题上纠结了很久 这是我得到的最多的解决方案了

谢谢! 我正在做的是:

python python-3.x parallel-processing multiprocessing pymongo
1个回答
0
投票

mm 我不是pymongo的专家,但是你能不能用upsert=True的update_one代替(你不需要整个文档作为结果),然后检查操作的结果,并从那里进行调试?

© www.soinside.com 2019 - 2024. All rights reserved.