Rewriting the request stream in an async filter with Quarkus and Reactor

Question

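I receive a webhook from a third-party service, and I am trying to put a filter in front of the controller to check the signature. It is meant to work Spring Security style, but unfortunately this is Quarkus with Reactor. In the controller I get null, and the debugger never even reaches the controller method.

My goals are:

  1. To check the signature I need both the header and the body. WebhookHandlerUtility is not my class and I cannot change it. WebhookHandlerUtility.verifySignature(signature, body) is also a synchronous method, and I cannot change that either.
  2. I need to rewrite the stream, because I want to receive this data as the body in the controller.

What is wrong with my implementation, and how can I fix it?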

@Provider
class SignatureFilter(
        private val objectMapper: ObjectMapper,
        private val webhookHandlerUtility: WebhookHandlerUtility
) : ContainerRequestFilter {

    private val executor = Executors.newFixedThreadPool(5)

    @Throws(IOException::class)
    override fun filter(requestContext: ContainerRequestContext) {
        executor.execute {
            val signature = requestContext.getHeaderString("signature")
            val body = requestContext.entityStream.reader().readText()

            if (!webhookHandlerUtility.verifySignature(signature, body)) {
                throw TerraSignatureVerificationException()
            }

            val payload = webhookHandlerUtility.parseWebhookPayload(body)
                    ?: throw TerraWebHookPayloadParseException()

            val json = objectMapper.writeValueAsString(payload.raw)
            val newInputStream = ByteArrayInputStream(json.toByteArray())

            requestContext.entityStream = newInputStream
        }
    }
}

@ApplicationScoped
@Path("/webHook")
class WebHookController(
        private val omronService: OmronService
) {

    @POST
    @Path("/device")//, 
    @Consumes(MediaType.APPLICATION_JSON)
    fun omronWebHook(req: HookDto): Uni<Response> {
        log.info("Received data: $req")
        return omronService.save(req)
                .map { Response.noContent().build() }
    }
}

Log output:

2023-09-29 14:30:46,346 DEBUG [org.jbo.res.rea.ser.han.ParameterHandler] (vert.x-eventloop-thread-0) Error occurred during parameter extraction: 
javax.enterprise.inject.UnsatisfiedResolutionException: No bean found for required type [interface javax.ws.rs.container.ContainerRequestContext] and qualifiers [[]]
    at io.quarkus.arc.impl.InstanceImpl.bean(InstanceImpl.java:190)
    at io.quarkus.arc.impl.InstanceImpl.getInternal(InstanceImpl.java:211)
    at io.quarkus.arc.impl.InstanceImpl.get(InstanceImpl.java:97)
    at org.jboss.resteasy.reactive.server.core.parameters.ContextParamExtractor.extractParameter(ContextParamExtractor.java:100)
    at org.jboss.resteasy.reactive.server.handlers.ParameterHandler.handle(ParameterHandler.java:45)
    at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:111)
    at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:142)
    at org.jboss.resteasy.reactive.server.handlers.RestInitialHandler.beginProcessing(RestInitialHandler.java:51)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:18)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:8)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:84)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:71)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:430)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:408)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:196)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:185)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

2023-09-29 14:30:46,349 DEBUG [org.jbo.res.rea.com.cor.AbstractResteasyReactiveContext] (vert.x-eventloop-thread-0) Restarting handler chain for exception exception: 
javax.ws.rs.WebApplicationException: HTTP 400 Bad Request
    at org.jboss.resteasy.reactive.server.handlers.ParameterHandler.handle(ParameterHandler.java:66)
    at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:111)
    at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:142)
    at org.jboss.resteasy.reactive.server.handlers.RestInitialHandler.beginProcessing(RestInitialHandler.java:51)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:18)
    at org.jboss.resteasy.reactive.server.vertx.ResteasyReactiveVertxHandler.handle(ResteasyReactiveVertxHandler.java:8)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:84)
    at io.quarkus.vertx.http.runtime.StaticResourcesRecorder$2.handle(StaticResourcesRecorder.java:71)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:430)
    at io.quarkus.vertx.http.runtime.VertxHttpRecorder$6.handle(VertxHttpRecorder.java:408)
    at io.vertx.ext.web.impl.RouteState.handleContext(RouteState.java:1284)
    at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:173)
    at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:140)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:196)
    at io.quarkus.vertx.http.runtime.devmode.VertxHttpHotReplacementSetup$5.handle(VertxHttpHotReplacementSetup.java:185)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: No bean found for required type [interface javax.ws.rs.container.ContainerRequestContext] and qualifiers [[]]
    at io.quarkus.arc.impl.InstanceImpl.bean(InstanceImpl.java:190)
    at io.quarkus.arc.impl.InstanceImpl.getInternal(InstanceImpl.java:211)
    at io.quarkus.arc.impl.InstanceImpl.get(InstanceImpl.java:97)
    at org.jboss.resteasy.reactive.server.core.parameters.ContextParamExtractor.extractParameter(ContextParamExtractor.java:100)
    at org.jboss.resteasy.reactive.server.handlers.ParameterHandler.handle(ParameterHandler.java:45)
    ... 30 more

2023-09-29 14:30:46,358 INFO  [http-problem] (vert.x-eventloop-thread-0) status=400, title="Bad Request", detail="HTTP 400 Bad Request" 
Exception in thread "pool-6-thread-1" java.lang.IllegalStateException: Cannot be called from response filter
    at org.jboss.resteasy.reactive.server.jaxrs.ContainerRequestContextImpl.assertNotResponse(ContainerRequestContextImpl.java:94)
    at org.jboss.resteasy.reactive.server.jaxrs.ContainerRequestContextImpl.setEntityStream(ContainerRequestContextImpl.java:158)
    at com.dfskjhzxk.terra.configuration.SignatureFilter.filter$lambda$0(SignatureFilter.kt:41)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Tags: kotlin quarkus project-reactor

1 Answer

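The problem is that you are accessing a request-scoped bean from a thread that Quarkus does not manage, because you are using your own executor.

This is consistent with the last exception in the log: filter returns as soon as the task has been submitted, so setEntityStream runs later on pool-6-thread-1, by which time request processing has already moved on.

A simple remedy is to read whatever you need from ContainerRequestContext in the filter method itself, rather than inside the lambda that serves as the thread's Runnable.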
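
The remedy can be sketched without any Quarkus dependency. In the sketch below, RequestStandIn and SignatureVerifier are hypothetical stand-ins (not part of any library) for the two ContainerRequestContext members used in the question and for WebhookHandlerUtility.verifySignature; the IllegalArgumentException is likewise a placeholder for the question's own exception types. The only point it illustrates is that reading, verifying and replacing the stream all happen on the calling thread, so the new stream is in place before filter returns.

```kotlin
import java.io.ByteArrayInputStream
import java.io.InputStream

// Hypothetical stand-in for the ContainerRequestContext members used by the filter.
class RequestStandIn(
    private val headers: Map<String, String>,
    var entityStream: InputStream
) {
    fun getHeaderString(name: String): String? = headers[name]
}

// Hypothetical stand-in for WebhookHandlerUtility.verifySignature(signature, body).
fun interface SignatureVerifier {
    fun verifySignature(signature: String?, body: String): Boolean
}

class SignatureCheck(private val verifier: SignatureVerifier) {

    // No executor: every access to the request happens on the thread that calls filter().
    fun filter(request: RequestStandIn) {
        val signature = request.getHeaderString("signature")
        val body = request.entityStream.reader().readText()

        if (!verifier.verifySignature(signature, body)) {
            throw IllegalArgumentException("signature verification failed")
        }

        // The original stream is now consumed, so give downstream code a fresh one.
        request.entityStream = ByteArrayInputStream(body.toByteArray())
    }
}

fun main() {
    // Toy verifier for the demo only: the "signature" is just the body length.
    val verifier = SignatureVerifier { signature, body -> signature == "len=${body.length}" }
    val request = RequestStandIn(
        mapOf("signature" to "len=7"),
        ByteArrayInputStream("{\"a\":1}".toByteArray())
    )

    SignatureCheck(verifier).filter(request)

    // The body is readable again as soon as filter() has returned.
    println(request.entityStream.reader().readText())
}
```

In the real SignatureFilter, the same statements would sit directly in filter(requestContext: ContainerRequestContext), with the executor.execute { ... } wrapper removed.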
