我正在尝试使用以下代码创建 SSE Spring Cloud Stream 供应商:
@Bean
fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> {
return Supplier {
Flux.from(sseSupplierPub)
}
}
@Bean
fun sseSupplierPub(
sseSupplierProperties: sseSupplierProperties
): Publisher<Message<Any>> {
return IntegrationFlow.from(channelsConfiguration.trigger())
.enrichHeaders {
it.header(CONTENT_TYPE, "application/json")
it.header(ACCEPT, "text/event-stream")
}
.handle(sseMessageHandlerSpec())
.log()
.toReactivePublisher(true)
}
fun webClient(): WebClient {
val client = HttpClient.create(provider)
.keepAlive(true)
val clientRegistryRepo = InMemoryReactiveClientRegistrationRepository(
ClientRegistration
.withRegistrationId("clientId")
.tokenUri("https://openid-connect/token")
.clientId("clientId")
.clientSecret("clientSecret")
.authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
.build())
val clientService = InMemoryReactiveOAuth2AuthorizedClientService(clientRegistryRepo)
val authorizedClientManager =
AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(clientRegistryRepo, clientService)
val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager)
oauthFilter.setDefaultClientRegistrationId("clientId")
return WebClient.builder()
.filter(oauthFilter)
.clientConnector(ReactorClientHttpConnector(client))
.exchangeStrategies(strategies)
.build()
}
fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec {
return WebFlux.outboundGateway(
"https://api/src", webClient())
.httpMethod(HttpMethod.POST)
.replyPayloadToFlux(true)
.expectedResponseType(typeReference<ServerSentEvent<String>>())
}
@Bean
fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner {
return CommandLineRunner { _ ->
val message = MessageBuilder.withPayload("{}").build()
logger.info { "Trigger SSE supplier" }
channelsConfiguration.trigger().send(message)
}
}
我能够获取授权令牌并成功连接到
https://api/src
,但除了此消息之外我无法获得任何响应
{
"scanAvailable": true,
"prefetch": -1
}
当我连接时。收到上述消息后,我将不会再收到任何消息。可能是什么问题?
我设法接收数据,但我需要订阅响应中的
body
(请参阅下面更新的代码)。我现在面临的问题是如何将DataBuffer映射到ServerSentEvent。
@Bean
fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> {
return Supplier {
Flux.from(sseSupplierPub)
}
}
@Bean
fun sseSupplierPub(
sseSupplierProperties: sseSupplierProperties
): Publisher<Message<Any>> {
return IntegrationFlow.from(channelsConfiguration.trigger())
.enrichHeaders {
it.header(CONTENT_TYPE, "application/json")
it.header(ACCEPT, "text/event-stream")
}
.handle(sseMessageHandlerSpec())
.channel(channelsConfiguration.sseMessage())
.log()
.toReactivePublisher(true)
}
fun webClient(): WebClient {
val client = HttpClient.create(provider)
.keepAlive(true)
val clientRegistryRepo = InMemoryReactiveClientRegistrationRepository(
ClientRegistration
.withRegistrationId("clientId")
.tokenUri("https://openid-connect/token")
.clientId("clientId")
.clientSecret("clientSecret")
.authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
.build())
val clientService = InMemoryReactiveOAuth2AuthorizedClientService(clientRegistryRepo)
val authorizedClientManager =
AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(clientRegistryRepo, clientService)
val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager)
oauthFilter.setDefaultClientRegistrationId("clientId")
return WebClient.builder()
.filter(oauthFilter)
.clientConnector(ReactorClientHttpConnector(client))
.exchangeStrategies(strategies)
.build()
}
fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec {
return WebFlux.outboundGateway(
"https://api/src", webClient())
.httpMethod(HttpMethod.POST)
.bodyExtractor { inputMessage, _ ->
inputMessage.body.subscribe {
val bytes = ByteArray(it.readableByteCount())
it.read(bytes)
DataBufferUtils.release(it)
val output = String(bytes)
val message = MessageBuilder.withPayload(output).build()
logger.debug { "SSE message payload => ${message.payload}" }
channelsConfiguration.sseMessage().send(message)
}
}
}
@Bean
fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner {
return CommandLineRunner { _ ->
val message = MessageBuilder.withPayload("{}").build()
logger.info { "Trigger SSE supplier" }
channelsConfiguration.trigger().send(message)
}
}