我正在开发一个从两个来源获取数据的程序:
传入的 mqtt 消息通过前端的用户输入触发对 RestAPI 的请求。
此外,它使用相同的连接/客户端,以一秒的间隔通过 mqtt 向前端发送数据。
编辑: 这种“推送方法”的另一种方法可能是在前端会话启动/重新加载时从前端请求数据,然后如果仅更改数据,则从后端推送数据。但这并不能修复请求的阻塞行为。
按照代码片段...
def main():
# Connect to MQTT broker
mqtt = mqttClient()
testClass = TestClassName(mqtt.client)
try:
# Start infinit loop to run mqtt client
mqtt.client.loop_start() # in this loop the callback for the triggering messages is running
# publish every 1 second
while True:
time.sleep(1)
testClass.uiPublish() # send data back to frontend
except KeyboardInterrupt:
logging.debug("Programm stopped by user")
mqtt.client.loop_stop()
mqtt.client.disconnect()
类
TestClassName
还包含mqtt消息的回调和发布功能uiPublish()
。在其中一个回调中,会触发 API 请求的函数。
发送的数据是类
TestClassName
的属性,并且由 API 请求更新。所以这两个函数都必须能够访问这个属性。
# class TestClassName containes this function
def subscribeMqtt(self, topic):
self.mqttClient.subscribe(topic)
self.mqttClient.message_callback_add(topic, self.dotherequest)
总的来说,它可以工作,但有一个问题:对 API 的请求
dotherequest
会阻止在 main() 函数的 while 循环中发送 mqtt 消息。该请求需要一些时间(最多 5 秒),这也会导致与 mqtt 代理的断开/重新连接。
因此,前端在请求结束时已经过时,而且代理上的重新连接也不是一个好的解决方案。只要请求正在运行,前端就应该获取“旧”信息。请求结束时,属性会更新,最多 1 秒后数据将发送到前端。
我尝试了多种方法来处理这个问题,但我认为我有一些普遍的问题来理解Python中多线程/异步背后的原理。我尝试设置发出请求的函数:
所有方法都不会改变mqtt消息发送的阻塞。我认为原因可能是因为最终我必须等待请求函数完成。我不会多次并行化同一函数。我尝试并行化来完成程序的各个部分。
有什么想法/建议可以解决这个问题吗?
所以,我想我理解这个问题,并通过添加新线程找到了解决方案,如评论中推荐的那样。
以下是我如何实施该解决方案的示例。
订阅 mqtt 主题并创建函数回调
self.dotherequest
。
# class TestClassName containes this function
def subscribeMqtt(self, topic):
self.mqttClient.subscribe(topic)
self.mqttClient.message_callback_add(topic, self.dotherequest)
在此函数中,将创建并启动请求的新线程。请求本身位于函数中
self.functionWithRequest
锁用于在数据写入时保留对象的属性(在多个请求同时运行的情况下),也称为竞争条件。
def dotherequest(self, client, userdata, msg):
payload = json.loads(msg.payload.decode())
# creating a lock
lock = threading.Lock()
# New thread form request (IO)
t1 = threading.Thread(target=self.functionWithRequest, args=(payload, lock))
t1.start()
这是请求发生的函数。
def functionWithRequest(self, payload, lock):
headers = { "Content-Type": "application/json" }
data = {
"datapoint1": "data1",
"datapoint2": "data2",
"datapoint3": "data3"
}
response = requests.post(os.getenv('requestUrl'), json=data, headers=headers, verify='ca.pem', timeout=15)
if response.status_code == 200:
load = json.loads(response.text)
lock.acquire()
self.data = load['dataFromRequest']
lock.release()
else:
log.error('Request NOT successful. Error code {0}.'.format(response.status_code))
据我了解Python中线程和进程之间的区别,线程使用相同的GIL,因此GIL一直在线程之间切换。最后,所有内容都在同一个 cpu 核心上运行,单个 python 解释器正在运行。优点之一是所有线程可以共享相同的变量/数据。
进程能够在多个 cpu 核心/多个 python 解释器上运行,这使得进程完成后同步数据变得更加困难。优点是计算能力更强。