这里是新手。我想编写一个代码,可以为远程服务器打开 SSH 隧道,然后从 PLC 读取数据并通过 MQTT 以 JSON 格式传输。这是我的代码,但我认为这不是最清晰、最有效的版本。我想将它托管在 Azure Function 中并每天使用 Azure Logic App 触发一次。我应该如何改进我的代码?
# This is the main script file of the PLC Reader of my project.
# It is used to read the data from the PLC and send it to the server.
# Author:
# License: MIT
# Imported libraries
import time
import json
import subprocess
# Imported third party libraries
import BAC0
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
# Import secrets
from secrets.secrets import broker_address, plc_address, target_mqtt_server
from secrets.secrets import ssh_user, ssh_password, ssh_privatekey, ssh_local_port, ssh_port
# Functions
def create_ssh_tunnel():
print('Creating SSH Tunnel at ' + time.strftime('%Y-%m-%d %H:%M:%S'))
try:
tunnel = subprocess.Popen(
\['ssh', '-i', ssh_privatekey, '-N', '-L', f'{ssh_local_port}:{plc_address}:{ssh_local_port}',
f'{ssh_user}@{broker_address}'\], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if tunnel.returncode is not None and tunnel.returncode != 0:
print(f"Failed to create SSH tunnel: {tunnel.stderr.read().decode()}")
else:
print("SSH tunnel created successfully.")
except Exception as e:
print(f"Error creating SSH tunnel: {e}")
time.sleep(5)
def check_broker_connection():
\# Check connection to the MQTT Broker
print('Checking connection to the MQTT Broker...')
try:
\# Create a new MQTT client
client = mqtt.Client()
client.tls_set()
client.username_pw_set(ssh_user, ssh_password)
client.connect(broker_address, int(ssh_port), bind_address='127.0.0.1', bind_port=int(ssh_local_port))
\# Disconnect from the MQTT Broker
client.disconnect()
\# Return True if the connection was successful
return True
except Exception as e:
\# Return False if the connection was not successful
print('The exception was: ' + str(e))
return False
def check_plc_connection():
\# Check connection to the PLC
try:
\# Create a new BAC0 object
bacnet = BAC0.lite()
\# Read the value of the object
bacnet.read(plc_address, 'analogValue', 1)
\# Return True if the connection was successful
return True
except Exception as e:
\# Return False if the connection was not successful
print('The exception was: ' + str(e))
return False
def check_target_mqtt_server_connection():
\# Check connection to the target MQTT server
print('Checking connection to the target MQTT server...')
try:
\# Create a new MQTT client
client = mqtt.Client()
\# Connect to the target MQTT server
client.connect(target_mqtt_server)
\# Disconnect from the target MQTT server
client.disconnect()
\# Return True if the connection was successful
return True
except Exception as e:
\# Return False if the connection was not successful
print('The exception was: ' + str(e))
return False
def connection_checker():
\# Check the connections
print('Checking connections...')
\# Check the connection to the MQTT Broker
if check_broker_connection():
\# Check the connection to the PLC
print('Connection to the MQTT Broker was successful.')
else:
print('Connection to the MQTT Broker was not successful.')
if check_plc_connection():
\# Check the connection to the target MQTT server
print('Connection to the PLC was successful.')
else:
print('Connection to the PLC was not successful.')
if check_target_mqtt_server_connection():
# Return True if all the connections were successful
print('Connection to the target MQTT server was successful.')
else:
print('Connection to the target MQTT server was not successful.')
def read_plc_to_json():
\# Read the PLC
print('Reading the PLC...')
\# Create a new BAC0 object
bacnet = BAC0.lite()
\# Read the value of the object
value = bacnet.read(plc_address, 'analogValue', 1)
# Convert the value to a JSON object
json_data = json.dumps({'data': value})
# Return the JSON data
return json_data
# Main function
def main():
print('Starting PLC Reader script at ' + time.strftime('%Y-%m-%d %H:%M:%S'))
# Create the SSH tunnel
create_ssh_tunnel()
# Check the connections
if connection_checker():
print('All connections are successful.')
json_data = read_plc_to_json()
print('Data read from the PLC: ' + str(json_data))
# Publish the data to the MQTT Broker
publish.single('target_mqtt/plc', json_data, hostname=target_mqtt_server, port=broker_port)
# Run the main script
if __name__ == '__main__':
main()
提前谢谢大家!
我尽力收集所需的库并为我的问题创建有效的解决方案。