当在Python的多处理中与Kivy一起使用字符串共享内存时,会导致错误

问题描述 投票:0回答:1

我尝试通过以下简单的Python代码使用字符串的共享内存,并且效果很好

from multiprocessing import Value, Array, Process, Lock def process1(array, lock): with lock: buffer_data = str("z1111").encode("utf-8") array[0:len(buffer_data)] = buffer_data[:] if __name__ == '__main__': array = Array('c', 35) lock = Lock() process_test1 = Process(target=process1, args=[array, lock], daemon=True) process_test1.start() process_test1.join() print(array[:].decode("utf-8")) print("process ended")
接下来,我尝试使用名为 Kivy 的 GUI 库实现 

数字共享内存,效果也很好。 from multiprocessing import Process, Value, set_start_method, freeze_support, Lock kvLoadString=(""" <TextWidget>: BoxLayout: orientation: 'vertical' size: root.size Button: id: button1 text: "start" font_size: 48 on_press: root.buttonClicked() Label: id: lab text: 'result' """) def start_process(count,lock): process_test = Process(target=p_test, args=[count,lock], daemon=True) process_test.start() return process_test def p_test(count,lock): i = 0 while True: print(i) i = i + 1 with lock: count.value = i if __name__ == '__main__': freeze_support() from kivy.app import App from kivy.uix.widget import Widget from kivy.lang import Builder from kivy.uix.checkbox import CheckBox from kivy.uix.spinner import Spinner from kivy.properties import StringProperty,NumericProperty,ObjectProperty, BooleanProperty from kivy.core.window import Window from kivy.uix.boxlayout import BoxLayout from kivy.uix.popup import Popup from kivy.uix.treeview import TreeView, TreeViewLabel, TreeViewNode from kivy.uix.label import Label from kivy.uix.textinput import TextInput from kivy.uix.progressbar import ProgressBar from kivy.uix.button import Button set_start_method('spawn') class TestApp(App): def __init__(self, **kwargs): super(TestApp, self).__init__(**kwargs) self.title = 'testApp' def build(self): return TextWidget() class TextWidget(Widget): def __init__(self, **kwargs): super(TextWidget, self).__init__(**kwargs) self.process_test = None self.proc = None # shared memory self.count = Value('i', 0) self.lock = Lock() def buttonClicked(self): if self.ids.button1.text == "start": self.proc = start_process(self.count,self.lock) self.ids.button1.text = "stop" else: if self.proc: self.proc.kill() self.ids.button1.text = "start" self.ids.lab.text = str(self.count.value) Builder.load_string(kvLoadString) TestApp().run()

但是,我没有
成功地将Kivy与字符串共享内存一起使用

,如下所示。 from multiprocessing import Array,Process, Value, set_start_method, freeze_support, Lock import math kvLoadString=(""" <TextWidget>: BoxLayout: orientation: 'vertical' size: root.size Button: id: button1 text: "start" font_size: 48 on_press: root.buttonClicked() Label: id: lab text: 'result' """) def start_process(array,lock): process_test = Process(target=p_test, args=[array,lock], daemon=True) process_test.start() return process_test def p_test(array,lock): with lock: buffer_data = str("z1111").encode("utf-8") array[0:len(buffer_data)] = buffer_data[:] print(array.decode("utf-8")) if __name__ == '__main__': freeze_support() from kivy.app import App from kivy.uix.widget import Widget from kivy.lang import Builder from kivy.uix.checkbox import CheckBox from kivy.uix.spinner import Spinner from kivy.properties import StringProperty,NumericProperty,ObjectProperty, BooleanProperty from kivy.core.window import Window from kivy.uix.boxlayout import BoxLayout from kivy.uix.popup import Popup from kivy.uix.treeview import TreeView, TreeViewLabel, TreeViewNode from kivy.uix.label import Label from kivy.uix.textinput import TextInput from kivy.uix.progressbar import ProgressBar from kivy.uix.button import Button set_start_method('spawn') class TestApp(App): def __init__(self, **kwargs): super(TestApp, self).__init__(**kwargs) self.title = 'testApp' def build(self): return TextWidget() class TextWidget(Widget): def __init__(self, **kwargs): super(TextWidget, self).__init__(**kwargs) self.process_test = None self.proc = None # shared memory self.array = Array('c', 35) self.lock = Lock() def buttonClicked(self): if self.ids.button1.text == "start": self.proc = start_process(self.array,self.lock) self.ids.button1.text = "stop" else: if self.proc: self.proc.kill() self.ids.button1.text = "start" self.ids.lab.text = self.array.decode("utf-8") Builder.load_string(kvLoadString) TestApp().run()

错误信息如下。如何避免这样的错误?

Traceback (most recent call last): File "C:\Users\taichi\Documents\Winpython64-3.9.10.0\WPy64-39100\python-3.9.10.amd64\lib\multiprocessing\process.py", line 315, in _bootstrap self.run() File "C:\Users\taichi\Documents\Winpython64-3.9.10.0\WPy64-39100\python-3.9.10.amd64\lib\multiprocessing\process.py", line 108, in run self._target(*self._args, **self._kwargs) File "C:\Users\taichi\Desktop\test.py", line 29, in p_test print(array.decode("utf-8")) AttributeError: 'SynchronizedString' object has no attribute 'decode'


python multiprocessing kivy shared-memory
1个回答
0
投票

s = bytes.join(b'', [c for c in array if c != b'\x00']).decode('utf-8')

© www.soinside.com 2019 - 2024. All rights reserved.