kafka 流处理器内的异步 http 调用并将响应转发到下一个处理器

问题描述 投票:0回答:1

我有一个 kafka 流应用程序(用 java springboot 编写),它有 3 个处理器。在第二个处理器中,我想以异步方式调用rest api,当响应到来时,我想将响应转发到下一个处理器。我不希望流线程在执行其余 api 时阻塞其余 api 调用并获取其他消息。有什么办法可以做到这一点吗? 我拥有的处理器示例

public class secondprocessorsupplier implements ProcessorSupplier<IndexedRecord, IndexedRecord> {

    public secondprocessorsupplier() {
    }

    private static final ProcessorName PROCESSOR_NAME = ProcessorName.SECOND_PROCESSOR;

    
    @Override
    public Processor<IndexedRecord, IndexedRecord> get() {
        return new Secondprocessor();
    }

    private class Secondprocessor extends Processor<IndexedRecord, IndexedRecord> {    

        @Override
        public void init(ProcessorContext context) {
            this.context=context;
        
        }

        @Override
        public void process(final IndexedRecord keyInput, final IndexedRecord streamMessageInput) {
            final HttpClient httpclient= HttpClient.newBuilder()
                  .connectTimeout(Duration.ofMillis(30000))
                  .version(getClientVersion(httpClientVersion))
                  .followRedirects(behavior.getHttpClientRedirect())
                  .sslContext(sslContext)
                  .sslParameters(sslParameters)
                  .build());
            httpclient.sendAsync(httprequest, HttpResponse.BodyHandlers.ofByteArray()).thenAccept(
                response -> context.forward(keyInput, response)
            )
        }

        @Override
        public ProcessorName getProcessorName() {
            return PROCESSOR_NAME;
        }

    }
}

在上面的例子中,在thenAccept执行中上下文中的当前节点为null并且forward不起作用

spring-boot asynchronous apache-kafka-streams
1个回答
0
投票

您可以使用 STREAM_TIME 标点符号来异步处理事件:

  1. init
    方法中注册标点符号
    initSchedule = context.schedule(Duration.ofMillis(100), PunctuationType.STREAM_TIME, this::processRecords);
  2. process
    方法中将记录存储到本地集合,如果您只需要处理一次并且记录数据并不重要。如果数据很关键,寄存器内部存储来保存数据(推荐)。
  3. processRecords
    中对所有存储的记录进行 synch HTTP 调用并转发数据。再次强调,如果您只想处理记录一次,将其从集合/存储中删除。

它是如何工作的 - STREAM_TIME 标点符号仅在记录到达时触发。因此,如果每秒有一条记录,它将每 1 秒 + ~100 毫秒触发一次。如果您每 50 毫秒收到一条记录,标点符号将每 100 毫秒触发一次,平均处理 2 条记录。

© www.soinside.com 2019 - 2024. All rights reserved.