我有一个使用 webflux 和 r2dbc-postgres 的 spring-boot 应用程序。当我尝试在
flatMap()
中执行一些数据库操作时,发现了一个奇怪的问题。
代码示例:
@Transactional
public Mono<Void> insertDummyFooBars() {
return Flux.fromIterable(IntStream.rangeClosed(1, 260).boxed().collect(Collectors.toList()))
.log()
.flatMap(i -> this.repository.save(FooBar.builder().foo("test-" + i).build()))
.log()
.concatMap(i -> this.repository.findAll())
.then();
}
看起来
flatMap
可以批量处理max个256
个元素。 (Queues.SMALL_BUFFER_SIZE
默认值为256
)。因此,当我尝试运行上面的代码(包含 260 个元素)时,出现异常 - TransientDataAccessResourceException
和以下消息:
Cannot exchange messages because the request queue limit is exceeded; nested exception is io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException
此异常之后没有
Releasing R2DBC Connection
。 pgdb 连接/会话保持在 idle in transaction
状态,当达到池最大大小并且所有连接都处于 idle in transaction
状态时,应用程序无法正常运行。我认为无论是否发生异常,连接都应该被释放。
如果我使用
concatMap
而不是 flatMap
,它会按预期工作 - 没有例外,连接已释放!当元素小于或等于256时,使用flatMap也可以。
是否可以强制关闭 pgdb 连接?如果我像这样在
flatMap
中有很多数据库操作,我该怎么办?我应该将它们全部替换为concatMap
吗?有没有一个全球性的解决方案?
版本: Postgres:12.6,Spring-boot:2.7.6
日志:
2022-12-08 16:32:13.092 INFO 17932 --- [actor-tcp-nio-1] reactor.Flux.Iterable.1 : | onNext(256)
2022-12-08 16:32:13.092 DEBUG 17932 --- [actor-tcp-nio-1] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [INSERT INTO foo_bar (foo) VALUES ($1)]
2022-12-08 16:32:13.114 INFO 17932 --- [actor-tcp-nio-1] reactor.Flux.FlatMap.2 : onNext(FooBar(id=258, foo=test-1))
2022-12-08 16:32:13.143 DEBUG 17932 --- [actor-tcp-nio-1] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT foo_bar.* FROM foo_bar]
2022-12-08 16:32:13.143 INFO 17932 --- [actor-tcp-nio-1] reactor.Flux.Iterable.1 : | request(1)
2022-12-08 16:32:13.143 INFO 17932 --- [actor-tcp-nio-1] reactor.Flux.Iterable.1 : | onNext(257)
2022-12-08 16:32:13.144 DEBUG 17932 --- [actor-tcp-nio-1] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [INSERT INTO foo_bar (foo) VALUES ($1)]
2022-12-08 16:32:13.149 INFO 17932 --- [actor-tcp-nio-1] reactor.Flux.Iterable.1 : | onComplete()
2022-12-08 16:32:13.149 INFO 17932 --- [actor-tcp-nio-1] reactor.Flux.Iterable.1 : | cancel()
2022-12-08 16:32:13.160 ERROR 17932 --- [actor-tcp-nio-1] reactor.Flux.FlatMap.2 : onError(org.springframework.dao.TransientDataAccessResourceException: executeMany; SQL [INSERT INTO foo_bar (foo) VALUES ($1)]; Cannot exchange messages because the request queue limit is exceeded; nested exception is io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: [08006] Cannot exchange messages because the request queue limit is exceeded)
2022-12-08 16:32:13.167 ERROR 17932 --- [actor-tcp-nio-1] reactor.Flux.FlatMap.2 :
org.springframework.dao.TransientDataAccessResourceException: executeMany; SQL [INSERT INTO foo_bar (foo) VALUES ($1)]; Cannot exchange messages because the request queue limit is exceeded; nested exception is io.r2dbc.postgresql.client.ReactorNettyClient$RequestQueueException: [08006] Cannot exchange messages because the request queue limit is exceeded
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:215) ~[spring-r2dbc-5.3.24.jar:5.3.24]
at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:147) ~[spring-r2dbc-5.3.24.jar:5.3.24]
at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:7105) ~[reactor-core-3.4.25.jar:3.4.25]
at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:7158) ~[reactor-core-3.4.25.jar:3.4.25]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.25.jar:3.4.25]
我尝试更改 Queues.SMALL_BUFFER_SIZE,还尝试向平面图添加并发值。当我将值减小到
255
时它起作用,但我认为这不是一个好的解决方案。
我也面临着同样的例外。 您找到解决方案了吗? 如果是这样,你能分享一下吗? 非常感谢。