如何将记录从NodeJS应用程序(lambda)放入Kinesis流

问题描述 投票:1回答:1

我有一个lambda函数,需要将记录放入Kinesis流。我没有收到错误(我承认),但消息似乎永远不会出现在流中...

我知道流本身是有效的,因为我可以使用aws cli app将消息推送到它。我已经验证了流名称和传递给putRecord()函数的其他参数。

我正在使用此代码将记录推送到流:

const params = {
      Data: payload,
      PartitionKey: partitionKey,
      StreamName: this.streamName,
    };
    const res = await this.awsKinesis.putRecord(params);

res是一个很复杂的对象,但它包含error: null ...

{
    "domain": null,
    "service": {
        "config": {
            "credentials": {
                "expired": false,
                "expireTime": null,
                "accessKeyId": "ASIAT5YUDX4OWH5FGFYE",
                "sessionToken": "FQoGZXIvYXdzEJr//////////wEaDFGkyxe1r9QSkkhSSyKKAgbrbB6ef77wtuCC4zIH3YB7C0xJPPoql1YtRGaxba5ZDSCwBBRSQ0cBeTPMmtdUqRGshdJjjLosON6QG0FGWdt3TNDrENxqFtxjrQAbCHXfIx3ARtnn6r2agZjXi9cGZhkdpvUMSIUpaC3ZC+E9wLLvkZyQBfTSsv6QdcoaKGqT8tJ9Px7Wp5BSV3Nw//NE0GtJwv0pXiQrb3c6p6GkETtAxBBVVgwJP1WYdF+kh+Gg24DxMPwwy66ayD6E7oZIWB4i7JaqMXHoDjf9D51bpWPUAVCKF9AVn3t4JiKFBVw7lFQC0m91N9HdcKLzGmjpvX4JJNzKwBA/D1TfALDsprrvU1u7r/RlyabzKIHtpeIF",
                "envPrefix": "AWS"
            },
            "credentialProvider": {
                "providers": [null, null, null, null]
            },
            "region": "eu-west-1",
            "logger": null,
            "apiVersions": {},
            "apiVersion": null,
            "endpoint": "kinesis.eu-west-1.amazonaws.com",
            "httpOptions": {
                "timeout": 120000
            },
            "maxRedirects": 10,
            "paramValidation": true,
            "sslEnabled": true,
            "s3ForcePathStyle": false,
            "s3BucketEndpoint": false,
            "s3DisableBodySigning": true,
            "computeChecksums": true,
            "convertResponseTypes": true,
            "correctClockSkew": false,
            "customUserAgent": null,
            "dynamoDbCrc32": true,
            "systemClockOffset": 0,
            "signatureVersion": "v4",
            "signatureCache": true,
            "retryDelayOptions": {},
            "useAccelerateEndpoint": false
        },
        "isGlobalEndpoint": false,
        "endpoint": {
            "protocol": "https:",
            "host": "kinesis.eu-west-1.amazonaws.com",
            "port": 443,
            "hostname": "kinesis.eu-west-1.amazonaws.com",
            "pathname": "/",
            "path": "/",
            "href": "https://kinesis.eu-west-1.amazonaws.com/"
        },
        "_clientId": 1
    },
    "operation": "putRecord",
    "params": {
        "Data": "<< THE MESSAGE >>",
        "PartitionKey": "c770e429-52e7-47c4-bcbc-497548ff9dee",
        "StreamName": "my-stream"
    },
    "httpRequest": {
        "method": "POST",
        "path": "/",
        "headers": {
            "User-Agent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
        },
        "body": "",
        "endpoint": {
            "protocol": "https:",
            "host": "kinesis.eu-west-1.amazonaws.com",
            "port": 443,
            "hostname": "kinesis.eu-west-1.amazonaws.com",
            "pathname": "/",
            "path": "/",
            "href": "https://kinesis.eu-west-1.amazonaws.com/"
        },
        "region": "eu-west-1",
        "_userAgent": "aws-sdk-nodejs/2.290.0 linux/v6.10.3 exec-env/AWS_Lambda_nodejs6.10"
    },
    "startTime": "2019-01-24T08:25:42.574Z",
    "response": {
        "request": "~context",
        "data": null,
        "error": null,
        "retryCount": 0,
        "redirectCount": 0,
        "httpResponse": {
            "headers": {},
            "streaming": false,
            "stream": null
        },
        "maxRetries": 3,
        "maxRedirects": 10
    },
    "_asm": {
        "currentState": "validate",
        "states": {
            "validate": {
                "accept": "build",
                "fail": "error"
            },
            "build": {
                "accept": "afterBuild",
                "fail": "restart"
            },
            "afterBuild": {
                "accept": "sign",
                "fail": "restart"
            },
            "sign": {
                "accept": "send",
                "fail": "retry"
            },
            "retry": {
                "accept": "afterRetry",
                "fail": "afterRetry"
            },
            "afterRetry": {
                "accept": "sign",
                "fail": "error"
            },
            "send": {
                "accept": "validateResponse",
                "fail": "retry"
            },
            "validateResponse": {
                "accept": "extractData",
                "fail": "extractError"
            },
            "extractError": {
                "accept": "extractData",
                "fail": "retry"
            },
            "extractData": {
                "accept": "success",
                "fail": "retry"
            },
            "restart": {
                "accept": "build",
                "fail": "error"
            },
            "success": {
                "accept": "complete",
                "fail": "complete"
            },
            "error": {
                "accept": "complete",
                "fail": "complete"
            },
            "complete": {
                "accept": null,
                "fail": null
            }
        }
    },
    "_haltHandlersOnError": false,
    "_events": {
        "validate": [null, null, null, null],
        "afterBuild": [null, null, null],
        "restart": [null],
        "sign": [null],
        "validateResponse": [null],
        "send": [null],
        "httpHeaders": [null],
        "httpData": [null],
        "httpDone": [null],
        "retry": [null, null, null, null, null, null],
        "afterRetry": [null],
        "build": [null],
        "extractData": [null, null],
        "extractError": [null, null],
        "httpError": [null]
    }
}

我希望在Kinesis流中出现一条消息并触发由流触发的lambda,但这种情况从未发生过。即使在Web控制台中,Kinesis流也不会在监视选项卡上显示任何活动。我能做错什么?

aws-sdk amazon-kinesis
1个回答
0
投票

根据documentation,返回值是Request对象。

putRecord(params = {},callback)⇒AWS.Request

你需要在这个promise()对象上调用Request,以便得到你可以接受await的承诺。

const res = await this.awsKinesis.putRecord(params).promise();
© www.soinside.com 2019 - 2024. All rights reserved.