我正在使用 Kotlin 编写一个反应式 Quarkus 应用程序。这是我的 REST 资源,它接收请求并尝试从中创建
Operation
。
@ApplicationScoped
@Path("/v1/operation")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class OperationResource(
val service: OperationService
) {
@POST
@Transactional
suspend fun create(request: CreateOperationRequest, @Context uri: UriInfo): RestResponse<Void> {
val id = service.create(request)
return RestResponse.created(uri.absolutePathBuilder.path(id.toString()).build())
}
}
这是处理资源的业务逻辑、持久化实体并发出 Kafka 消息的服务。
@ApplicationScoped
class OperationService(
@Channel("operation-out")
val emitter: Emitter<Operation>,
val repository: OperationRepository
) {
suspend fun create(request: CreateOperationRequest): UUID {
val operation = Operation(UUID.randomUUID(), request)
repository.persist(operation)
emitter.send(operation)
return operation.id
}
}
合并
@Transactional
注释时遇到的异常是:
io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread
.
我已经尝试了
@Transactional
和 @Blocking
注释的组合,将它们合并在资源层和服务层中,但无法创建事务流,当且仅当实体已成功持久化,反之亦然。
@Transactional
仅适用于数据库操作,它不是 XA 事务。
要解决这个问题,您有几个选择:
isPersisted=false
(发送到同一主题或新主题),并使用第二个消费者来消费并保留该主题中的实体,@Scheduler
。或者创建一个可以手动触发重新发送的端点。