我们可以使用 PostgreSQL 而不是默认的 dynamo 数据库来进行检查点和锁定,以防使用 Binder 方法从 Kinesis 消费数据吗

问题描述 投票:0回答:1

我有一个 Spring Boot 应用程序,它使用 Amazon Kinesis 来使用数据并将其保存到 PostgreSQL。 由于我已经在我的应用程序中使用一个数据库(PostgreSQL),我想避免仅出于检查点和锁定目的而使用另一个数据库(Dynamo db)。从而可以降低资源成本。

期望:我想将默认的 dynamo 数据库更改为 PostgreSQL 数据库以进行检查点和锁定。

在我的项目实现中使用以下依赖项 'org.springframework.cloud:spring-cloud-stream-binder-kinesis:4.0.2'

我的application.yml文件

spring:
  cloud:
    aws:
      credentials:
        sts:
          web-identity-token-file: <Where i had given the token file path>
          role-arn: <Where i had given the assume role arn>
          role-session-name: RoleSessionName
      region:
        static: <where i had given my aws region>
      dualstack-enabled: false
    stream:
      kinesis:
        binder:
          auto-create-stream: false
          min-shard-count: 1
      bindings:
        input-in-0:
          destination: test-test.tst.v1
          content-type: text/json

下面是包含用于处理 Kinesis 数据的 bean 的 java 类

@Configuration
public class KinesisConsumerBinder{
   @Bean
   public Consumer<Message<String>> input(){
      return message ->{
        System.out.println("Data from Kinesis:"+message.getPayload());
        //Process the message got from Kinesis
      }
   }

}

我尝试了很多方法,但未能找到正确的解决方案

我也尝试过以下步骤

  1. 为表“KinesisCheckpoint”创建了一个表及其实体类,其中包含字段分片 ID、序列号和流名称

  2. 创建了 Spring Data JPA 存储库接口,用于处理检查点数据的 CRUD 操作。

  3. 更新了消费者代码如下,用于在成功处理一批记录后将检查点信息(如分片 ID、序列号和流名称)保存到上面创建的表:“KinesisCheckpoint”。

@Configuration
public class KinesisConsumerBinder{
   @Bean
   public Consumer<Message<String>> input(){
      return message ->{
        System.out.println("Data from Kinesis:"+message.getPayload());
        String strmNme = (String) message.getHeader().get("aws_receivedStream");
        String shrdId = (String) message.getHeader().get("aws_shard");
        String seqNo = (String) message.getHeader().get("aws_shard");
        String lstChckPntDta = checkPointRepo.findLastChckpoint(strmNme,shrdId);// query that written in repo for getting the last chek point info for the given stream name and shard id

        //Process the message got from Kinesis

        if(null == lstChckPntDta){
           // save the new checkpoint info(shard Id, sequence number and stream name ) to "KinesisCheckpoint" table
        }else{
           // update the checkpoint data(sequence no) for the filter Stream name and shardId
        }
      }
   }

}

作为最后一步,当应用程序启动/重新启动时,我从表“KinesisCheckpoint”中检索了保存的检查点信息,我想使用它从中断处恢复处理。

我们如何使用我在表“KinesisCheckpoint”中收集的信息来从中断处恢复处理。

上述方法不是更好的方法。是否有其他更好的方法来使用 PostgreSQL 数据库实现检查点和锁定。

有人可以帮我解决这个问题吗

java postgresql spring-boot spring-cloud-stream amazon-kinesis
1个回答
0
投票

KinesisMessageDrivenChannelAdapter
中的锁定逻辑基于
LockRegistry
。检查点是基于
ConcurrentMetadataStore
的。 是的,默认情况下它们分别是
DynamoDbLockRegistry
DynamoDbMetadataStore
。我们在文档中没有说明这一点,但是这些
LockRegistry
ConcurrentMetadataStore
的自动配置 bean 可以从最终用户配置中覆盖。这样您就可以在 PostgreSQL 中使用基于
JdbcLockRegistry
JdbcMetadataStore
DataSource

您需要

schema-postgresql.sql
jar 中的
spring-integration-jdbc
才能正确初始化数据库。这样您就不需要您现在正在讨论的任何自定义逻辑。

在文档中查看更多信息:

https://docs.spring.io/spring-integration/reference/jdbc/lock-registry.html

https://docs.spring.io/spring-integration/reference/jdbc/metadata-store.html

© www.soinside.com 2019 - 2024. All rights reserved.