Spring Integration - Dealing with stale sftp sessions


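I have implemented the following scenario:

  1. A queueChannel holding messages in the form of byte[]
  2. A MessageHandler that polls the queue channel and uploads files via sftp
  3. A Transformer that listens on the errorChannel and sends the payload extracted from the failed message back to the queueChannel (intended as an error handler for failed messages, so that nothing gets lost)

If the sftp server is online, everything works as expected.

If the sftp server is down, the error message that arrives at the transformer is:

org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session

The transformer cannot do anything with this, because the failedMessage of the error payload is null, so the transformer itself throws an exception. The message is lost.

How can I configure the flow so that the transformer receives the correct message, with the corresponding payload of the file that was not uploaded?

My configuration: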

  @Bean
  public MessageChannel toSftpChannel() {
    final QueueChannel channel = new QueueChannel();
    channel.setLoggingEnabled(true);
    return channel;
  }

  @Bean
  public MessageChannel toSplitter() {
    return new PublishSubscribeChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
  public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
    handler.setFileNameGenerator(message -> {
      if (message.getPayload() instanceof byte[]) {
        return (String) message.getHeaders().get("name");
      } else {
        throw new IllegalArgumentException("byte[] expected in Payload!");
      }
    });
    return handler;
  }

  @Bean
  public SessionFactory<LsEntry> sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);

    final Properties jschProps = new Properties();
    jschProps.put("StrictHostKeyChecking", "no");
    jschProps.put("PreferredAuthentications", "publickey,password");
    factory.setSessionConfig(jschProps);

    factory.setHost(sftpHost);
    factory.setPort(sftpPort);
    factory.setUser(sftpUser);
    if (sftpPrivateKey != null) {
      factory.setPrivateKey(sftpPrivateKey);
      factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    } else {
      factory.setPassword(sftpPasword);
    }
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory);
  }

  @Bean
  @Splitter(inputChannel = "toSplitter")
  public DmsDocumentMessageSplitter splitter() {
    final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
    splitter.setOutputChannelName("toSftpChannel");
    return splitter;
  }

  @Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
  public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {

    Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
      .getFailedMessage();
    return MessageBuilder.withPayload(failedMessage)
                         .copyHeadersIfAbsent(failedMessage.getHeaders())
                         .build();
  }

  @MessagingGateway 
  public interface UploadGateway {

    @Gateway(requestChannel = "toSplitter")
    void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
  }

Thanks.

Update

@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
  PollerMetadata poller() {
    return Pollers
      .fixedRate(5000)
      .maxMessagesPerPoll(1)
      .receiveTimeout(500)
      .taskExecutor(taskExecutor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory())
      .get();
  }

  @Bean
  @ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
  public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toSftpChannel");
    return bridgeHandler;
  }
spring-integration jsch spring-integration-sftp
1 Answer

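The null failedMessage is a bug; reproduced in INT-4421.

I would not recommend using a QueueChannel for this scenario. If you use a direct channel, you can configure a retry advice to attempt redelivery. When the retries are exhausted (if so configured), the exception is thrown back to the calling thread.

Add the advice to the SftpMessageHandler's adviceChain property.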
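
To make the retry behaviour described above concrete, here is a minimal plain-Java sketch of the semantics only: try a few times, and once the attempts are exhausted, rethrow the last exception to the caller. It is not Spring Integration's retry advice and uses no Spring classes; the class name RetrySketch, the method withRetry, and the simulated upload are all hypothetical names made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {

    /**
     * Calls the action up to maxAttempts times, sleeping backOffMillis between
     * attempts. If every attempt fails, the last exception is rethrown to the
     * caller, so the calling thread sees the failure.
     */
    public static <T> T withRetry(Callable<T> action, int maxAttempts, long backOffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backOffMillis); // fixed back-off before the next attempt
                }
            }
        }
        throw last; // retries exhausted
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical upload that fails twice (server down) and then succeeds.
        String result = withRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("failed to create SFTP Session");
            }
            return "uploaded";
        }, 3, 10);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Because the caller either gets a result or the original exception, it still holds the payload when the upload finally fails, which is the property the direct-channel approach relies on.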

Edit

You can work around the "missing" failedMessage by inserting a bridge between the pollable channel and the sftp adapter:

@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toRealSftpChannel");
    return bridgeHandler;
}

@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof byte[]) {
            return (String) message.getHeaders().get("name");
        }
        else {
            throw new IllegalArgumentException("byte[] expected in Payload!");
        }
    });
    return handler;
}