Python多重处理如何在不使用.join()方法的情况下更新管理器列表中的复杂对象

问题描述 投票:0回答:1

我大约在2个月前开始使用Python进行编程,在过去的2周中,我一直在努力解决这个问题。我知道有许多与此类似的线程,但是我找不到适合我情况的解决方案。

我需要一个主进程,它是与Telegram交互的一个主进程,另一个进程是buffer,它了解从主进程接收的复杂对象并对其进行更新。

我想以一种更简单,更流畅的方式进行此操作。

由于没有使用join()方法,因此由于使用了多重处理,因此无法更新对象。

然后我尝试使用多线程,但是它给了我与Pyrogram一个用于与Telegram交互的框架的兼容性问题。

我再次写出项目的“复杂性”是为了重现我所遇到的同样的错误,并从每个人那里获得并提供最好的帮助。

a.py

class A():
    def __init__(self, length = -1, height = -1):
        self.length = length
        self.height = height

b.py

from a import A
class B(A):
    def __init__(self, length = -1, height = -1, width = -1):
        super().__init__(length = -1, height = -1)
        self.length = length
        self.height = height
        self.width = width

    def setHeight(self, value):
        self.height = value

c.py

class C():
    def __init__(self, a, x = 0, y = 0):
        self.a = a
        self.x = x
        self.y = y

    def func1(self):
        if self.x < 7:
            self.x = 7

d.py

from c import C
class D(C):
    def __init__(self, a, x = 0, y = 0, z = 0):
        super().__init__(a, x = 0, y = 0)
        self.a = a
        self.x = x
        self.y = y
        self.z = z

    def func2(self):
        self.func1()

main.py

from b import B
from d import D
from  multiprocessing import Process, Manager
from buffer import buffer

if __name__ == "__main__":

    manager = Manager()
    lizt = manager.list()

    buffer = Process(target = buffer, args = (lizt, )) #passing the list as a parameter
    buffer.start()
    #can't invoke buffer.join() here because I need the below code to keep running while the buffer process takes a few minutes to end an instance passed in the list
    #hence I can't wait the join() function to update the objects inside the buffer but i need objects updated in order to pop them out from the list

    import datetime as dt
    t = dt.datetime.now()

    #library of kind of multithreading (pool of 4 processes), uses asyncio lib
    #this while was put to reproduce the same error I am getting

    while True:
        if t + dt.timedelta(seconds = 10) < dt.datetime.now():
            lizt.append(D(B(5, 5, 5)))
            t = dt.datetime.now()


"""
#This is the code which looks like the one in my project

#main.py
from pyrogram import Client #library of kind of multithreading (pool of 4 processes), uses asyncio lib
from b import B
from d import D
from  multiprocessing import Process, Manager
from buffer import buffer

if __name__ == "__main__":

    api_id = 1234567
    api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    app = Client("my_account", api_id, api_hash)

    manager = Manager()
    lizt = manager.list()

    buffer = Process(target = buffer, args = (lizt, )) #passing the list as a parameter
    buffer.start()
    #can't invoke buffer.join() here because I need the below code to run at the same time as the buffer process
    #hence I can't wait the join() function to update the objects inside the buffer

@app.on_message()
def my_handler(client, message):
    lizt.append(complex_object_conatining_message)
"""

buffer.py

def buffer(buffer):
    print("buffer was defined")
    while True:
        if len(buffer) > 0:
            print(buffer[0].x) #prints 0
            buffer[0].func2() #this changes the class attribute locally in the class instance but not in here
            print(buffer[0].x) #prints 0, but I'd like it to be 7

            print(buffer[0].a.height) #prints 5
            buffer[0].a.setHeight(10) #and this has the same behaviour
            print(buffer[0].a.height) #prints 5 but I'd like it to be 10

            buffer.pop(0)

这是关于我遇到的问题的完整代码。从字面上看,每个建议都是受欢迎的,希望是建设性的,在此先感谢您!

python-multiprocessing telegram python-3.7 python-multithreading
1个回答
0
投票

最后,我不得不改变解决此问题的方式,就像使用框架一样,它也在使用asyncio。

此解决方案提供了我一直在寻找的所有内容:

复杂对象更新

-避免多重处理的问题(尤其是join())

也是:

-lightweight:在我有2个python进程之前1)大约40K 2)大约75K

此实际过程约为30K(而且更快,更干净)

这里是解决方案,我希望它对像我一样的其他人有用:

类的一部分被跳过,因为此解决方案绝对可以更新复杂的对象

main.py

from pyrogram import Client
import asyncio
import time

def cancel_tasks():
    #get all task in current loop
    tasks = asyncio.Task.all_tasks()
    for t in tasks:
        t.cancel()

try:
    buffer = []
    firstWorker(buffer) #this one is the old buffer.py file and function
    #the missing loop and loop method are explained in the next piece of code
except KeyboardInterrupt:
    print("")
finally:
    print("Closing Loop")
    cancel_tasks()

firstWorker.py

import asyncio
def firstWorker(buffer):
    print("First Worker Executed")

    api_id = 1234567
    api_hash = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
    app = Client("my_account", api_id, api_hash)

    @app.on_message()
    async def my_handler(client, message):
        print("Message Arrived")
        buffer.append(complex_object_conatining_message)
        await asyncio.sleep(1)

    app.run(secondWorker(buffer)) #here is the trick: I changed the 
                                  #method run() of the Client class 
                                  #inside the Pyrogram framework 
                                  #since it was a loop itself. 
                                  #In this way I added another task 
                                  #to the existing loop in orther to 
                                  #let run both of them together.

我的secondWorker.py

import asyncio
async def secondWorker(buffer):
    while True:
        if len(buffer) > 0:
            print(buffer.pop(0))

        await asyncio.sleep(1)

了解此代码中使用的异步的资源可以在这里找到:

Asyncio simple tutorial

Python Asyncio Official Documentation

This tutorial about how to fix classical Asyncio errors

© www.soinside.com 2019 - 2024. All rights reserved.