Spring 集成:通过千分尺在不同通道上进行迹线传播?

问题描述 投票:0回答:1

如果中间有一个带有轮询器的

QueueChannel
,我将面临正确设置跟踪传播的问题:(

Flow 由 HTTP 调用发起,触发 Spring 集成的

MessagingGateway

@RestController
@RequiredArgsConstructor
class FooController {

    private final InitiateSftpImportGateway gateway;

    @ResponseStatus(HttpStatus.NO_CONTENT)
    @PostMapping("/bar")
    void runImportAt(@RequestParam String path) {
        gateway.runImportAt(path);
    }

    @MessagingGateway(defaultRequestChannel = "sftpListingChannel", errorChannel = "errorChannel")
    interface InitiateSftpImportGateway {
        @Gateway
        void runImportAt(String path);
    }
}

通道定义如下:

    @Bean
    MessageChannel sftpListingChannel(final ObservationRegistry observationRegistry) {
        final var directChannel = new DirectChannel();
        directChannel.registerObservationRegistry(observationRegistry);
        return directChannel;
    }

    @Bean
    MessageChannel sftpFetchingChannel(final ObservationRegistry observationRegistry) {
        final var queueChannel = new QueueChannel();
        queueChannel.registerObservationRegistry(observationRegistry);
        return queueChannel;
    }

以及实际流程中的片段 - 基于触发器,

sftpListingChannel
在 SFTP 上提取文件:

    @Bean
    @ServiceActivator(inputChannel = "sftpListingChannel", outputChannel = "fileSplittingChannel")
    MessageHandler listFiles(final CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
        final var outboundGateway = new SftpOutboundGateway(
            cachingSessionFactory,
            AbstractRemoteFileOutboundGateway.Command.LS.getCommand(),
            "payload"
        );
        outboundGateway.setOption(AbstractRemoteFileOutboundGateway.Option.RECURSIVE);
        return outboundGateway;
    }

并将它们分开:

    @Splitter(inputChannel = "fileSplittingChannel", outputChannel = "sftpFetchingChannel")
    List<FileInfo<SftpClient.DirEntry>> splitByFile(@Payload final List<FileInfo<SftpClient.DirEntry>> fileInfo) {
        return fileInfo;
    }

对于每个文件 - 我们下载它:

    @Bean
    @ServiceActivator(
        inputChannel = "sftpFetchingChannel",
        outputChannel = "...",
        poller = @Poller(value = "transactionalPoller")
    )
    MessageHandler fetchFiles(CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory) {
        final var outboundGateway = new SftpOutboundGateway(
            cachingSessionFactory,
            AbstractRemoteFileOutboundGateway.Command.GET.getCommand(),
            Constants.Expressions.FILE_LOCATION_FROM_HEADER
        );
        outboundGateway.setOption(
            AbstractRemoteFileOutboundGateway.Option.STREAM,
            AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP
        );
        return outboundGateway;
    }

它使用自定义轮询器来注入事务建议:

    @Bean
    PollerMetadata transactionalPoller(final TransactionManager transactionManager) {
        final var pollerMetadata = new PollerMetadata();

        pollerMetadata.setAdviceChain(
            List.of(new TransactionInterceptorBuilder().transactionManager(transactionManager).build())
        );

        return pollerMetadata;
    }

轮询器(?)终止初始跟踪并初始化一个新的跟踪 - 这很麻烦,因为我想跟踪整体流程并仅中继跨度 ID 以深入研究特定的文件子进程。

我最关心的是

errorChannel
- 如果拆分后的某些子操作抛出错误,则会在
errorChannel
上处理,但使用新的跟踪 ID :(

请建议我是否/如何调整我的配置以使跟踪传播更平滑。


免责声明:

  • 可能看起来很奇怪,首先是
    get
    (甚至是适配器),而不是普通的
    ls
    - 上面的代码只是更大流程的一个小子集
  • 为什么要使用轮询器对通道进行排队?基本上 - 我想要一个具有独立的、事务性错误处理的“for-each-loop”,这样一个文件就不会破坏流程,并且它仍然可以继续

记录一下:

  • 弹簧集成:6.1.4
  • 弹簧启动:3.1.5
  • 微米追踪(带勇敢桥):1.1.6
spring-integration spring-micrometer spring-integration-sftp micrometer-tracing
1个回答
0
投票

你确实错过了观察传播

QueueChannel
在另一个线程上被消耗,并且不知道生产者线程中的跟踪。因此,您需要配置提到的
ObservationPropagationChannelInterceptor
,以通过存储在该通道中的消息将跟踪从生产者传播到消费者。

© www.soinside.com 2019 - 2024. All rights reserved.