我创建了一个简单的 Python Flask 应用程序,它使用 MQTT 客户端接收消息,然后将其发布到 Flask 应用程序。该程序在我的本地计算机上运行时按预期运行,但在部署在 pythonanywhere.com 上时从未接收/发布消息。 我在网站上升级了我的帐户,因为免费用户现在允许连接到未列入白名单的 mqtt 代理,所以这不应该是问题。 我使用一个简单的字符串连接和一些名称,如“clientReceiver”+ str(time.time()) 以避免客户端在创建新实例时具有相同的名称。
我尝试过使用flask_mqtt库和paho.mqtt库,两者都有相同的问题。如果我在本地 PC 上运行一个单独的 mqtt 客户端并将其发布到已部署的 Flask 应用程序,它会收到消息,但不会自行接收消息。
这是 mqtt 客户端的代码:
info = Info(title='AIR API', version='1.0.0')
app = OpenAPI(__name__, info=info, template_folder='templates')
air_tag = Tag(name='air', description='air data operations')
app.config['MQTT_BROKER_URL'] = "boker.url"
app.config['MQTT_BROKER_PORT'] = 8883
app.config['MQTT_USERNAME'] = "username"
app.config['MQTT_PASSWORD'] = "password"
app.config['MQTT_CLIENT_ID'] = "clientReceive" + str(time.time())
app.config['MQTT_TLS_ENABLED'] = True
app.config['MQTT_TLS_CA_CERTS'] = None
app.config['MQTT_TLS_CERTFILE'] = None
app.config['MQTT_TLS_KEYFILE'] = None
app.config['MQTT_TLS_CERT_REQS'] = ssl.CERT_NONE
app.config['MQTT_TLS_VERSION'] = ssl.PROTOCOL_TLS_CLIENT
topic = "mytopic"
mqtt = Mqtt(app)
@mqtt.on_connect()
def on_connect(client, userdata, flags, rc):
# Code to execute when the MQTT client connects to the broker
# This function is called automatically when a connection is established.
print("Connected with result code " + str(rc))
client.subscribe(topic)
@mqtt.on_message()
def on_message(client, userdata, message):
# Code to execute when a message is received on a subscribed topic
# This function is called automatically when a message is received.
data = message.payload.decode()
headers = {'Content-Type': 'application/json'}
try:
response = requests.post('mysite/whereipost', data=data,
headers=headers)
if response.status_code == HTTPStatus.CREATED:
print('Data received and stored')
print("Data: " + data)
else:
print('Error storing data, status code:', {response.status_code})
except Exception as e:
print(f"failed to forward message to Flask endpoint: {e}")
以及用于在 Flask 应用程序上发布的代码:
@app.route('/api/core2/sensors/air-quality', methods=['POST'])
def post_air_data(Body: AirData):
""" Adds a new air data measurement """
measurements.data.append(Body)
# Get the time of the last measurement
times.time.append(dateTime(time=datetime.datetime.now()))
return AirDataId(id=len(measurements.data) - 1).json(), HTTPStatus.CREATED
英国人在帖子评论中提出的问题。 Flask_mqtt 调用使用线程的 paho.mqtt Loop_start() 函数。 pythonanywhere.com 不支持线程,这实际上使得它只是忽略代码。 为了解决这个问题,我制作了一个单独的程序并将其作为始终执行的任务运行。