将 Hazelcast IMap 与数据库 postgres 同步

问题描述 投票:0回答:1

我正在使用 hazelcast jet 来捕获数据库更改。我有一个管道可以捕获数据库更改并更新 IMap:

    Pipeline pipeline(StreamSource<ChangeRecord> theDataBase, IMap<Long, String> myMap) {
        var pipeline = Pipeline.create();
        pipeline.readFrom(theDataBase)
                .withoutTimestamps()
                .writeTo(CdcSinks.map(myMap, 
                        record -> Long.parseLong(Objects.requireNonNull(record.key()).toMap().get("id").toString()), 
                        record -> record.value().toJson()));
        return pipeline;
    }

我想做反向同步,即当地图发生变化时发出请求更新数据库中的记录。

我通过添加第二个管道尝试了这一点:

    Pipeline pipelineReversed(IMap<Long, String> myMap) {
        Pipeline p = Pipeline.create();
        p.readFrom(Sources.mapJournal(myMap, START_FROM_CURRENT))
                .withoutTimestamps()
                .writeTo(Sinks.jdbc("%some update query%", () -> {
                    BaseDataSource dataSource = new PGXADataSource();
                    dataSource.setUrl("jdbc:postgresql://localhost:5432/my_db");
                    dataSource.setUser("postgres");
                    dataSource.setPassword("postgres");
                    dataSource.setDatabaseName("my_db");
                    return dataSource;
                }, (stmt, record) -> {
                    // fill query params and execute                         
                }));
        return p;
    }

但是我得到了递归,地图和数据库不断地互相更新。我可以用这种方法避免递归吗?或者也许还有其他工具和最佳实践?

postgresql hazelcast cdc hazelcast-jet
1个回答
0
投票

您应该使用

MapStoreAdapter
,我怀疑您是否需要使用 Jet 来获取数据库更改,但话又说回来,我不知道您系统的架构。我会在这里阅读更多内容:

https://docs.hazelcast.com/hazelcast/5.3/mapstore/working-with-external-data

© www.soinside.com 2019 - 2024. All rights reserved.