spring-integration 相关问题

有关Spring Integration项目的问题,请使用此标记。它不适用于将其他Spring项目与其他技术集成的一般问题。

将 JdbcMessageStore 添加到 Spring Kafka 聚合器

如何将 JdbcMessageStore 设置为 Aggregator,以便它使用 RDBMS 而不是内存中消息存储? 目前AggregatorAnnotationPostProcessor直接设置new SimpleMessageStore(...

回答 1 投票 0

Spring Integration Channel() 将消息分发给意外的订阅者

目前我面临 Spring IntegrationFlows 和 channel() 方法的一些问题。 我不确定我错过了什么,或者框架中是否存在错误。 这是流程: @

回答 1 投票 0

Spring Retry 不是基于排除关键字重试

我正在使用Spring Retry。早些时候,我使用了 @Retryable(value= Exception.class) & 它与 retryAdvice() 一起正常工作。出现任何类型的异常后,代码都会正确重试。最近,我们...

回答 1 投票 0

Spring 集成 2 阶段提交

下面是我们的IntegrationFlow,请建议如何为涉及的2个数据库实现两阶段提交或尽力一阶段提交。谢谢 MQ 适配器 --> Jpa.updatingGateway(db1) &am...

回答 1 投票 0

Spring Integration 服务激活器使用泛型挂起处理函数

我正在尝试使用 Kotlin 挂起函数作为我的服务激活器处理程序方法。服务激活器的设置方式如下: 界面工作流程步骤 { @ServiceActivator...

回答 1 投票 0

Spring Integration DSL 流程 - 一次处理一条消息,等待流程完成后再处理下一条消息

我有一个通用任务服务,用于接收要执行的任务。它将任务放到发布-订阅通道上,以便处理特定类型任务的特定实现......

回答 1 投票 0

原因:com.jcraft.jsch.JSchException:使用 DefaultSftpSessionFactory 和 SftpInboundFileSynchronizer 时连接被外部主机关闭

我正在使用 Spring Batch 远程分区来读取输入文件并处理它们。为了使输入文件在所有服务器上可用,我添加了步骤侦听器,用于检查是否存在并下载...

回答 1 投票 0

TCP 连接在闲置 300 秒后关闭

我已经为依赖于 TCP/IP 协议的面向流的套接字实现了 TCPClient。我使用过Spring集成。 我的期望是创建 9 个连接的池。交通将是手动的...

回答 1 投票 0

将InputStream转换为POJO

我目前正在使用 spring 集成通过 sftp 获取 .csv 文件并对其进行处理并将其发送到 kafka。我需要转换这个 .csv 文件(它作为输入流,我逐行分割

回答 1 投票 0

SFTP 集成 - SftpInboundFileSynchronizer - 如何不再次下载相同的文件

在我当前的应用程序中,使用 spring-batch 作业,我触发将 SFTP 远程文件发送到本地目录的过程,对其进行处理并在处理后删除文件。 @Bean("ftpMessageSource"...

回答 1 投票 0

Spring 将多个事务轮询器与单独的线程池集成

我正在使用 Spring Integration 来实现一个处理消息/事件的项目,具有以下要求:进入系统的入站消息不应丢失,即使在...

回答 1 投票 0

Spring Integration 6.1 重大更改:线程不再无限期阻塞

我们遇到了一些问题,即过去顺序执行的线程出现不必要的并行执行。我将范围缩小到 v6.1 中的这一重大更改 https://github.com/spring-projects/spring-

回答 2 投票 0

在 spring Integration 中使用 JdbcChannelMessageStore 时出现错误

这就是我在应用程序中实现 JdbcChannelMessageStore 的方式: 这就是我在应用程序中实现 JdbcChannelMessageStore 的方式: <int:transaction-synchronization-factory id="syncFactory"> <int:after-commit expression="@store.removeFromIdCache(headers.id)" /> <int:after-rollback expression="@store.removeFromIdCache(headers.id)"/> </int:transaction-synchronization-factory> <task:executor id="pool" pool-size="10" queue-capacity="10" rejection-policy="CALLER_RUNS" /> <int:channel id="objectToJsonChannelTMF639"> <int:queue message-store="store"/> <!--<int:interceptors> <bean class="ca.bell.bmf.customer.service.MyChannelInterceptor" /> </int:interceptors>--> </int:channel> <bean id="store" class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore"> <property name="dataSource" ref="dataSource"/> <property name="channelMessageStoreQueryProvider" ref="queryProvider"/> <property name="region" value="CONCURRENT_POLL"/> <property name="usingIdCache" value="true"/> </bean> <int:bridge input-channel="objectToJsonChannelTMF639"> <int:poller fixed-delay="500" receive-timeout="500" max-messages-per-poll="1" task-executor="pool"> <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="transactionManager" /> </int:poller> </int:bridge> 我在使用 JdbcChannelMessageStore 将数据插入数据库时遇到问题。运行代码时出现以下错误: [2023-08-10 00:08:38,909] 错误 org.springframework.integration.handler.LoggingHandler:250 - ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: 调度程序没有通道“application.jdbcMessageStoreChannel”的订阅者。 ;嵌套异常是 org.springframework.integration.MessageDispatchingException:调度程序没有订阅者,failedMessage=GenericMessage [payload=CPMPayload [cpmBody=[{"op":"replace","jsonPath":"$.resourceCharacteristic[?(@.name =='AStatus')].value","value":"返回"},{"op":"替换","jsonPath":"$.resourceCharacteristic[?(@.name=='FStatus')] .value","value":"FCredited"},{"op":"替换","jsonPath":"$.resourceCharacteristic[?(@.name=='tmpCPMData_orderId')].value","value" :“89CJ00”}],域=逻辑资源,transnSeqId = 44234,externalid = 89CJ00,specName = VideoCpe_LD,externalId = COM-BC743HE1C-1-1,sourceSystem = 3PL-Process,transnSeqId = 44234],标头= {id = bb7a6bac -7767-5467-490a-c326e780e0ab,时间戳=1691606317062}],failedMessage=GenericMessage [payload=CPMPPayload [cpmBody=[{"op":"replace","jsonPath":"$.resourceCharacteristic[?(@.name= ='AStatus')].value","value":"返回"},{"op":"替换","jsonPath":"$.resourceCharacteristic[?(@.name=='FStatus')].值","值":"FCredited"},{"op":"替换","jsonPath":"$.resourceCharacteristic[?(@.name=='tmpCPMData_orderId')].value","值": “89CJ00”}],域=逻辑资源,transnSeqId = 44234,externalid = 89CJ00,specName = VideoCpe_LD,externalId = COM-BC743HE1C-1-1,sourceSystem = 3PL-Process,transnSeqId = 44234],标头= {id = bb7a6bac- 7767-5467-490a-c326e780e0ab,时间戳=1691606317062}],标头={id=5458abcc-d97f-5b0f-9278-5525d2663fa8,时间戳=1691606318909}]原始GenericMessage [payload=CPMPayload [cpmBody= [{“操作”: "替换","jsonPath":"$.resourceCharacteristic[?(@.name=='AStatus')].value","值":"返回"},{"op":"替换","jsonPath" :"$.resourceCharacteristic[?(@.name=='FStatus')].value","value":"FCrededited"},{"op":"替换","jsonPath":"$.resourceCharacteristic[? (@.name=='tmpCPMData_orderId')].value","value":"89CJ00"}], 域=逻辑资源, transactionID=44234, externalid=89CJ00, specName=VideoCpe_LD, externalId=COM-BC743HE1C-1-1 ,sourceSystem=3PL-Process,transnSeqId=44234],标头={id=bb7a6bac-7767-5467-490a-c326e780e0ab,时间戳=1691606317062}] 我面对的是数据库。我观察到我插入数据库的数据在刷新后消失了。提交操作也没有发生,请让我知道这个问题的原因是什么以及如何修复它。 您的错误是这样的: Dispatcher has no subscribers for channel 'application.jdbcMessageStoreChannel'. 因此,您的应用程序中有一个 jdbcMessageStoreChannel,您可以在其中尝试发送消息。您没有在问题中显示该配置,但请确保您有该频道的订阅者以使其正常工作。 到目前为止您显示的配置与该错误无关。

回答 1 投票 0

Spring 集成:并行处理来自 Sftp.inboundStreamingAdapter 的消息

所以, 如果有一个 Spring Integration 5.5 / Spring Boot 2.7 流程,可以从 SFTP 源读取 XML 文件,处理并存储它们,然后删除。事情确实有效,但是是按顺序进行的,这意味着每个文件都是

回答 1 投票 0

Spring Integration 分布式锁:与 Redisson 的租用时间相比,TTL 概念上的一致性

这是以下问题的后续问题:https://github.com/spring-projects/spring-integration/issues/8687 正如上面链接中所讨论的,Spring Integration Distributed Lock 的 TTL 概念是为了......

回答 1 投票 0

如何在 Spring Integration 中使用 JAVA 配置来配置 http 入站通道适配器?

我发现了一个xml配置的入站适配器示例,我不完全理解。该配置指定REST请求设置请求方法、消费格式等。 我想从春天开始

回答 2 投票 0

Spring 集成 FileReadingMessageSource 使用 UseWatchService

我使用 FileReadingMessageSource 作为 Spring Integration ESB 中的消息源。 我特别想使用 WatchServiceDirectoryScanner。我使用基于注释的配置与 sp...

回答 2 投票 0

将@Aggregator注释与MethodInvokingMessageGroupProcessor一起使用的正确方法

创建 MethodInvokingMessageGroupProcessor(Object target) 时,使用 @Aggregator 在目标中注释所需的处理方法是否正确,否则会导致创建无用的

回答 1 投票 0

在 Spring Integration 中后台轮询 Http 请求状态

我正在尝试构建一个基于 Spring 集成的架构,其中我的系统需要通过各种协议与不同的其他系统进行通信。其中一个系统基于公开 REST 端...

回答 1 投票 0


© www.soinside.com 2019 - 2024. All rights reserved.