我尝试通过以下简单的Python代码使用字符串的共享内存,并且效果很好。
from multiprocessing import Value, Array, Process, Lock
def process1(array, lock):
with lock:
buffer_data = str("z1111").encode("utf-8")
array[0:len(buffer_data)] = buffer_data[:]
if __name__ == '__main__':
array = Array('c', 35)
lock = Lock()
process_test1 = Process(target=process1, args=[array, lock], daemon=True)
process_test1.start()
process_test1.join()
print(array[:].decode("utf-8"))
print("process ended")
接下来,我尝试使用名为 Kivy 的 GUI 库实现 数字共享内存,效果也很好。
from multiprocessing import Process, Value, set_start_method, freeze_support, Lock
kvLoadString=("""
<TextWidget>:
BoxLayout:
orientation: 'vertical'
size: root.size
Button:
id: button1
text: "start"
font_size: 48
on_press: root.buttonClicked()
Label:
id: lab
text: 'result'
""")
def start_process(count,lock):
process_test = Process(target=p_test, args=[count,lock], daemon=True)
process_test.start()
return process_test
def p_test(count,lock):
i = 0
while True:
print(i)
i = i + 1
with lock:
count.value = i
if __name__ == '__main__':
freeze_support()
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.checkbox import CheckBox
from kivy.uix.spinner import Spinner
from kivy.properties import StringProperty,NumericProperty,ObjectProperty, BooleanProperty
from kivy.core.window import Window
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.popup import Popup
from kivy.uix.treeview import TreeView, TreeViewLabel, TreeViewNode
from kivy.uix.label import Label
from kivy.uix.textinput import TextInput
from kivy.uix.progressbar import ProgressBar
from kivy.uix.button import Button
set_start_method('spawn')
class TestApp(App):
def __init__(self, **kwargs):
super(TestApp, self).__init__(**kwargs)
self.title = 'testApp'
def build(self):
return TextWidget()
class TextWidget(Widget):
def __init__(self, **kwargs):
super(TextWidget, self).__init__(**kwargs)
self.process_test = None
self.proc = None
# shared memory
self.count = Value('i', 0)
self.lock = Lock()
def buttonClicked(self):
if self.ids.button1.text == "start":
self.proc = start_process(self.count,self.lock)
self.ids.button1.text = "stop"
else:
if self.proc:
self.proc.kill()
self.ids.button1.text = "start"
self.ids.lab.text = str(self.count.value)
Builder.load_string(kvLoadString)
TestApp().run()
但是,我没有成功地将Kivy与字符串共享内存一起使用
,如下所示。
from multiprocessing import Array,Process, Value, set_start_method, freeze_support, Lock
import math
kvLoadString=("""
<TextWidget>:
BoxLayout:
orientation: 'vertical'
size: root.size
Button:
id: button1
text: "start"
font_size: 48
on_press: root.buttonClicked()
Label:
id: lab
text: 'result'
""")
def start_process(array,lock):
process_test = Process(target=p_test, args=[array,lock], daemon=True)
process_test.start()
return process_test
def p_test(array,lock):
with lock:
buffer_data = str("z1111").encode("utf-8")
array[0:len(buffer_data)] = buffer_data[:]
print(array.decode("utf-8"))
if __name__ == '__main__':
freeze_support()
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.checkbox import CheckBox
from kivy.uix.spinner import Spinner
from kivy.properties import StringProperty,NumericProperty,ObjectProperty, BooleanProperty
from kivy.core.window import Window
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.popup import Popup
from kivy.uix.treeview import TreeView, TreeViewLabel, TreeViewNode
from kivy.uix.label import Label
from kivy.uix.textinput import TextInput
from kivy.uix.progressbar import ProgressBar
from kivy.uix.button import Button
set_start_method('spawn')
class TestApp(App):
def __init__(self, **kwargs):
super(TestApp, self).__init__(**kwargs)
self.title = 'testApp'
def build(self):
return TextWidget()
class TextWidget(Widget):
def __init__(self, **kwargs):
super(TextWidget, self).__init__(**kwargs)
self.process_test = None
self.proc = None
# shared memory
self.array = Array('c', 35)
self.lock = Lock()
def buttonClicked(self):
if self.ids.button1.text == "start":
self.proc = start_process(self.array,self.lock)
self.ids.button1.text = "stop"
else:
if self.proc:
self.proc.kill()
self.ids.button1.text = "start"
self.ids.lab.text = self.array.decode("utf-8")
Builder.load_string(kvLoadString)
TestApp().run()
错误信息如下。如何避免这样的错误?
Traceback (most recent call last):
File "C:\Users\taichi\Documents\Winpython64-3.9.10.0\WPy64-39100\python-3.9.10.amd64\lib\multiprocessing\process.py", line 315, in _bootstrap
self.run()
File "C:\Users\taichi\Documents\Winpython64-3.9.10.0\WPy64-39100\python-3.9.10.amd64\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\taichi\Desktop\test.py", line 29, in p_test
print(array.decode("utf-8"))
AttributeError: 'SynchronizedString' object has no attribute 'decode'
s = bytes.join(b'', [c for c in array if c != b'\x00']).decode('utf-8')