Amazon Kinesis getRecord() returns mismatched results

Question (votes: 4, answers: 1)

I have just started using Kinesis through its API.

I used it to push 100 records to Kinesis:

for (int j = 0; j < 100; j++) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(myStreamName);
        putRecordRequest.setData(ByteBuffer.wrap(data.getBytes()));   
        putRecordRequest.setPartitionKey(String.format("partitionKey-%d", j));   
        PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
        System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey()
                + ", ShardID : " + putRecordResult.getShardId() + ", Sequence No : "+ putRecordResult.getSequenceNumber());
}

Now I want to get the number of records that were pushed. For that, I am using this:

Iterator<Shard> shardIterator = getTotalShardsIterator();//Implemented and giving perfectly all the shards.....

Using the iterator above, I compute the count like this:

.....
while (shardIterator.hasNext()) {
        Shard shard = shardIterator.next();
        String shardId = shard.getShardId();
        int datacount = getDataCount(shardId, myStreamName);
        totalStreamDataCount+= datacount;
        System.out.println("Data Count for Shard " + shardId + " is : " + datacount); 
}
.....

This is my function getDataCount(shardId, myStreamName):

 public static int getDataCount(String shardId, String streamName) {
        int dataCount = 0;
        String shardIterator;
      GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
      getShardIteratorRequest.setStreamName(streamName);
      getShardIteratorRequest.setShardId(shardId);
      getShardIteratorRequest.setShardIteratorType(ShardIteratorType.TRIM_HORIZON);


      GetShardIteratorResult getShardIteratorResult = kinesisClient.getShardIterator(getShardIteratorRequest);
      shardIterator = getShardIteratorResult.getShardIterator();
      GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
      getRecordsRequest.setShardIterator(shardIterator);
      getRecordsRequest.setLimit(1000);

      GetRecordsResult getRecordsResult = kinesisClient.getRecords(getRecordsRequest);
      List<Record> records = getRecordsResult.getRecords();
      if(!records.isEmpty() && records.size() > 0){
          dataCount = records.size();
          Iterator<Record> iterator = records.iterator();
          while(iterator.hasNext()) {
              Record record = iterator.next();
              byte[] bytes = record.getData().array();
              String recordData = new String(bytes);
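              // Print the sequence number that the original message labelled but never included
              System.out.println("Shard Id. : " + shardId + ", Seq. No. is : " + record.getSequenceNumber() + ", Record data : " + recordData);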
          }
      }


    return dataCount;
}

But every time I run this code it gives a different, mismatched result: sometimes it shows 91 and sometimes 81.

Please explain this. :)

Answer (0 votes):

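Records pushed to Kinesis are subject to a throughput limit (data per second), so if you exceed that limit, some records may fail to be written (the allowed rate depends on the number of shards in use). You can use the Kinesis API to get the count of failed records and push them to Kinesis again.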

List<CompletionStage<PutRecordsResponse>> putRecordResponseCompletionStage = <PutRecordRequest call with Stream Name , Partition key and Data>;
AtomicInteger failedCount = new AtomicInteger();
AtomicInteger recordsCount = new AtomicInteger();
int loopCount = 0;
for (CompletionStage<PutRecordsResponse> stage : putRecordResponseCompletionStage) {
    loopCount++;
    try {
        PutRecordsResponse response = stage.toCompletableFuture().get();
        failedCount.addAndGet(response.failedRecordCount());
        recordsCount.addAndGet(response.records().size());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
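
The "count the failures and push them again" idea can be sketched without the AWS SDK. The block below is only an illustration under stated assumptions: `BatchSender`, `sendWithRetry`, `acceptFirst` and `numbered` are hypothetical names, and `BatchSender` merely stands in for a batch put that reports, per record and in request order, whether that record was accepted. A real client would read those per-record results from the PutRecords response and would wait between rounds.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the Kinesis SDK: BatchSender stands in for a batch put call.
final class RetryFailedRecords {

    /** Stand-in for a batch put: entry i is true if record i of the batch was accepted. */
    interface BatchSender {
        List<Boolean> send(List<String> batch);
    }

    /**
     * Sends the records, then re-sends only the rejected ones, for at most maxAttempts rounds.
     * Returns the records that were still rejected after the last round.
     */
    static List<String> sendWithRetry(BatchSender sender, List<String> records, int maxAttempts) {
        List<String> pending = new ArrayList<>(records);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<Boolean> accepted = sender.send(pending);
            List<String> failed = new ArrayList<>();
            for (int i = 0; i < pending.size(); i++) {
                if (!accepted.get(i)) {
                    failed.add(pending.get(i));
                }
            }
            pending = failed;
            // A real client would back off (sleep) here before retrying throttled records.
        }
        return pending;
    }

    /** Fake sender (assumption for the demo): accepts only the first `capacity` records per call. */
    static BatchSender acceptFirst(int capacity) {
        return batch -> {
            List<Boolean> accepted = new ArrayList<>();
            for (int i = 0; i < batch.size(); i++) {
                accepted.add(i < capacity);
            }
            return accepted;
        };
    }

    /** Builds n dummy payloads named record-0 .. record-(n-1). */
    static List<String> numbered(int n) {
        List<String> records = new ArrayList<>();
        for (int j = 0; j < n; j++) {
            records.add("record-" + j);
        }
        return records;
    }

    public static void main(String[] args) {
        // 100 records against a sender that takes 40 per call: 60, then 20, then 0 remain.
        List<String> leftover = sendWithRetry(acceptFirst(40), numbered(100), 3);
        System.out.println("Still failed after retries: " + leftover.size());
    }
}
```

With the fake sender above, three rounds are enough to deliver all 100 records, while two rounds leave the last 20 undelivered. Capping the number of rounds keeps a persistently throttled stream from looping forever.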
java amazon-web-services amazon-kinesis