我同时使用 Lambda 和 Kinesis 来验证和插入批量记录。我是 Kinesis 新手,所以我什至不确定我在寻找什么,或者我是否配置错误。我很难理解 Kinesis 的简单用例的复杂性。
我将一个大文件拆分为几个较小的文件 - 例如,将一个文件中的 275,000 行拆分为 275 个 1000 行文件。文件分解后,文件会在大约 30 秒内上传到 S3,这会触发生产者 lambda,然后写入 Kinesis,后者会触发消费者 lambda,以 10 为一批将记录写入 Postgres。
当我测试 100、1000 甚至 20000 条记录时,一切都很好。当我尝试运行所有 275,000 条记录时,我开始看到上传较大数据集时出现以下类型的错误:
ProvisionedThroughputExceededException:超出分片的速率
完成后,我最终得到了大约 130,000 条记录。
上述每个文件均小于 1MB,包含 1000 条 JSON 记录。每个 JSON 记录有 40 个字段,并且包含的数据不多。这些并不是很大的记录。
据我所知,当消费者被触发时,对于流可以处理的内容,有太多的读取。我想我需要增强型扇出?
如果我增加事件源的批量大小,在某个地方,不知何故,错误就会发生......我猜......因为我会得到几倍于我期望的记录。
CDK 基础设施:
运动
export class StreamStack extends Stack {
public readonly myStream: Stream;
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
this.myStream = new Stream(this, "MyStream", {
streamName: "my-stream",
streamMode: StreamMode.ON_DEMAND
});
}
}
事件来源
interface EventSourceStackProps extends StackProps {
myStream: Stream,
myConsumerLambda: NodejsFunction
}
export class EventSourceStack extends Stack {
constructor(scope: Construct, id: string, props: EventSourceStackProps) {
super(scope, id, props);
new EventSourceMapping(this, "ConsumerFunctionEvent", {
target: props.myConsumerLambda,
batchSize: 10,
startingPosition: StartingPosition.LATEST,
eventSourceArn: props.myStream.streamArn
});
props.myStream.grantRead(props.myConsumerLambda);
}
}
生产者和消费者 Lambda 函数在更大的
LambdaStack
this.myProducerLambda = new NodejsFunction(this, "MyProducerLambda", {
functionName: "myProducerHandler",
runtime: Runtime.NODEJS_18_X,
handler: "myProducerHandler",
entry: "./lambda/myProducerHandler.ts",
memorySize: 256,
timeout: Duration.minutes(5)
});
props.myStream.grantWrite(this.myProducerLambda);
this.myConsumerLambda = new NodejsFunction(this, "MyConsumerLambda", {
functionName: "myConsumerHandler",
runtime: Runtime.NODEJS_18_X,
handler: "myConsumerHandler",
entry: "./lambda/myConsumerHandler.ts",
memorySize: 1024,
timeout: Duration.minutes(15)
});
生产者像这样发送记录:
export const putStreamRecord = async (item: StreamItem): Promise<void> => {
try {
const client = new KinesisClient();
const params = {
Data: Buffer.from(JSON.stringify(item)),
PartitionKey: item.key,
StreamName: process.env.STREAM_NAME,
};
await client.send(new PutRecordCommand(params));
} catch (e) {
console.error(e);
throw e;
}
};
consumer函数看起来像这样:
export const myConsumerHandler = async (event: KinesisStreamEvent): Promise<boolean> => {
let batchSaveRes = false;
try {
//extract batch records
const batchData = event.Records
.map(rec => Buffer.from(rec.kinesis.data, "base64").toString())
.map(buff => JSON.parse(buff))
.map(item => item.payload);
batchSaveRes = await consumeBatchStream(batchData);
return batchSaveRes;
} catch (e) {
console.error("ERROR:", e);
return batchSaveRes;
}
};
那里的 consumeBatchStream 参考非常简单。它只是将数据批量插入到 Postgres 中。
export const consumeBatchStream = async (batchData: Record<string, unknown>[]): Promise<boolean> => {
let client: Client | null = null;
try {
client = await getDbClient("my-secret");
await client?.connect();
const query = //...builds an insert query;
await client.query(query);
console.log("INSERTED:", batchData);
return true;
} catch (e) {
console.error("Error consuming batch stream:", e)
throw e;
} finally {
await client?.end();
}
};
SQS/SNS 能否满足我的需求,且不受 Kinesis 的限制?我真的只需要一种简单的方法来通知多个消费者某些类型的新记录。我不在乎订购。
作为一名开发人员,我曾经在 Kafka 中做过此类事情,几乎没有遇到太多麻烦。记录进入,消费者拿起它们,一切都很顺利。
有几件事似乎对这种情况有所帮助。
首先,将事件消费者的批量大小从 10 提高到 100,消除了消费者端的错误。然后,通过随机化生产者端
PartitionKey
上的 PutRecordCommand
,确保所有记录都通过且没有错误:
export const putStreamRecord = async (item: StreamItem): Promise<void> => {
try {
const client = new KinesisClient();
const params = {
Data: Buffer.from(JSON.stringify(item)),
PartitionKey: uuid(),//item.key,
StreamName: process.env.STREAM_NAME,
};
await client.send(new PutRecordCommand(params));
} catch (e) {
console.error(e);
throw e;
}
};
有道理...有一个“热分区”并且忽略了它。