python 请求阻止发布 mqtt 消息

问题描述 投票:0回答:1

我正在开发一个从两个来源获取数据的程序:

  • 来自前端的传入 mqtt 消息(paho)和
  • 来自 RestAPI 的数据(请求)

传入的 mqtt 消息通过前端的用户输入触发对 RestAPI 的请求。

此外,它使用相同的连接/客户端,以一秒的间隔通过 mqtt 向前端发送数据。

编辑: 这种“推送方法”的另一种方法可能是在前端会话启动/重新加载时从前端请求数据,然后如果仅更改数据,则从后端推送数据。但这并不能修复请求的阻塞行为。

按照代码片段...

def main():
    # Connect to MQTT broker
    mqtt = mqttClient()

    testClass = TestClassName(mqtt.client)

    try:
        # Start infinit loop to run mqtt client
        mqtt.client.loop_start() # in this loop the callback for the triggering messages is running
           
        # publish every 1 second
        while True:
            time.sleep(1)
            testClass.uiPublish() # send data back to frontend

    except KeyboardInterrupt:
        logging.debug("Programm stopped by user")
        mqtt.client.loop_stop()
        mqtt.client.disconnect()

TestClassName
还包含mqtt消息的回调和发布功能
uiPublish()
。在其中一个回调中,会触发 API 请求的函数。

发送的数据是类

TestClassName
的属性,并且由 API 请求更新。所以这两个函数都必须能够访问这个属性。

# class TestClassName containes this function
def subscribeMqtt(self, topic):
    self.mqttClient.subscribe(topic)
    self.mqttClient.message_callback_add(topic, self.dotherequest)

总的来说,它可以工作,但有一个问题:对 API 的请求

dotherequest
会阻止在 main() 函数的 while 循环中发送 mqtt 消息。该请求需要一些时间(最多 5 秒),这也会导致与 mqtt 代理的断开/重新连接。 因此,前端在请求结束时已经过时,而且代理上的重新连接也不是一个好的解决方案。只要请求正在运行,前端就应该获取“旧”信息。请求结束时,属性会更新,最多 1 秒后数据将发送到前端。

我尝试了多种方法来处理这个问题,但我认为我有一些普遍的问题来理解Python中多线程/异步背后的原理。我尝试设置发出请求的函数:

  • 与 aiohttp 异步,
  • 带有螺纹的额外螺纹和
  • 与线程池异步

所有方法都不会改变mqtt消息发送的阻塞。我认为原因可能是因为最终我必须等待请求函数完成。我不会多次并行化同一函数。我尝试并行化来完成程序的各个部分。

有什么想法/建议可以解决这个问题吗?

python request mqtt blocking nonblocking
1个回答
0
投票

所以,我想我理解这个问题,并通过添加新线程找到了解决方案,如评论中推荐的那样。

以下是我如何实施该解决方案的示例。

订阅 mqtt 主题并创建函数回调

self.dotherequest

# class TestClassName containes this function
def subscribeMqtt(self, topic):
    self.mqttClient.subscribe(topic)
    self.mqttClient.message_callback_add(topic, self.dotherequest)

在此函数中,将创建并启动请求的新线程。请求本身位于函数中

self.functionWithRequest
锁用于在数据写入时保留对象的属性(在多个请求同时运行的情况下),也称为竞争条件。

def dotherequest(self, client, userdata, msg):
        payload = json.loads(msg.payload.decode())   
        # creating a lock 
        lock = threading.Lock() 
        # New thread form request (IO)
        t1 = threading.Thread(target=self.functionWithRequest, args=(payload, lock)) 
        t1.start()

这是请求发生的函数。

def functionWithRequest(self, payload, lock):   
    headers = { "Content-Type": "application/json" }
    data = {
        "datapoint1": "data1",
        "datapoint2": "data2",
        "datapoint3": "data3"
        }
    
    response = requests.post(os.getenv('requestUrl'), json=data, headers=headers, verify='ca.pem', timeout=15)

    if response.status_code == 200:
        load = json.loads(response.text)
            lock.acquire() 
            self.data = load['dataFromRequest']
            lock.release()
    else:
        log.error('Request NOT successful. Error code {0}.'.format(response.status_code))

据我了解Python中线程和进程之间的区别,线程使用相同的GIL,因此GIL一直在线程之间切换。最后,所有内容都在同一个 cpu 核心上运行,单个 python 解释器正在运行。优点之一是所有线程可以共享相同的变量/数据。

进程能够在多个 cpu 核心/多个 python 解释器上运行,这使得进程完成后同步数据变得更加困难。优点是计算能力更强。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.