我正在使用 micropython 在 esp32-s3 上运行一个项目。 我尝试使用 https 将日志发布到 aws cloudwatch,它要求我创建 aws v4 签名,但是我从 AWS 收到以下错误:
{"__type":"InvalidSignatureException","message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."}
以下是我的签名方法:
try:
from uhashlib import sha256 as _sha256
except ImportError:
print("Warning: not using uhashlib")
from hashlib import sha256 as _sha256
import hmac_ltd as _hmac
import ubinascii as _ubinascii
def request_gen(
access_key,
secret_key,
date_time_stamp,
method="POST",
region="us-east-1",
body="",
uri="",
):
service = "logs"
request_type = "aws4_request"
algorithm = "AWS4-HMAC-SHA256"
date_stamp = date_time_stamp[:8]
return_dict = {}
return_dict["host"] = f"{service}.{region}.amazonaws.com"
return_dict["uri"] = "/"
key = bytearray()
key.extend(("AWS4" + secret_key).encode())
print("key: ", key)
kDate = _hmac.new(key, date_stamp, _sha256).digest()
kRegion = _hmac.new(kDate, region, _sha256).digest()
kService = _hmac.new(kRegion, service, _sha256).digest()
kSigning = _hmac.new(kService, request_type, _sha256).digest()
content_length = str(len(body))
print(body)
payload_hash = _ubinascii.hexlify(_sha256(body.encode("utf-8")).digest()).decode(
"utf-8"
)
# make the string to sign
canonical_querystring = "" # no request params for logs
canonical_headers_dict = {
"content-type": "application/x-amz-json-1.1",
"host": return_dict["host"],
"x-amz-content-sha256": payload_hash,
"x-amz-date": date_time_stamp,
"x-amz-target": "Logs_20140328.PutLogEvents",
}
# Build the canonical headers string
canonical_headers = "\n".join(
f"{key}:{value}" for key, value in canonical_headers_dict.items()
)
# Get the sorted keys for signed_headers
signed_headers = ";".join(
sorted(key.lower() for key in canonical_headers_dict.keys())
)
canonical_request = (
method
+ "\n"
+ return_dict["uri"]
+ "\n"
+ canonical_querystring
+ "\n"
+ canonical_headers
+ "\n"
+ signed_headers
+ "\n"
+ payload_hash
)
canonical_request_hash = _ubinascii.hexlify(
_sha256(canonical_request.encode("utf-8")).digest()
).decode("utf-8")
credential_scope = date_stamp + "/" + region + "/" + service + "/" + request_type
string_to_sign = (
algorithm
+ "\n"
+ date_time_stamp
+ "\n"
+ credential_scope
+ "\n"
+ canonical_request_hash
)
# generate the signature:
signature = _hmac.new(kSigning, string_to_sign, _sha256).digest()
signatureHex = _ubinascii.hexlify(signature).decode("utf-8")
authorization_header = (
algorithm
+ " "
+ "Credential="
+ access_key
+ "/"
+ credential_scope
+ ", "
+ "SignedHeaders="
+ signed_headers
+ ", "
+ "Signature="
+ signatureHex
)
return_dict["headers"] = {
"authorization": authorization_header,
"content-type": "application/x-amz-json-1.1",
"host": return_dict["host"],
"x-amz-content-sha256": payload_hash,
"x-amz-date": date_time_stamp,
"x-amz-target": "Logs_20140328.PutLogEvents",
}
return return_dict
以下是我的输出:
##########################################
canonical_request:
POST
/
content-type:application/x-amz-json-1.1
host:logs.us-east-1.amazonaws.com
x-amz-target:Logs_20140328.PutLogEvents
x-amz-content-sha256:7dbe84657fbd1885f46b4d0d51a6b49be58c78db693af0fe85e335f28f6efefd
x-amz-date:20231125T183032Z
content-type;host;x-amz-content-sha256;x-amz-date;x-amz-target
7dbe84657fbd1885f46b4d0d51a6b49be58c78db693af0fe85e335f28f6efefd
##########################################
string_to_sign:
AWS4-HMAC-SHA256
20231125T183032Z
20231125/us-east-1/logs/aws4_request
62143e2e37c17f0122bfce2333c7523c78844be83a23df9c426ddf9109ee8584
signature:
07685ab92b73a08ca828948e05251296f3a0bdc774f0bc904e8311064e8fc407
##########################################
BEGIN REQUEST++++++++++++++++++++++++++++++++++++
Request URL = https://logs.us-east-1.amazonaws.com
Request Headers = {"authorization": "AWS4-HMAC-SHA256 Credential=<ACCESS_KEY>/20231125/us-east-1/logs/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-target, Signature=07685ab92b73a08ca828948e05251296f3a0bdc774f0bc904e8311064e8fc407", "x-amz-date": "20231125T183032Z", "x-amz-target": "Logs_20140328.PutLogEvents", "host": "logs.us-east-1.amazonaws.com", "x-amz-content-sha256": "7dbe84657fbd1885f46b4d0d51a6b49be58c78db693af0fe85e335f28f6efefd", "content-type": "application/x-amz-json-1.1"}
Request payload = {"logGroupName":"esp32-test","logEvents":[{"message":"This is a sample log message from esp32","timestamp":1700937032000}],"logStreamName":"esp32-test-stream-3"}
RESPONSE++++++++++++++++++++++++++++++++++++
Response code: 400
{'_content_consumed': False, 'raw': <SSLSocket>, 'encoding': 'utf-8', 'reason': b'Bad Request', '_cached': None, 'status_code': 400}
{"__type":"InvalidSignatureException","message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."}
>>>
知道如何解决这个问题吗?
我尝试过的事情:
我使用的是 Pi Pico W,从 micropython 使用 AWS 似乎非常困难。不过,我认为我已经成功了,并且您的代码一路上很有帮助。由于标准库不可用,很难完全符合 AWS 的预期。但这是可能的。而且值得,因为使用微控制器作为客户端并借助 AWS on-tap 的强大功能开启了很多可能性?
我不确定 hmac_ltd 是什么,但我用过
import hmac
我需要更改 hmac.py 中的第 20 行,这意味着我无法使用 hmac.mpy (https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py )
/lib/hmac.py
# make_hash = lambda d=b"": hashlib.new(digestmod, d) # this way does not work with uhashlib
make_hash = lambda d=b"": getattr(hashlib, digestmod)(d) # this way works with uhashlib
这是我的代码,用于签署 AWS v4 请求。
我使用它在 AWS lambda 函数(“execute-api”)上调用 GET,并且您想要 POST 到日志记录函数,但我认为您可以调整它。
main.py
import hmac
import uhashlib as hashlib
import ubinascii
import utime
import urequests
import json
import config
import credentials
def getSignatureKey(secret_key, date_stamp, regionName, serviceName):
kDate = sign(('AWS4' + secret_key).encode('utf-8'), date_stamp)
kRegion = sign(kDate, regionName)
kService = sign(kRegion, serviceName)
kSigning = sign(kService, 'aws4_request')
return kSigning
def create_signed_headers(
api_host,
api_uri,
region,
service,
access_key,
secret_key,
http_method='GET',
query_string='',
additional_headers=None,
payload=''
):
# Create a date for headers and the credential string
t = utime.gmtime()
amz_date = f"{t[0]:04d}{t[1]:02d}{t[2]:02d}T{t[3]:02d}{t[4]:02d}{t[5]:02d}Z"
date_stamp = f"{t[0]:04d}{t[1]:02d}{t[2]:02d}"
# Prepare canonical request
canonical_querystring = query_string
canonical_headers = 'host:' + api_host + '\n' + 'x-amz-date:' + amz_date + '\n'
signed_headers = 'host;x-amz-date'
payload = '' # GET requests don't usually have a payload
payload_hash = ubinascii.hexlify(hashlib.sha256(payload.encode('utf-8')).digest()).decode()
# print(f"\npayload_hash: {payload_hash}\n")
canonical_request = (
http_method + '\n' + api_uri + '\n' + canonical_querystring + '\n' +
canonical_headers + '\n' + signed_headers + '\n' + payload_hash
)
# Prepare string to sign
algorithm = 'AWS4-HMAC-SHA256'
credential_scope = date_stamp + '/' + region + '/' + service + '/' + 'aws4_request'
string_to_sign = (
algorithm + '\n' + amz_date + '\n' + credential_scope + '\n' +
ubinascii.hexlify(hashlib.sha256(canonical_request.encode('utf-8')).digest()).decode()
)
# Calculate the signature
signing_key = getSignatureKey(secret_key, date_stamp, region, service)
signature = hmac.new(
signing_key, string_to_sign.encode('utf-8'), digestmod='sha256'
).hexdigest()
# Prepare authorization header
authorization_header = (
algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' +
'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature
)
# Prepare headers
headers = {'x-amz-date':amz_date, 'Authorization':authorization_header}
if additional_headers is not None:
headers.update(additional_headers)
print("================================")
print("Canonical Request")
print("================================")
print(http_method) # HTTPRequestMethod
print(api_uri) # CanonicalURI
print(canonical_querystring) # CanonicalQueryString
print(canonical_headers) # CanonicalHeaders
print(signed_headers) # SignedHeaders
print(payload_hash) # HashedPayload
print("================================")
print("String-to-sign")
print("================================")
print(string_to_sign)
return headers
def sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), digestmod='sha256').digest()
def main():
additional_headers = {'x-apikey': credentials.RAILDATAORG_API_KEY}
payload = ""
http_method = "GET"
headers = create_signed_headers(
api_host=config.AWS_API_HOST,
api_uri=config.AWS_API_URI,
region=config.AWS_API_REGION,
service=config.AWS_API_SERVICE,
access_key=credentials.AWS_ACCESS_KEY,
secret_key=credentials.AWS_SECRET_ACCESS_KEY,
query_string=config.AWS_API_QUERYSTRING,
additional_headers=additional_headers,
http_method=http_method,
payload=payload
)
try:
if http_method == 'GET':
response = urequests.get(config.AWS_API_URL, headers=headers)
elif http_method == 'POST':
response = urequests.post(config.AWS_API_URL, headers=headers, data=payload)
elif http_method == 'PUT':
response = urequests.put(config.AWS_API_URL, headers=headers, data=payload)
elif http_method == 'DELETE':
response = urequests.delete(config.AWS_API_URL, headers=headers)
else:
raise ValueError(f"Unsupported HTTP method: {http_method}")
if response.status_code < 200 or response.status_code >= 400:
print(f"Request failed with status code {response.status_code}")
return
except Exception as e:
print(f"Request failed: {e}")
return
response_json = json.loads(response.text)
# Use this troubleshooting AWS. Get the message and any error from the response
message = response_json.get('message')
error = response_json.get('error')
print("================================")
print("Response")
print("================================")
if message is not None:
message = message.replace("'", "") # Remove single quotes to make comparison easier
print(f"Message: {message}")
if error is not None:
print(f"Error: {error}")
print(response.text)
# print(json.dumps(response_json))
if __name__ == "__main__":
main()
凭证.py
# AWS Access Key
AWS_ACCESS_KEY = "your AWS key here"
# AWS Secret Access Key
AWS_SECRET_ACCESS_KEY = "your AWS secret key here"
config.py
import re
def parse_url(url):
pattern = '^(https?):\\/\\/([^\\/]+)\\/([^\\?]*)(\\?(.*))?'
match = re.match(pattern, url)
if match is None:
raise ValueError(f"Invalid URL: {url}")
protocol = match.group(1)
host = match.group(2)
uri = match.group(3)
query_string = match.group(5) or ""
return protocol, host, uri, query_string
# Specify the full API URL and the code will parse it into components.
AWS_API_URL = f"https://id-given-you-by-AWS.execute-api.eu-west-2.amazonaws.com/prod/PATH?somekey=somevalue"
# Parse the URL into components
AWS_API_PROTOCOL, AWS_API_HOST, AWS_API_URI, AWS_API_QUERYSTRING = parse_url(AWS_API_URL)
host_parts = AWS_API_HOST.split('.')
AWS_API_ID = host_parts[0]
AWS_API_SERVICE = host_parts[1]
AWS_API_REGION = host_parts[2]
希望对您有帮助并祝您好运。