如何改进我的代码? [Python、MQTT、PLC (BACnet)、JSON] [关闭]

问题描述 投票:0回答:0

我是新手。我想编写一段代码,先打开到远程服务器的 SSH 隧道,然后从 PLC 读取数据,并通过 MQTT 以 JSON 格式传输。这是我的代码,但我认为它不是最清晰、最高效的版本。我想将它托管在 Azure Function 中,并使用 Azure Logic App 每天触发一次。我应该如何改进我的代码?

# This is the main script file of the PLC Reader of my project.

# It is used to read the data from the PLC and send it to the server.

# Author: 

# License: MIT

# Imported libraries

import time
import json
import subprocess

# Imported third party libraries

import BAC0
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish

# Import secrets

from secrets.secrets import broker_address, plc_address, target_mqtt_server
from secrets.secrets import ssh_user, ssh_password, ssh_privatekey, ssh_local_port, ssh_port

# Functions

def create_ssh_tunnel():
    """Start a background ``ssh -N -L`` port-forward and verify it stays up.

    The forward maps local port ``ssh_local_port`` to
    ``plc_address:ssh_local_port`` through ``ssh_user@broker_address``,
    authenticating with the private key at ``ssh_privatekey``.

    Returns:
        The ``subprocess.Popen`` handle of the running ssh process so the
        caller can terminate it when done, or ``None`` when the process
        could not be started or exited during the start-up grace period.
    """
    print('Creating SSH Tunnel at ' + time.strftime('%Y-%m-%d %H:%M:%S'))
    try:
        tunnel = subprocess.Popen(
            ['ssh', '-i', ssh_privatekey, '-N', '-L',
             f'{ssh_local_port}:{plc_address}:{ssh_local_port}',
             f'{ssh_user}@{broker_address}'],
            # stdout is never read, so do not attach a pipe to it;
            # stderr is kept for the failure message below.
            stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except Exception as e:
        # Best-effort: report the problem instead of aborting the script.
        print(f"Error creating SSH tunnel: {e}")
        return None

    # Give ssh time to authenticate and set up the forward.
    time.sleep(5)

    # returncode is only populated by poll()/wait(); with -N the process is
    # expected to keep running, so any exit by now means the tunnel is gone.
    if tunnel.poll() is not None:
        print(f"Failed to create SSH tunnel: {tunnel.stderr.read().decode()}")
        return None

    print("SSH tunnel created successfully.")
    return tunnel

def check_broker_connection():
    """Probe the MQTT broker with a throw-away TLS client.

    Connects to ``broker_address`` on ``ssh_port`` from the local endpoint
    ``127.0.0.1:ssh_local_port`` and disconnects again immediately.

    Returns:
        ``True`` if connect and disconnect completed without raising,
        ``False`` otherwise (the exception text is printed).
    """
    print('Checking connection to the MQTT Broker...')
    try:
        probe = mqtt.Client()
        probe.tls_set()
        # NOTE(review): the SSH user/password are reused as MQTT
        # credentials here - confirm this is intentional.
        probe.username_pw_set(ssh_user, ssh_password)
        remote_port = int(ssh_port)
        local_port = int(ssh_local_port)
        probe.connect(broker_address, remote_port,
                      bind_address='127.0.0.1', bind_port=local_port)
        probe.disconnect()
    except Exception as err:
        print('The exception was: ' + str(err))
        return False
    return True

def check_plc_connection():
    """Probe the PLC by reading one BACnet point.

    Opens a temporary BAC0 connection, reads ``presentValue`` of
    ``analogValue 1`` on the device at ``plc_address`` and always closes
    the connection again so the local BACnet port is released for later
    readers in the same process.

    Returns:
        ``True`` if the read succeeded, ``False`` otherwise (the exception
        text is printed).
    """
    bacnet = None
    try:
        bacnet = BAC0.lite()
        # BAC0 expects a single request string:
        # "<address> <object type> <instance> <property>".
        bacnet.read(f'{plc_address} analogValue 1 presentValue')
        return True
    except Exception as e:
        print('The exception was: ' + str(e))
        return False
    finally:
        # Release the socket even when the read failed.
        if bacnet is not None:
            bacnet.disconnect()

def check_target_mqtt_server_connection():
    """Probe the target MQTT server with a throw-away client.

    Connects to ``target_mqtt_server`` using the client's default port and
    disconnects again immediately.

    Returns:
        ``True`` if connect and disconnect completed without raising,
        ``False`` otherwise (the exception text is printed).
    """
    print('Checking connection to the target MQTT server...')
    try:
        probe = mqtt.Client()
        probe.connect(target_mqtt_server)
        probe.disconnect()
    except Exception as err:
        print('The exception was: ' + str(err))
        return False
    return True

def connection_checker():
    """Run every connectivity check and report the combined result.

    All three checks (MQTT broker, PLC, target MQTT server) are always
    executed so that each one prints its own status line, even when an
    earlier check has already failed.

    Returns:
        ``True`` only if every check succeeded, ``False`` otherwise.
    """
    print('Checking connections...')
    checks = (
        ('MQTT Broker', check_broker_connection),
        ('PLC', check_plc_connection),
        ('target MQTT server', check_target_mqtt_server_connection),
    )
    all_ok = True
    for label, check in checks:
        if check():
            print(f'Connection to the {label} was successful.')
        else:
            print(f'Connection to the {label} was not successful.')
            all_ok = False
    # Callers branch on this value, so it must be returned explicitly.
    return all_ok

def read_plc_to_json():
    """Read one BACnet point from the PLC and wrap it in a JSON document.

    Reads ``presentValue`` of ``analogValue 1`` on the device at
    ``plc_address`` through a temporary BAC0 connection, which is closed
    again before returning.

    Returns:
        A JSON string of the form ``{"data": <value>}``.

    Raises:
        Whatever BAC0 raises when the connection or the read fails; the
        exception is propagated after the connection has been closed.
    """
    print('Reading the PLC...')
    bacnet = BAC0.lite()
    try:
        # BAC0 expects a single request string:
        # "<address> <object type> <instance> <property>".
        value = bacnet.read(f'{plc_address} analogValue 1 presentValue')
    finally:
        # Always release the local BACnet socket.
        bacnet.disconnect()

    return json.dumps({'data': value})

# Main function

def main():
    """Open the tunnel, verify connectivity, read the PLC and publish.

    The PLC value is published as JSON to topic ``target_mqtt/plc`` on
    ``target_mqtt_server`` only when ``connection_checker()`` reports
    success. Nothing is returned.
    """
    print('Starting PLC Reader script at ' + time.strftime('%Y-%m-%d %H:%M:%S'))

    # Create the SSH tunnel
    tunnel = create_ssh_tunnel()
    try:
        # Guard clause: do not read or publish unless the checks passed.
        if not connection_checker():
            print('Not all connections are available; nothing was published.')
            return

        print('All connections are successful.')
        json_data = read_plc_to_json()
        print('Data read from the PLC: ' + str(json_data))

        # Publish on the client's default port, the same one the target
        # server connectivity check uses (no port is configured for it).
        publish.single('target_mqtt/plc', json_data, hostname=target_mqtt_server)
    finally:
        # Shut down the tunnel process if create_ssh_tunnel() handed one back.
        if tunnel is not None:
            tunnel.terminate()
            tunnel.wait()

# Run the main script

# Only run the reader when this file is executed as a script.
if __name__ == "__main__":
    main()

提前谢谢大家!

我尽力收集所需的库并为我的问题创建有效的解决方案。

python json mqtt plc bacnet
© www.soinside.com 2019 - 2024. All rights reserved.