我正在尝试将 Raspberry Pi Pico W 连接到 AWS IoT Core 以发送传感器数据。我已经安装了 umqtt.simple 库并遵循了一些在线指南,但在处理证书和建立连接时遇到错误。
我的目标是建立安全连接,并希望获得有关代码片段或实现此目标的最佳实践的任何帮助。
具体来说,我不确定:
如何正确读取和使用下载的证书和私钥文件。 使用 umqtt.simple 建立连接并启用安全通信的语法。 预先感谢您的帮助!
这是我用来建立连接的代码。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
# AWS IoT Core - RPi Pico W Demo
# Required imports
import time
import machine
import network
import ujson
from umqtt.simple import MQTTClient
[code][code][code]
###############################################################################
### START CODE MODIFICATION ###################################################
###############################################################################
# Wifi Name / SSID
SSID = b'<your wifi network name>'
# Wifi Password
PASS = b'<your wifi password>'
# AWS ThingName is used for the Client ID (Best Practice). Example: RaspberryPiPicoW
CLIENT_ID = b'RaspberryPiPicoW'
# AWS Endpoint (Refer to "Creating a Thing in AWS IoT Core" step 13)
AWS_ENDPOINT = b'<your IoT Endpoint value here>'
###############################################################################
### END CODE MODIFICATION #####################################################
###############################################################################
# AWS IoT Core publish topic
PUB_TOPIC = b'/' + CLIENT_ID + '/temperature'
# AWS IoT Core subscribe topic
SUB_TOPIC = b'/' + CLIENT_ID + '/light'
# Reading Thing Private Key and Certificate into variables for later use
with open('/certs/key.der', 'rb') as f:
DEV_KEY = f.read()
# Thing Certificate
with open('/certs/cert.der', 'rb') as f:
DEV_CRT = f.read()
# Define light (Onboard Green LED) and set its default state to off
light = machine.Pin("LED", machine.Pin.OUT)
light.off()
# Wifi Connection Setup
def wifi_connect():
print('Connecting to wifi...')
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(SSID, PASS)
while wlan.isconnected() == False:
light.on()
print('Waiting for connection...')
time.sleep(0.5)
light.off()
time.sleep(0.5)
print('Connection details: %s' % str(wlan.ifconfig()))
# Callback function for all subscriptions
def mqtt_subscribe_callback(topic, msg):
print("Received topic: %s message: %s" % (topic, msg))
if topic == SUB_TOPIC:
mesg = ujson.loads(msg)
if 'state' in mesg.keys():
if mesg['state'] == 'on' or mesg['state'] == 'ON' or mesg['state'] == 'On':
light.on()
print('Light is ON')
else:
light.off()
print('Light is OFF')
# Read current temperature from RP2040 embeded sensor
def get_rpi_temperature():
sensor = machine.ADC(4)
voltage = sensor.read_u16() * (3.3 / 65535)
temperature = 27 - (voltage - 0.706) / 0.001721
return temperature
# Connect to wifi
wifi_connect()
# Set AWS IoT Core connection details
mqtt = MQTTClient(
client_id=CLIENT_ID,
server=AWS_ENDPOINT,
port=8883,
keepalive=5000,
ssl=True,
ssl_params={'key':DEV_KEY, 'cert':DEV_CRT, 'server_side':False})
# Establish connection to AWS IoT Core
mqtt.connect()
# Set callback for subscriptions
mqtt.set_callback(mqtt_subscribe_callback)
# Subscribe to topic
mqtt.subscribe(SUB_TOPIC)
# Main loop - with 5 sec delay
while True:
# Publisg the temperature
message = b'{"temperature":%s, "temperature_unit":"Degrees Celsius"}' % get_rpi_temperature()
print('Publishing topic %s message %s' % (PUB_TOPIC, message))
# QoS Note: 0=Sent zero or more times, 1=Sent at least one, wait for PUBACK
# See https://docs.aws.amazon.com/iot/latest/developerguide/mqtt.html
mqtt.publish(topic=PUB_TOPIC, msg=message, qos=0)
# Check subscriptions for message
mqtt.check_msg()
time.sleep(5)
我使用上面的代码连接我的树莓派 Pico W,将其连接到 AWS IoT 核心。该代码一直有效,直到它帮助我连接到 WiFi 的部分为止。但一旦到达与 IoT 核心建立连接的代码,它就会停止工作。它给出以下错误。
回溯(最近一次调用最后一次): 文件“”,第 99 行,位于 类型错误:意外的关键字参数“ssl_params”
class MQTTClient:
def __init__(
self,
client_id,
server,
port=0,
user=None,
password=None,
keepalive=0,
ssl=None,
):
“ssl_params”没有参数 我认为您正在引用 mpython 文档而不是 Micropython 文档