SSE(Server-Sent Events)Spring Cloud Stream 供应商(Supplier)

问题描述 投票:0回答:1

我正在尝试使用以下代码创建 SSE Spring Cloud Stream 供应商:

    /**
     * Exposes the injected publisher as a `Supplier`.
     *
     * Every invocation of the supplier wraps [sseSupplierPub] with `Flux.from`.
     *
     * @param sseSupplierPub publisher of messages to adapt.
     * @return a supplier yielding a `Flux` view of [sseSupplierPub].
     */
    @Bean
    fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> =
        Supplier { Flux.from(sseSupplierPub) }

    /**
     * Builds the integration flow that starts from the trigger channel, calls the SSE
     * endpoint through [sseMessageHandlerSpec] and exposes the result as a reactive publisher.
     *
     * The gateway is configured with `replyPayloadToFlux(true)`, so its reply is a single
     * message whose payload is the whole `Flux` of events. The `split()` step turns that
     * `Flux` into one message per event; without it the `Flux` object itself is what gets
     * logged and published downstream.
     *
     * @param sseSupplierProperties not read by this function.
     * @return publisher of the messages that leave the flow.
     */
    @Bean
    fun sseSupplierPub(
        sseSupplierProperties: sseSupplierProperties
    ): Publisher<Message<Any>> {
        return IntegrationFlow.from(channelsConfiguration.trigger())
            .enrichHeaders {
                it.header(CONTENT_TYPE, "application/json")
                it.header(ACCEPT, "text/event-stream")
            }
            .handle(sseMessageHandlerSpec())
            // Flatten the Flux payload produced by the gateway into individual event messages.
            .split()
            .log()
            .toReactivePublisher(true)
    }

    /**
     * Builds a `WebClient` whose requests pass through an OAuth2 filter configured for the
     * client-credentials registration `"clientId"`.
     *
     * Each call creates a new in-memory registration repository, authorized-client service
     * and Reactor Netty `HttpClient` (keep-alive enabled). `provider` and `strategies` are
     * taken from the enclosing scope.
     *
     * @return the configured `WebClient`.
     */
    fun webClient(): WebClient {
        val registration = ClientRegistration
            .withRegistrationId("clientId")
            .tokenUri("https://openid-connect/token")
            .clientId("clientId")
            .clientSecret("clientSecret")
            .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
            .build()
        val registrations = InMemoryReactiveClientRegistrationRepository(registration)
        val manager = AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(
            registrations,
            InMemoryReactiveOAuth2AuthorizedClientService(registrations)
        )

        // Use the single registration for every request that does not name one explicitly.
        val oauth2 = ServerOAuth2AuthorizedClientExchangeFilterFunction(manager).apply {
            setDefaultClientRegistrationId("clientId")
        }
        val connector = ReactorClientHttpConnector(HttpClient.create(provider).keepAlive(true))

        return WebClient.builder()
            .filter(oauth2)
            .clientConnector(connector)
            .exchangeStrategies(strategies)
            .build()
    }

    /**
     * Describes the outbound WebFlux gateway that POSTs to `https://api/src` using [webClient].
     *
     * The reply payload is requested as a `Flux`, with `ServerSentEvent<String>` as the
     * expected response type.
     *
     * @return the gateway spec, ready to be used in an integration flow.
     */
    fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec {
        val eventType = typeReference<ServerSentEvent<String>>()
        val gateway = WebFlux.outboundGateway("https://api/src", webClient())

        return gateway
            .httpMethod(HttpMethod.POST)
            .replyPayloadToFlux(true)
            .expectedResponseType(eventType)
    }

    /**
     * Provides a runner that, when executed, logs a notice and sends a message with the
     * payload `"{}"` to the trigger channel.
     *
     * @param ctx not read by this function.
     */
    @Bean
    fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner =
        CommandLineRunner {
            val trigger = MessageBuilder.withPayload("{}").build()
            logger.info { "Trigger SSE supplier" }
            channelsConfiguration.trigger().send(trigger)
        }

我能够获取授权令牌并成功连接到 `https://api/src`,但连接后除了下面这条消息之外,我收不到任何响应:

{
    "scanAvailable": true,
    "prefetch": -1
}

连接后只会收到上面这一条消息,此后不会再收到任何消息。可能是什么问题?

spring-integration server-sent-events spring-cloud-stream-binder-kafka
1个回答
0
投票

我已经设法接收到数据,但为此需要自行订阅响应中的 `body`(请参阅下面更新的代码)。我现在面临的问题是如何将 `DataBuffer` 映射为 `ServerSentEvent`。

    /**
     * Adapts the injected publisher to the `Supplier<Flux<...>>` shape.
     *
     * The returned supplier calls `Flux.from` on [sseSupplierPub] each time it is invoked.
     *
     * @param sseSupplierPub publisher of messages to adapt.
     */
    @Bean
    fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> {
        val supplier = Supplier<Flux<Message<Any>>> { Flux.from(sseSupplierPub) }
        return supplier
    }

    /**
     * Builds the integration flow that starts from the trigger channel, adds the
     * `Content-Type`/`Accept` headers, invokes [sseMessageHandlerSpec], routes through the
     * `sseMessage` channel, logs, and exposes the result as a reactive publisher.
     *
     * @param sseSupplierProperties not read by this function.
     * @return publisher of the messages that leave the flow.
     */
    @Bean
    fun sseSupplierPub(
        sseSupplierProperties: sseSupplierProperties
    ): Publisher<Message<Any>> =
        IntegrationFlow.from(channelsConfiguration.trigger())
            .enrichHeaders { headers ->
                headers
                    .header(CONTENT_TYPE, "application/json")
                    .header(ACCEPT, "text/event-stream")
            }
            .handle(sseMessageHandlerSpec())
            .channel(channelsConfiguration.sseMessage())
            .log()
            .toReactivePublisher(true)

    /**
     * Creates a `WebClient` backed by a keep-alive Reactor Netty `HttpClient` and an OAuth2
     * exchange filter that defaults to the client-credentials registration `"clientId"`.
     *
     * Nothing is cached here: the registration repository, the authorized-client service and
     * the HTTP client are created anew on each call. `provider` and `strategies` come from
     * the enclosing scope.
     *
     * @return the configured `WebClient`.
     */
    fun webClient(): WebClient {
        val httpClient = HttpClient.create(provider).keepAlive(true)

        val registrationRepository = InMemoryReactiveClientRegistrationRepository(
            ClientRegistration.withRegistrationId("clientId")
                .tokenUri("https://openid-connect/token")
                .clientId("clientId")
                .clientSecret("clientSecret")
                .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
                .build()
        )
        val authorizedClients = InMemoryReactiveOAuth2AuthorizedClientService(registrationRepository)
        val filter = ServerOAuth2AuthorizedClientExchangeFilterFunction(
            AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(registrationRepository, authorizedClients)
        ).also { it.setDefaultClientRegistrationId("clientId") }

        val builder = WebClient.builder()
            .filter(filter)
            .clientConnector(ReactorClientHttpConnector(httpClient))
            .exchangeStrategies(strategies)
        return builder.build()
    }

    /**
     * Describes the outbound WebFlux gateway that POSTs to `https://api/src` using [webClient]
     * and forwards every received body chunk to the `sseMessage` channel as a text message.
     *
     * The body extractor subscribes to the response body itself, so the value it hands back
     * to the gateway is the subscription handle rather than the data.
     *
     * NOTE(review): each `DataBuffer` is decoded on its own; nothing here reassembles an event
     * or a multi-byte character that spans two buffers — confirm this is acceptable.
     *
     * @return the gateway spec, ready to be used in an integration flow.
     */
    fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec {
        return WebFlux.outboundGateway(
            "https://api/src", webClient())
            .httpMethod(HttpMethod.POST)
            .bodyExtractor { inputMessage, _ ->
                inputMessage.body.subscribe(
                    { buffer ->
                        val output = try {
                            val bytes = ByteArray(buffer.readableByteCount())
                            buffer.read(bytes)
                            String(bytes, Charsets.UTF_8)
                        } finally {
                            // Always hand the buffer back, even if copying it out fails.
                            DataBufferUtils.release(buffer)
                        }

                        val message = MessageBuilder.withPayload(output).build()
                        logger.debug { "SSE message payload => ${message.payload}" }
                        channelsConfiguration.sseMessage().send(message)
                    },
                    { error ->
                        // Without an error consumer a stream failure would only reach Reactor's
                        // dropped-error hook; report it through the application logger instead.
                        logger.error(error) { "SSE response stream failed" }
                    }
                )
            }
    }

    /**
     * Provides a runner that logs "Trigger SSE supplier" and then sends a `"{}"` payload
     * message to the trigger channel when it is executed.
     *
     * @param ctx not read by this function.
     */
    @Bean
    fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner {
        val runner = CommandLineRunner {
            val kickoff = MessageBuilder.withPayload("{}").build()
            logger.info { "Trigger SSE supplier" }
            channelsConfiguration.trigger().send(kickoff)
        }
        return runner
    }
© www.soinside.com 2019 - 2024. All rights reserved.