如何在 Quarkus/Kotlin 中使用 JDBC 以事务方式写入 Kafka 记录并持久化它

问题描述 投票:0回答:1

我正在使用 Kotlin 编写一个反应式 Quarkus 应用程序。这是我的 REST 资源,它接收请求并尝试从中创建

Operation

@ApplicationScoped
@Path("/v1/operation")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class OperationResource(
    val service: OperationService
) {

    @POST
    @Transactional
    suspend fun create(request: CreateOperationRequest, @Context uri: UriInfo): RestResponse<Void> {
        val id = service.create(request)
        return RestResponse.created(uri.absolutePathBuilder.path(id.toString()).build())
    }
}

这是处理资源的业务逻辑、持久化实体并发出 Kafka 消息的服务。

@ApplicationScoped
class OperationService(
    @Channel("operation-out")
    val emitter: Emitter<Operation>,
    val repository: OperationRepository
) {

    suspend fun create(request: CreateOperationRequest): UUID {
        val operation = Operation(UUID.randomUUID(), request)
        repository.persist(operation)
        emitter.send(operation)
        return operation.id
    }
}

合并

@Transactional
注释时遇到的异常是:

io.quarkus.runtime.BlockingOperationNotAllowedException: Cannot start a JTA transaction from the IO thread
.

我已经尝试了

@Transactional
@Blocking
注释的组合,将它们合并在资源层和服务层中,但无法创建事务流,当且仅当实体已成功持久化,反之亦然。

kotlin quarkus jta quarkus-reactive quarkus-kafka
1个回答
0
投票

@Transactional
仅适用于数据库操作,它不是 XA 事务。

要解决这个问题,您有几个选择:

  • 首先保存实体并使用 Debezium/Kafka-Connect 将实体流式传输到 Kafka 主题(也称为发件箱模式),
  • 首先发送消息,可能在消息头中使用
    isPersisted=false
    (发送到同一主题或新主题),并使用第二个消费者来消费并保留该主题中的实体,
  • 不是真正的解决方案,而是一种解决方法:在当前方法上使用@Fallback,它仅在发送消息失败的情况下进行回退。在后备方法中,将实体保存到新的(重试)表中,并添加定期扫描该表以重新发送这些实体的
    @Scheduler
    。或者创建一个可以手动触发重新发送的端点。
© www.soinside.com 2019 - 2024. All rights reserved.