我正在使用 hazelcast jet 来捕获数据库更改。我有一个管道可以捕获数据库更改并更新 IMap:
Pipeline pipeline(StreamSource<ChangeRecord> theDataBase, IMap<Long, String> myMap) {
var pipeline = Pipeline.create();
pipeline.readFrom(theDataBase)
.withoutTimestamps()
.writeTo(CdcSinks.map(myMap,
record -> Long.parseLong(Objects.requireNonNull(record.key()).toMap().get("id").toString()),
record -> record.value().toJson()));
return pipeline;
}
我想做反向同步,即当地图发生变化时发出请求更新数据库中的记录。
我通过添加第二个管道尝试了这一点:
Pipeline pipelineReversed(IMap<Long, String> myMap) {
Pipeline p = Pipeline.create();
p.readFrom(Sources.mapJournal(myMap, START_FROM_CURRENT))
.withoutTimestamps()
.writeTo(Sinks.jdbc("%some update query%", () -> {
BaseDataSource dataSource = new PGXADataSource();
dataSource.setUrl("jdbc:postgresql://localhost:5432/my_db");
dataSource.setUser("postgres");
dataSource.setPassword("postgres");
dataSource.setDatabaseName("my_db");
return dataSource;
}, (stmt, record) -> {
// fill query params and execute
}));
return p;
}
但是我得到了递归,地图和数据库不断地互相更新。我可以用这种方法避免递归吗?或者也许还有其他工具和最佳实践?
MapStoreAdapter
,我怀疑您是否需要使用 Jet 来获取数据库更改,但话又说回来,我不知道您系统的架构。我会在这里阅读更多内容:
https://docs.hazelcast.com/hazelcast/5.3/mapstore/working-with-external-data