我在做什么?
我发现自己被困在了为视频中的每一帧插入一个文档的问题上。然后,将部分帧扔进一个使用机器学习模型的标签函数中。最后,我想 更新 的空白文档,并将标签分配给图像。
我尝试过什么。
经过一些实验,我决定使用 concurrent.futures
's ProcessPoolExecutor
我看了一下 这个 的文章,该文章使用了一个名为 consumers
. 这套方案非常类似于 ProcessPoolExecutor
所以我发现它很有用。
我在实施他建议的解决方案时遇到了麻烦。"国家的持久性: "状态持续: 理智" 与 ProcessPoolExecutor
于是,我转而实现了他的 "黑客 "解决方案,它使用了一个全局。"State Persistence: "State Persistence: The Hack"
结果。
将他的逻辑应用到我的脚本中后, 多重处理开始工作了!
但是当试图 find_and_update
当前帧的文档(由该函数的 position
争论)。) 实际上没有任何东西被保存到MongoDB集合中。
我犯了什么错误,该如何解决?
我怀疑我误解了一些关于 pymongo
客户端以及他们在多个流程中的表现。
如果有人能指出我的错误,我会非常感激。
我的函数
log = None
config = open('config.json')
config = json.load(config)
# Connect to database
client = pymongo.MongoClient("mongodb://localhost:27017/")
# Create log database
db = client[config["settings"]["video_file"].replace('.', '_').replace('/', '_')]
collection = "log_0"
log = db[collection]
log.insert_one({ "type": "__init" })
# Database initialization
def init(config, collection):
print("Initializing worker ")
# Initializing global
global log
# Connect to database
client = pymongo.MongoClient("mongodb://localhost:27017/")
# Create log database
db = client[config["settings"]["video_file"].replace('.', '_').replace('/', '_')]
log = db[collection]
def predict_icon(data):
global log
# Unpack data
image_data, position, operators = data
# Identify icon with TensorFlow model
image_data = cv2.resize(image_data, (224, 224), interpolation=cv2.INTER_AREA)
interpreter.set_tensor(input_details[0]['index'], [image_data])
interpreter.invoke()
output_data = interpreter.get_tensor(output_details[0]['index'])
# Write to database
print(type(log))
log.find_one_and_update({"frame_index": position}, {"$push": { "roster": operators[pd.Series(output_data[0]).idxmax()] }} )
#print(operators[pd.Series(output_data[0]).idxmax()] + " was predicted on process: {}".format(os.getpid()))
return None
池子:
with ProcessPoolExecutor(max_workers=3, initializer=init, initargs=(config, collection)) as executor:
对于视频中的每一帧,我首先插入一个文档。
while cap:
prev = time.time()
# Do something with your image here.
_, frame = cap.read()
_, frame2 = cap.read()
_, frame3 = cap.read()
frame_position = cap.get(cv2.CAP_PROP_POS_FRAMES)
log.insert_one({ "type": "frame", "frame_index": frame_position, "roster": []})
在代码的后面,我把工作提交给执行者。
# Predict each icon label using multiple threads
for icon in crops:
executor.submit(predict_icon, (icon, frame_position, config["operators"]))
我做了一些测试,我已经确认:
frame_position
我做了一些测试,我已经确认:log
似乎是一个pymongo客户端。当我打印出 type()
的 log
: <class 'pymongo.collection.Collection'>
我在这个问题上纠结了很久 这是我得到的最多的解决方案了
谢谢! 我正在做的是:
mm 我不是pymongo的专家,但是你能不能用upsert=True的update_one代替(你不需要整个文档作为结果),然后检查操作的结果,并从那里进行调试?