我正在使用AWS Javascript SDK从Kinesis Data Stream消费。我想获取碎片中的最新记录。
[当我将ShardIterator类型设置为“ LATEST”时,我没有得到任何记录。但是,当我使用“ TRIM_HORIZON”时,我会取回所有记录。
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack); // an error occurred
}
else {
var getParams = {
ShardId: data.StreamDescription.Shards[0].ShardId,
ShardIteratorType: "TRIM_HORIZON", //get oldest package
StreamName: streamName,
};
if(shardIteratorType){
console.log("you have passed some shardIteratorType:" + shardIteratorType);
getParams.ShardIteratorType = shardIteratorType;
}
kinesis.getShardIterator(getParams, function(err, result) {
if (err) {
console.log("Error in getShardIterator()");
console.log(err);
} else {
console.log("calling getRecord with shard iterator");
// Get records from the Kinesis stream
getRecord(result.ShardIterator);
}
});
}
});
function getRecord(shard_iterator) {
console.log("getRecord was called.");
var getRecParams = {
ShardIterator: shard_iterator
};
kinesis.getRecords(getRecParams, function(err, result) {
if (err) {
console.log("Error in getRecords() from the Kinesis stream.");
console.log(err);
} else {
try {
if(result.Records.length > 0) {
// Loop through all the packages
for(var i = 0; i < result.Records.length; i++) {
if(result.Records[i] != undefined) {
var getData = JSON.parse( decodeURIComponent
(escape(result.Records[i].Data)));
console.log(getData);
var table = document.getElementById("myTable");
var row = table.insertRow(0);
var j = i + 1 ;
var cell1 = row.insertCell(0);
cell1.innerHTML = getData ;
}
}
}
} catch(err) {
console.log("Error parsing the package.");
console.log(err);
}
}
});
}
对于分片迭代器类型,使用“ LATEST”而不是“ TRIM_HORIZON”时,我希望只获得最新记录,而不是所有历史记录。我在做什么错?
来自文档https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
在请求中,您可以指定分片迭代器类型AT_TIMESTAMP以从任意时间点读取记录,TRIM_HORIZON导致ShardIterator指向系统中分片中最后未整理的记录(分片中最旧的数据记录)或LATEST,以便您始终读取分片中的最新数据。
这里最新表示“现在”,跳过最近检查点到现在之间的所有记录。
使用LATEST作为分片迭代器类型,您可以认为它读取了使用getShardIterator函数返回的分片迭代器之后的记录。
您可以在下面作为示例与'LATEST'一起使用:
kinesis.describeStream(describeParams, (err, streamData) => {
// Skipping error handling
kinesis.getShardIterator(getShardIteratorParams, (err, shardIterData) => {
let shardIterator = shardIterData.ShardIterator;
// Keep reading records from the stream
while (true) {
let getRecParams = {
ShardIterator: shardIterator
};
kinesis.getRecords(getRecParams, (err, recData) => {
// Skipping error handling
if (recData.Records.length > 0) {
// Do something
shardIterator = recData.NextShardIterator;
}
});
// Break if you need
}
});
});