使用Javascript SDK时如何在Amazon Kinesis使用者中获取最新记录?

问题描述 投票:0回答:1

我正在使用AWS Javascript SDK从Kinesis Data Stream消费。我想获取碎片中的最新记录。

[当我将ShardIterator类型设置为“ LATEST”时,我没有得到任何记录。但是,当我使用“ TRIM_HORIZON”时,我会取回所有记录。

kinesis.describeStream(describeParams, function(err, data) {
            if (err) {
                console.log(err, err.stack);    // an error occurred
            }
            else {
                var getParams = {
                    ShardId: data.StreamDescription.Shards[0].ShardId,
                    ShardIteratorType: "TRIM_HORIZON",     //get oldest package
                    StreamName: streamName,
                };

                if(shardIteratorType){
                    console.log("you have passed some shardIteratorType:" + shardIteratorType);
                    getParams.ShardIteratorType = shardIteratorType;
                }

                kinesis.getShardIterator(getParams, function(err, result) {
                    if (err) {
                        console.log("Error in getShardIterator()");
                        console.log(err);
                    } else {
                        console.log("calling getRecord with shard iterator");
                        // Get records from the Kinesis stream
                        getRecord(result.ShardIterator);
                    }
                });
            }
        });

function getRecord(shard_iterator) {
    console.log("getRecord was called.");
    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {

            if(result.Records.length > 0) {
                    // Loop through all the packages
               for(var i = 0; i < result.Records.length; i++) {
                   if(result.Records[i] != undefined) {
                        var getData = JSON.parse( decodeURIComponent
(escape(result.Records[i].Data)));
                        console.log(getData);
                        var table = document.getElementById("myTable");
                        var row = table.insertRow(0);
                        var j = i + 1 ;
                        var cell1 = row.insertCell(0);


                        cell1.innerHTML = getData ;

                    }
                }
            }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }
        }
    });
}

对于分片迭代器类型,使用“ LATEST”而不是“ TRIM_HORIZON”时,我希望只获得最新记录,而不是所有历史记录。我在做什么错?

javascript amazon-kinesis
1个回答
0
投票

来自文档https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

在请求中,您可以指定分片迭代器类型AT_TIMESTAMP以从任意时间点读取记录,TRIM_HORIZON导致ShardIterator指向系统中分片中最后未整理的记录(分片中最旧的数据记录)或LATEST,以便您始终读取分片中的最新数据。

这里最新表示“现在”,跳过最近检查点到现在之间的所有记录。

使用LATEST作为分片迭代器类型,您可以认为它读取了使用getShardIterator函数返回的分片迭代器之后的记录。

您可以在下面作为示例与'LATEST'一起使用:

kinesis.describeStream(describeParams, (err, streamData) => {
    // Skipping error handling

    kinesis.getShardIterator(getShardIteratorParams, (err, shardIterData) => {
        let shardIterator = shardIterData.ShardIterator;

        // Keep reading records from the stream
        while (true) {
            let getRecParams = {
                ShardIterator: shardIterator
            };
            kinesis.getRecords(getRecParams, (err, recData) => {
                // Skipping error handling

                if (recData.Records.length > 0) {
                   // Do something

                   shardIterator = recData.NextShardIterator;
                }
            });
            // Break if you need
        }
    });
});
© www.soinside.com 2019 - 2024. All rights reserved.