AWS 签名请求 micropython

问题描述 投票:0回答:1

我正在使用 micropython 在 esp32-s3 上运行一个项目。 我尝试使用 https 将日志发布到 aws cloudwatch,它要求我创建 aws v4 签名,但是我从 AWS 收到以下错误:

{"__type":"InvalidSignatureException","message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."}

以下是我的签名方法:


try:
    from uhashlib import sha256 as _sha256
except ImportError:
    print("Warning: not using uhashlib")
    from hashlib import sha256 as _sha256

import hmac_ltd as _hmac
import ubinascii as _ubinascii


def _sigv4_hex_sha256(data):
    """Return the lowercase hex SHA-256 digest of the bytes-like *data*."""
    return _ubinascii.hexlify(_sha256(data).digest()).decode("utf-8")


def _sigv4_hmac(key, text):
    """Return the raw HMAC-SHA256 of the string *text* under the bytes *key*."""
    # NOTE(review): assumes hmac_ltd.new accepts a bytes message the same way
    # CPython's hmac.new does -- confirm against the hmac_ltd module in use.
    return _hmac.new(key, text.encode("utf-8"), _sha256).digest()


def request_gen(
    access_key,
    secret_key,
    date_time_stamp,
    method="POST",
    region="us-east-1",
    body="",
    uri="",
):
    """Build AWS Signature Version 4 headers for a CloudWatch Logs PutLogEvents call.

    Args:
        access_key: AWS access key id placed in the ``Credential=`` field.
        secret_key: AWS secret access key used to derive the signing key.
        date_time_stamp: Request timestamp such as ``"20231125T183032Z"``; its
            first eight characters are used as the credential-scope date.
        method: HTTP method written into the canonical request.
        region: AWS region used for the host name and the credential scope.
        body: Request payload (``str`` or bytes-like) whose SHA-256 is signed.
        uri: Accepted for interface compatibility but not used; the canonical
            URI is always ``"/"``.

    Returns:
        A dict with ``"host"``, ``"uri"`` and ``"headers"``; ``"headers"`` holds
        the ``authorization`` header plus every header that was signed.
    """
    service = "logs"
    request_type = "aws4_request"
    algorithm = "AWS4-HMAC-SHA256"

    date_stamp = date_time_stamp[:8]
    host = service + "." + region + ".amazonaws.com"
    canonical_uri = "/"

    # Derive the signing key: HMAC chained over date, region, service and the
    # fixed terminator, seeded with "AWS4" + secret key.  The key material is
    # deliberately never printed.
    signing_key = ("AWS4" + secret_key).encode("utf-8")
    for scope_part in (date_stamp, region, service, request_type):
        signing_key = _sigv4_hmac(signing_key, scope_part)

    payload = body.encode("utf-8") if isinstance(body, str) else body
    payload_hash = _sigv4_hex_sha256(payload)

    canonical_querystring = ""  # no request params for logs

    signed = {
        "content-type": "application/x-amz-json-1.1",
        "host": host,
        "x-amz-content-sha256": payload_hash,
        "x-amz-date": date_time_stamp,
        "x-amz-target": "Logs_20140328.PutLogEvents",
    }

    # SigV4 requires the canonical headers sorted by (lowercase) name, and each
    # header line -- including the last -- terminated by "\n".  Relying on dict
    # iteration order or omitting the final newline yields a signature AWS
    # rejects with InvalidSignatureException.
    header_names = sorted(signed)
    canonical_headers = "".join(
        name + ":" + signed[name].strip() + "\n" for name in header_names
    )
    signed_headers = ";".join(header_names)

    # canonical_headers already ends in "\n", so joining with "\n" produces the
    # mandatory blank line between the header block and the signed-header list.
    canonical_request = "\n".join(
        (
            method,
            canonical_uri,
            canonical_querystring,
            canonical_headers,
            signed_headers,
            payload_hash,
        )
    )

    credential_scope = "/".join((date_stamp, region, service, request_type))
    string_to_sign = "\n".join(
        (
            algorithm,
            date_time_stamp,
            credential_scope,
            _sigv4_hex_sha256(canonical_request.encode("utf-8")),
        )
    )

    signature_hex = _ubinascii.hexlify(
        _sigv4_hmac(signing_key, string_to_sign)
    ).decode("utf-8")

    authorization_header = (
        algorithm
        + " Credential="
        + access_key
        + "/"
        + credential_scope
        + ", SignedHeaders="
        + signed_headers
        + ", Signature="
        + signature_hex
    )

    request_headers = {"authorization": authorization_header}
    request_headers.update(signed)

    return {"host": host, "uri": canonical_uri, "headers": request_headers}

以下是我的输出:



##########################################
canonical_request: 
POST
/

content-type:application/x-amz-json-1.1
host:logs.us-east-1.amazonaws.com
x-amz-target:Logs_20140328.PutLogEvents
x-amz-content-sha256:7dbe84657fbd1885f46b4d0d51a6b49be58c78db693af0fe85e335f28f6efefd
x-amz-date:20231125T183032Z
content-type;host;x-amz-content-sha256;x-amz-date;x-amz-target
7dbe84657fbd1885f46b4d0d51a6b49be58c78db693af0fe85e335f28f6efefd
##########################################


string_to_sign: 
AWS4-HMAC-SHA256
20231125T183032Z
20231125/us-east-1/logs/aws4_request
62143e2e37c17f0122bfce2333c7523c78844be83a23df9c426ddf9109ee8584



signature: 
07685ab92b73a08ca828948e05251296f3a0bdc774f0bc904e8311064e8fc407
##########################################


BEGIN REQUEST++++++++++++++++++++++++++++++++++++

Request URL = https://logs.us-east-1.amazonaws.com

Request Headers = {"authorization": "AWS4-HMAC-SHA256 Credential=<ACCESS_KEY>/20231125/us-east-1/logs/aws4_request, SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-target, Signature=07685ab92b73a08ca828948e05251296f3a0bdc774f0bc904e8311064e8fc407", "x-amz-date": "20231125T183032Z", "x-amz-target": "Logs_20140328.PutLogEvents", "host": "logs.us-east-1.amazonaws.com", "x-amz-content-sha256": "7dbe84657fbd1885f46b4d0d51a6b49be58c78db693af0fe85e335f28f6efefd", "content-type": "application/x-amz-json-1.1"}


Request payload = {"logGroupName":"esp32-test","logEvents":[{"message":"This is a sample log message from esp32","timestamp":1700937032000}],"logStreamName":"esp32-test-stream-3"}

RESPONSE++++++++++++++++++++++++++++++++++++
Response code: 400

{'_content_consumed': False, 'raw': <SSLSocket>, 'encoding': 'utf-8', 'reason': b'Bad Request', '_cached': None, 'status_code': 400}
{"__type":"InvalidSignatureException","message":"The request signature we calculated does not match the signature you provided. Check your AWS Secret Access Key and signing method. Consult the service documentation for details."}
>>> 

知道如何解决这个问题吗?

我尝试过的事情:

amazon-web-services digital-signature esp32 micropython amazon-cloudwatchlogs
1个回答
0
投票

我使用的是 Pi Pico W,从 micropython 使用 AWS 似乎非常困难。不过,我认为我已经成功了,并且您的代码一路上很有帮助。由于标准库不可用,很难完全符合 AWS 的预期。但这是可以做到的,而且值得这样做,因为以微控制器作为客户端、随时调用 AWS 的强大功能,会带来很多可能性。

我不确定 hmac_ltd 是什么,我使用的是 micropython-lib 中的 hmac 模块:

import hmac

我需要修改 hmac.py 的第 20 行,这意味着我无法使用预编译的 hmac.mpy(源文件:https://github.com/micropython/micropython-lib/blob/master/python-stdlib/hmac/hmac.py):

/lib/hmac.py

# Original line, kept for reference -- it does not work with uhashlib:
# make_hash = lambda d=b"": hashlib.new(digestmod, d)
# Replacement that works with uhashlib: look the hash constructor up by its
# name (digestmod) as an attribute of the hashlib module and call it directly.
make_hash = lambda d=b"": getattr(hashlib, digestmod)(d)

这是我的代码,用于签署 AWS v4 请求。

我用它对 AWS Lambda 函数(通过 “execute-api” 服务)发起 GET 请求,而您需要的是向日志服务发送 POST 请求,不过我认为您可以据此进行调整。

main.py

import hmac
import uhashlib as hashlib
import ubinascii
import utime
import urequests
import json
import config
import credentials

def getSignatureKey(secret_key, date_stamp, regionName, serviceName):
    """Derive the SigV4 signing key for the given date, region and service.

    Starts from ``'AWS4' + secret_key`` (UTF-8 encoded) and repeatedly feeds
    the previous digest back in as the key for ``sign``, over the date stamp,
    region name, service name and finally the literal ``'aws4_request'``.
    Returns the last digest.
    """
    derived = ('AWS4' + secret_key).encode('utf-8')
    for scope_part in (date_stamp, regionName, serviceName, 'aws4_request'):
        derived = sign(derived, scope_part)
    return derived

def create_signed_headers(
    api_host,
    api_uri,
    region,
    service,
    access_key,
    secret_key,
    http_method='GET',
    query_string='',
    additional_headers=None,
    payload=''
):
    """Build the request headers for an AWS Signature Version 4 signed call.

    Only ``host`` and ``x-amz-date`` are signed.  The timestamp is taken from
    ``utime.gmtime()`` at call time, and the canonical request and
    string-to-sign are printed to aid troubleshooting.

    Args:
        api_host: Host name written into the canonical ``host`` header.
        api_uri: Request path; a leading ``/`` is added when missing.
        region: AWS region for the credential scope.
        service: AWS service name for the credential scope.
        access_key: AWS access key id.
        secret_key: AWS secret access key.
        http_method: HTTP verb written into the canonical request.
        query_string: Query string used as-is for the canonical query string
            (it is not sorted or URI-encoded here).
        additional_headers: Optional dict of extra, unsigned headers merged
            into the result.
        payload: Request body (``str`` or bytes-like) whose SHA-256 is signed.

    Returns:
        A dict containing ``x-amz-date``, ``Authorization`` and any
        *additional_headers*.
    """
    # Create a date for headers and the credential string
    t = utime.gmtime()
    amz_date = f"{t[0]:04d}{t[1]:02d}{t[2]:02d}T{t[3]:02d}{t[4]:02d}{t[5]:02d}Z"
    date_stamp = f"{t[0]:04d}{t[1]:02d}{t[2]:02d}"

    # The canonical URI must be the absolute path, i.e. start with "/" (and be
    # "/" for an empty path); callers may hand over the path without it.
    canonical_uri = api_uri if api_uri.startswith('/') else '/' + api_uri

    # Prepare canonical request
    canonical_querystring = query_string
    canonical_headers = 'host:' + api_host + '\n' + 'x-amz-date:' + amz_date + '\n'
    signed_headers = 'host;x-amz-date'

    # Hash the payload that was actually passed in, so that POST/PUT bodies
    # are signed correctly; the default '' still covers body-less GET requests.
    if payload is None:
        payload = ''
    payload_bytes = payload.encode('utf-8') if isinstance(payload, str) else payload
    payload_hash = ubinascii.hexlify(hashlib.sha256(payload_bytes).digest()).decode()

    canonical_request = (
        http_method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' +
        canonical_headers + '\n' + signed_headers + '\n' + payload_hash
    )

    # Prepare string to sign
    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = date_stamp + '/' + region + '/' + service + '/' + 'aws4_request'
    string_to_sign = (
        algorithm + '\n' + amz_date + '\n' + credential_scope + '\n' +
        ubinascii.hexlify(hashlib.sha256(canonical_request.encode('utf-8')).digest()).decode()
    )

    # Calculate the signature
    signing_key = getSignatureKey(secret_key, date_stamp, region, service)
    signature = hmac.new(
        signing_key, string_to_sign.encode('utf-8'), digestmod='sha256'
    ).hexdigest()

    # Prepare authorization header
    authorization_header = (
        algorithm + ' ' + 'Credential=' + access_key + '/' + credential_scope + ', ' +
        'SignedHeaders=' + signed_headers + ', ' + 'Signature=' + signature
    )

    # Prepare headers
    headers = {'x-amz-date': amz_date, 'Authorization': authorization_header}

    if additional_headers is not None:
        headers.update(additional_headers)

    print("================================")
    print("Canonical Request")
    print("================================")
    print(http_method)  # HTTPRequestMethod
    print(canonical_uri)  # CanonicalURI
    print(canonical_querystring)  # CanonicalQueryString
    print(canonical_headers)  # CanonicalHeaders
    print(signed_headers)  # SignedHeaders
    print(payload_hash)  # HashedPayload
    print("================================")
    print("String-to-sign")
    print("================================")
    print(string_to_sign)

    return headers

def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of the string *msg* under bytes *key*.

    *msg* is UTF-8 encoded before being authenticated.
    """
    message_bytes = msg.encode("utf-8")
    mac = hmac.new(key, message_bytes, digestmod='sha256')
    return mac.digest()

def main():
    """Send one SigV4-signed request to the configured API and print the reply.

    Builds signed headers with ``create_signed_headers``, issues the request
    with ``urequests`` using the verb in ``http_method``, and prints the
    ``message`` / ``error`` fields of the JSON reply followed by the raw body.
    Failures (transport errors, unsupported verb, non-2xx/3xx status, body
    that is not JSON) are reported with ``print`` and end the function early.
    """
    additional_headers = {'x-apikey': credentials.RAILDATAORG_API_KEY}
    payload = ""
    http_method = "GET"

    headers = create_signed_headers(
        api_host=config.AWS_API_HOST,
        api_uri=config.AWS_API_URI,
        region=config.AWS_API_REGION,
        service=config.AWS_API_SERVICE,
        access_key=credentials.AWS_ACCESS_KEY,
        secret_key=credentials.AWS_SECRET_ACCESS_KEY,
        query_string=config.AWS_API_QUERYSTRING,
        additional_headers=additional_headers,
        http_method=http_method,
        payload=payload
    )

    response = None
    try:
        if http_method == 'GET':
            response = urequests.get(config.AWS_API_URL, headers=headers)
        elif http_method == 'POST':
            response = urequests.post(config.AWS_API_URL, headers=headers, data=payload)
        elif http_method == 'PUT':
            response = urequests.put(config.AWS_API_URL, headers=headers, data=payload)
        elif http_method == 'DELETE':
            response = urequests.delete(config.AWS_API_URL, headers=headers)
        else:
            raise ValueError(f"Unsupported HTTP method: {http_method}")

        if response.status_code < 200 or response.status_code >= 400:
            print(f"Request failed with status code {response.status_code}")
            return

        # Read the body while the connection is still open; it is closed below.
        response_text = response.text
    except Exception as e:
        print(f"Request failed: {e}")
        return
    finally:
        # Always release the underlying socket; leaving responses open leaks
        # sockets, which are scarce on a microcontroller.
        if response is not None:
            response.close()

    try:
        response_json = json.loads(response_text)
    except ValueError:
        print("Response is not valid JSON:")
        print(response_text)
        return

    # Use this troubleshooting AWS. Get the message and any error from the response
    if isinstance(response_json, dict):
        message = response_json.get('message')
        error = response_json.get('error')
    else:
        # A JSON array or scalar has no 'message' / 'error' fields to report.
        message = None
        error = None

    print("================================")
    print("Response")
    print("================================")
    if message is not None:
        message = message.replace("'", "")  # Remove single quotes to make comparison easier
        print(f"Message: {message}")
    if error is not None:
        print(f"Error: {error}")

    print(response_text)
    # print(json.dumps(response_json))

# Run the demo request only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

credentials.py

# AWS Access Key (placeholder -- replace with your own value)
AWS_ACCESS_KEY = "your AWS key here"

# AWS Secret Access Key (placeholder -- replace with your own value)
# NOTE(review): main.py also reads credentials.RAILDATAORG_API_KEY, which is
# not defined in this snippet -- it must be added for main.py to run as shown.
AWS_SECRET_ACCESS_KEY = "your AWS secret key here"

config.py

import re

def parse_url(url):
    """Split an http(s) URL into ``(protocol, host, uri, query_string)``.

    The URL must contain a ``/`` after the host.  ``uri`` is the path with its
    leading ``/`` removed, and ``query_string`` is the text after ``?`` (an
    empty string when there is none).

    Raises:
        ValueError: if *url* does not match the expected layout.
    """
    pattern = '^(https?):\\/\\/([^\\/]+)\\/([^\\?]*)(\\?(.*))?'
    parsed = re.match(pattern, url)
    if parsed is None:
        raise ValueError(f"Invalid URL: {url}")

    # Group 5 is None when the URL has no "?..." part at all.
    query_string = parsed.group(5)
    if not query_string:
        query_string = ""

    return parsed.group(1), parsed.group(2), parsed.group(3), query_string

# Specify the full API URL and the code will parse it into components.
# (Plain string literal: there is nothing to interpolate, so no f-prefix.)
AWS_API_URL = "https://id-given-you-by-AWS.execute-api.eu-west-2.amazonaws.com/prod/PATH?somekey=somevalue"

# Parse the URL into components.
# NOTE: parse_url returns the path without its leading "/".
AWS_API_PROTOCOL, AWS_API_HOST, AWS_API_URI, AWS_API_QUERYSTRING = parse_url(AWS_API_URL)

# The host is expected to look like "<api-id>.<service>.<region>.amazonaws.com",
# so its first three dot-separated labels give the API id, service and region.
host_parts = AWS_API_HOST.split('.')
AWS_API_ID = host_parts[0]
AWS_API_SERVICE = host_parts[1]
AWS_API_REGION = host_parts[2]

希望对您有帮助并祝您好运。

© www.soinside.com 2019 - 2024. All rights reserved.