spring-integration 相关问题

有关Spring Integration项目的问题,请使用此标记。它不适用于将其他Spring项目与其他技术集成的一般问题。

上证所Spring Cloud Stream供应商

我正在尝试使用以下代码创建一个SSE Spring Cloud Stream供应商: @豆 fun sseSupplier(sseSupplierPub:发布者>):供应商 我正在尝试使用以下代码创建 SSE Spring Cloud Stream 供应商: @Bean fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> { return Supplier { Flux.from(sseSupplierPub) } } @Bean fun sseSupplierPub( sseSupplierProperties: sseSupplierProperties ): Publisher<Message<Any>> { return IntegrationFlow.from(channelsConfiguration.trigger()) .enrichHeaders { it.header(CONTENT_TYPE, "application/json") it.header(ACCEPT, "text/event-stream") } .handle(sseMessageHandlerSpec()) .log() .toReactivePublisher(true) } fun webClient(): WebClient { val client = HttpClient.create(provider) .keepAlive(true) val clientRegistryRepo = InMemoryReactiveClientRegistrationRepository( ClientRegistration .withRegistrationId("clientId") .tokenUri("https://openid-connect/token") .clientId("clientId") .clientSecret("clientSecret") .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS) .build()) val clientService = InMemoryReactiveOAuth2AuthorizedClientService(clientRegistryRepo) val authorizedClientManager = AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(clientRegistryRepo, clientService) val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager) oauthFilter.setDefaultClientRegistrationId("clientId") return WebClient.builder() .filter(oauthFilter) .clientConnector(ReactorClientHttpConnector(client)) .exchangeStrategies(strategies) .build() } fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec { return WebFlux.outboundGateway( "https://api/src", webClient()) .httpMethod(HttpMethod.POST) .replyPayloadToFlux(true) .expectedResponseType(typeReference<ServerSentEvent<String>>()) } @Bean fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner { return CommandLineRunner { _ -> val message = MessageBuilder.withPayload("{}").build() logger.info { "Trigger SSE supplier" } channelsConfiguration.trigger().send(message) } } 我能够获取授权令牌并成功连接到https://api/src,但除了此消息之外我无法获得任何响应 { "scanAvailable": true, "prefetch": -1 } 当我连接时。收到上述消息后,我将不会再收到任何消息。可能是什么问题? 我设法接收数据,但我需要订阅响应中的body(请参阅下面更新的代码)。我现在面临的问题是如何将DataBuffer映射到ServerSentEvent。 @Bean fun sseSupplier(sseSupplierPub: Publisher<Message<Any>>): Supplier<Flux<Message<Any>>> { return Supplier { Flux.from(sseSupplierPub) } } @Bean fun sseSupplierPub( sseSupplierProperties: sseSupplierProperties ): Publisher<Message<Any>> { return IntegrationFlow.from(channelsConfiguration.trigger()) .enrichHeaders { it.header(CONTENT_TYPE, "application/json") it.header(ACCEPT, "text/event-stream") } .handle(sseMessageHandlerSpec()) .channel(channelsConfiguration.sseMessage()) .log() .toReactivePublisher(true) } fun webClient(): WebClient { val client = HttpClient.create(provider) .keepAlive(true) val clientRegistryRepo = InMemoryReactiveClientRegistrationRepository( ClientRegistration .withRegistrationId("clientId") .tokenUri("https://openid-connect/token") .clientId("clientId") .clientSecret("clientSecret") .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS) .build()) val clientService = InMemoryReactiveOAuth2AuthorizedClientService(clientRegistryRepo) val authorizedClientManager = AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(clientRegistryRepo, clientService) val oauthFilter = ServerOAuth2AuthorizedClientExchangeFilterFunction(authorizedClientManager) oauthFilter.setDefaultClientRegistrationId("clientId") return WebClient.builder() .filter(oauthFilter) .clientConnector(ReactorClientHttpConnector(client)) .exchangeStrategies(strategies) .build() } fun sseMessageHandlerSpec(): WebFluxMessageHandlerSpec { return WebFlux.outboundGateway( "https://api/src", webClient()) .httpMethod(HttpMethod.POST) .bodyExtractor { inputMessage, _ -> inputMessage.body.subscribe { val bytes = ByteArray(it.readableByteCount()) it.read(bytes) DataBufferUtils.release(it) val output = String(bytes) val message = MessageBuilder.withPayload(output).build() logger.debug { "SSE message payload => ${message.payload}" } channelsConfiguration.sseMessage().send(message) } } } @Bean fun commandLineRunner(ctx: ApplicationContext?): CommandLineRunner { return CommandLineRunner { _ -> val message = MessageBuilder.withPayload("{}").build() logger.info { "Trigger SSE supplier" } channelsConfiguration.trigger().send(message) } }

回答 1 投票 0

使用 Spring 集成的发件箱模式

我们想通过使用 Spring 集成来实现 Outbox 模式。 从这个例子开始,我们想出了这个更简单的解决方案: 受保护的 IntegrationFlowDefinition buildFlow() {

回答 2 投票 0

通过 Spring Integration Sftp 跟踪多个文件夹以下载文件

我尝试更改应用程序以跟踪多个文件夹并从 Sftp 获取文件。 它构建在最新的 Spring 堆栈、Spring Boot 3.2.0、Spring Integration 6.2.0 等之上。 关注官方

回答 1 投票 0

实现用于从文件服务器归档文件的 AWS lambda 应用程序的方法

我想寻求有关如何实施AWS Lambda应用程序(在我们团队的AWS账户中)来执行文件归档任务的建议。 客观的 要存档 Windows 服务器上托管的文件...

回答 1 投票 0

Spring 集成资源

有哪些好的资源可以帮助您开始 Spring 集成。 我找到的都是 2012 年的书籍,并且 Spring 文档没有为初学者提供足够的解释。

回答 1 投票 0

Spring Integration SFTP fileExistsMode进行mv操作

为什么mv操作时不能使用FileExistsMode? FAIL 是 mv 操作的默认值吗? AbstractRemoteFileOutboundGateway.Command.MV .fileExistsMode(FileExistsMode.REPLACE) 为什么不...

回答 1 投票 0

如何使用spring框架通过TCP实现自定义应用程序协议?

我已经尝试弄清楚在 Spring 框架内通过 TCP 实现自定义应用程序协议的正确方法是什么。你们能就这个话题跟我谈谈吗? 我...

回答 1 投票 0

在 SpringBootTest 带注释的测试中运行代码时,Spring 集成流程不会启动

我有一个集成流程,由消息网关接口调用启动 @MessagingGateway(defaultRequestChannel = "aChn") 接口A { 有趣的民意调查(c:C) } // 踢代码...

回答 1 投票 0

在Java应用程序中使用Databricks数据库数据的最佳方式是什么?

我需要检索存储在Databricks平台中的数据。我可以看到它可以使用 Databricks-SDK 以及 Databricks API 路线来实现,但没有在任何地方看到获得...

回答 1 投票 0

Spring Integration - 处理程序与拦截器

在 Spring Integration 中,处理程序和拦截器看起来基本上实现了相同的目标。甚至还有一些“受骗”的实现,例如 MessageTransformingChannelInterceptor 和

回答 2 投票 0

Spring Integration sftp 新文件不会轮询,直到消息处理程序遍历本地缓存的所有文件列表

@Bean 公共 SessionFactory sftpSessionFactory() { DefaultSftpSessionFactory 工厂 = new DefaultSftpSessionFactory(true); 工厂.setHost(sftpHost);...

回答 1 投票 0

使用 SFTP 出站通道适配器通过 SFTP 将数据流式传输到远程文件服务器

在我的 Spring Boot 应用程序中,我必须从数据库读取大量数据,将其转换为 CSV 文件并将其上传到 SFTP 服务器。由于文件可能很大,我无法读取内存中的整个文件,然后你...

回答 1 投票 0

Spring 集成:测试轮询器依赖逻辑

我想知道如何编写 Spring 测试来断言由“SourcePollingChannelAdapter”触发的逻辑链。 我想到了什么: 使用 Thread.sleep() 这对于测试来说确实是个坏主意 有

回答 1 投票 0

使用 MappingJackson2MessageConverter 处理已删除的 MQTT 主题

在自定义的IntegrationFlow中,我使用MappingJackson2MessageConverter进行订阅,以从JSON生成相应的POJO。到目前为止,这工作得很好。 然而,一些主题被保留...

回答 1 投票 0

消息处理程序中的 SpEL 导致 CPU 使用率较高

在应用程序上运行高负载时,我们会遇到极高的 CPU(15 个内核,100%)。配置文件显示 SpEL 使用了 20% 以上,其中主要由 ServiceActivatingHandler 使用。

回答 1 投票 0

Spring Integration 重试建议配置

我想配置重试建议。 这行得通吗? IntegrationFlows.from("inputChannel") .transform(theTranformer , "theTransfomerMethod" , e -> e.handleMessageAdvice(new

回答 1 投票 0

在FTP出站网关中,如何在“mv”命令中获取原始文件名?

首先,我列出了一个工作正常的目录中的文件。列出后,我想将文件移动到另一个也可以工作的目录。然而,保留原汁原味是行不通的...

回答 1 投票 0

同一个 StompSessionManager 实例可以用于不同的目的地吗?

我有一个带有 StompInboundChannelAdapter 的 Spring 集成流程,用于侦听来自一个目标的传入消息,还有另一个带有 StompMessageHandler 的集成流程,用于将消息发送到

回答 1 投票 0

spring-Integration - http 入站网关响应和异步处理 - 任务执行器线程仅处理一条消息而不是所有消息

说明: 我们有一个 Spring 集成流程,我们想要配置一个端点,分割负载,返回给消费者,即使用 http 代码 202,其他项目必须继续...

回答 1 投票 0

将 Spring Integration 5.1.3.RELEASE 更新到 5.5.18 时,“标头值路由器”的“默认输出通道”出现“组件需要 bean”错误

我正在将 Spring Boot 从 2.1 更新到 2.5(最终更新到 3.2),但遇到了 Spring Integration 的问题,我无法在 Spring Integration 中找到解决方案

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.