为什么我在向 Kinesis 写入大量记录时会收到 ProvisionedThroughputExceededException?

问题描述 投票:0回答:1

我同时使用 Lambda 和 Kinesis 来验证和插入批量记录。我是 Kinesis 新手,所以我什至不确定我在寻找什么,或者我是否配置错误。我很难理解 Kinesis 的简单用例的复杂性。

我将一个大文件拆分为几个较小的文件 - 例如,将一个文件中的 275,000 行拆分为 275 个 1000 行文件。文件分解后,文件会在大约 30 秒内上传到 S3,这会触发生产者 lambda,然后写入 Kinesis,后者会触发消费者 lambda,以 10 为一批将记录写入 Postgres。

当我测试 100、1000 甚至 20000 条记录时,一切都很好。当我尝试运行所有 275,000 条记录时,我开始看到上传较大数据集时出现以下类型的错误:

ProvisionedThroughputExceededException:超出分片的速率

完成后,我最终得到了大约 130,000 条记录。

上述每个文件均小于 1MB,包含 1000 条 JSON 记录。每个 JSON 记录有 40 个字段,并且包含的数据不多。这些并不是很大的记录。

据我所知,当消费者被触发时,对于流可以处理的内容,有太多的读取。我想我需要增强型扇出?

如果我增加事件源的批量大小,在某个地方,不知何故,错误就会发生......我猜......因为我会得到几倍于我期望的记录。

CDK 基础设施:

运动

export class StreamStack extends Stack {
    public readonly myStream: Stream;

    constructor(scope: Construct, id: string, props?: StackProps) {
        super(scope, id, props);

        this.myStream = new Stream(this, "MyStream", {
            streamName: "my-stream",
            streamMode: StreamMode.ON_DEMAND
        });
    }
}

事件来源

interface EventSourceStackProps extends StackProps {
    myStream: Stream,
    myConsumerLambda: NodejsFunction
}

export class EventSourceStack extends Stack {
    constructor(scope: Construct, id: string, props: EventSourceStackProps) {
        super(scope, id, props);

        new EventSourceMapping(this, "ConsumerFunctionEvent", {
            target: props.myConsumerLambda,
            batchSize: 10,
            startingPosition: StartingPosition.LATEST,
            eventSourceArn: props.myStream.streamArn
        });
        props.myStream.grantRead(props.myConsumerLambda);
    }
}

生产者和消费者 Lambda 函数在更大的

LambdaStack

        this.myProducerLambda = new NodejsFunction(this, "MyProducerLambda", {
            functionName: "myProducerHandler",
            runtime: Runtime.NODEJS_18_X,
            handler: "myProducerHandler",
            entry: "./lambda/myProducerHandler.ts",
            memorySize: 256,
            timeout: Duration.minutes(5)
        });
        props.myStream.grantWrite(this.myProducerLambda);

        this.myConsumerLambda = new NodejsFunction(this, "MyConsumerLambda", {
            functionName: "myConsumerHandler",
            runtime: Runtime.NODEJS_18_X,
            handler: "myConsumerHandler",
            entry: "./lambda/myConsumerHandler.ts",
            memorySize: 1024,
            timeout: Duration.minutes(15)
        });

生产者像这样发送记录:

export const putStreamRecord = async (item: StreamItem): Promise<void> => {
    try {
        const client = new KinesisClient();
        const params = {
            Data: Buffer.from(JSON.stringify(item)),
            PartitionKey: item.key,
            StreamName: process.env.STREAM_NAME,
        };
        await client.send(new PutRecordCommand(params));
    } catch (e) {
        console.error(e);
        throw e;
    }
};

consumer函数看起来像这样:

export const myConsumerHandler = async (event: KinesisStreamEvent): Promise<boolean> => {
    let batchSaveRes = false;
    try {
        //extract batch records
        const batchData = event.Records
            .map(rec => Buffer.from(rec.kinesis.data, "base64").toString())
            .map(buff => JSON.parse(buff))
            .map(item => item.payload);
        
        batchSaveRes = await consumeBatchStream(batchData);
        return batchSaveRes;
    } catch (e) {
        console.error("ERROR:", e);
        return batchSaveRes;
    }
};

那里的 consumeBatchStream 参考非常简单。它只是将数据批量插入到 Postgres 中。

export const consumeBatchStream = async (batchData: Record<string, unknown>[]): Promise<boolean> => {
    let client: Client | null = null;
    try {
        client = await getDbClient("my-secret");
        await client?.connect();
        const query = //...builds an insert query;
        await client.query(query);
        console.log("INSERTED:", batchData);
        return true;
    } catch (e) {
        console.error("Error consuming batch stream:", e)
        throw e;
    } finally {
        await client?.end();
    }
};

SQS/SNS 能否满足我的需求,且不受 Kinesis 的限制?我真的只需要一种简单的方法来通知多个消费者某些类型的新记录。我不在乎订购。

作为一名开发人员,我曾经在 Kafka 中做过此类事情,几乎没有遇到太多麻烦。记录进入,消费者拿起它们,一切都很顺利。

typescript amazon-web-services aws-lambda amazon-kinesis
1个回答
0
投票

有几件事似乎对这种情况有所帮助。

首先,将事件消费者的批量大小从 10 提高到 100,消除了消费者端的错误。然后,通过随机化生产者端

PartitionKey
上的
PutRecordCommand
,确保所有记录都通过且没有错误:

export const putStreamRecord = async (item: StreamItem): Promise<void> => {
    try {
        const client = new KinesisClient();
        const params = {
            Data: Buffer.from(JSON.stringify(item)),
            PartitionKey: uuid(),//item.key,
            StreamName: process.env.STREAM_NAME,
        };
        await client.send(new PutRecordCommand(params));
    } catch (e) {
        console.error(e);
        throw e;
    }
};

有道理...有一个“热分区”并且忽略了它。

© www.soinside.com 2019 - 2024. All rights reserved.