并行保存在 Spring R2DBC 中给出重复键错误

问题描述 投票:0回答:2

对于我在下面解释的场景,我从数据库 (MySQL) 收到重复键错误。

背景

我需要将一个实体保存到我的数据库中,但实体 ID 不是由我的系统管理/生成的,因此它来自具有 ID 的第 3 方,我需要将其与其 ID 一起保存。当我们的应用程序不管理它的 id 时,Spring Data(R2DBC) 无法知道它是否是一个新实体,因为它一直都有一个 id。根据他们的文档,有几种方法可以告诉 Spring 它是否是一个新实体,所以我选择实现一个 Persistable 实体,这样我就可以告诉 Spring 它是否是一个新实体。但是,我需要查询数据库以了解它是否存在。请注意,我使用的是 Spring 反应式,因此放置同步关键字对我来说不是解决方案...

问题来了

想象一下,几乎同时有 2 个请求。对于第二个请求,它将查询数据库,但由于第一个请求尚未保存,因此什么也得不到。它将决定创建第二个请求,但此时,第一个请求已提交给 DB,结果,第二个请求将出现重复键错误,因为我告诉 Spring 这是一个新请求。

我一直在想办法解决这个问题,但我还没有找到...如果你能在这件事上帮助我,我将不胜感激。

提前谢谢你!

java mysql spring spring-webflux spring-data-r2dbc
2个回答
0
投票

几乎所有实体系统都会遇到同样的问题。让我解释一下:假设我们有一个注册端点。两个人使用完全相同的电子邮件地址同时注册。然后两者都将被保存到数据库中,因为 R2DBC 对两个请求执行相同的流程并花费完全相同的时间。

这个问题最简单的解决方案是根本不检查电子邮件,只是接受你不会用你自己的代码来处理它。相反,您可以更改表的数据库模式。我不知道你是如何创建你的表的,但我只是使用一个带有它内部代码的 sql 文件。在我的例子中,通过制作电子邮件列

UNIQUE
,我们可以在数据库中防止这个问题,而不是使用我们自己的逻辑来防止它。我的帐户表模式文件如下所示:

create table if not exists accounts (id SERIAL NOT NULL, username VARCHAR(32), rank VARCHAR(32), email VARCHAR(32) UNIQUE, password VARCHAR(512), invitor INT, data TEXT, PRIMARY KEY (id));

您还可以查看 w3schools 文章 关于

UNIQUE
约束。


0
投票

我使用预查询并将结果保存在热门发布者处。您可能会看到“brandRepo.existsById”还有另一种方式,无需预查询,它也有效但速度较慢,因此我对其进行了评论。 flatmap 更快更异步,但在我的业务中我需要 concatMap。

    Mono<List<Brand>> existedBrands = brandRepo.findAll().collectList().cache(); // hot source
    SenderRecord<String, String, String> senderRecord = SenderRecord.create(
                new ProducerRecord<>(topic, "refresh-brand-finished", dateStr(YYYY_MM_DD_UD)), dateStr(YYYY_MM_DD_UD));
    return Mono.fromCallable(commonService::parseProduct)
                .subscribeOn(Schedulers.boundedElastic()) // sync operation
                .publishOn(Schedulers.parallel())
                .flatMapMany(Flux::fromIterable)
                .concatMap(this::fetchBrandsWithWebClient) // we need one by one concat
                .filter(Brand::isNotEmptyBrand)
                .concatMap(brand -> this.setBrandNew(brand, existedBrands))
//                .flatMap(brand -> brandRepo.existsById(brand.getId())
//                        .map(b -> b ? brand.setBrandNew(false) : brand.setBrandNew(true)))
                .collectList()
                .flatMapMany(brandRepo::saveAll)
                .thenMany(kafkaSender.send(Mono.just(senderRecord)))
                .then(Mono.just("success"));
© www.soinside.com 2019 - 2024. All rights reserved.