基于 Quarkus 的 Kafka Streams 应用程序中的多个拓扑

问题描述 投票:0回答:1

我正在尝试使用 Quarkus 构建 Kafka Streams 应用程序。我想知道如何在同一个 Quarkus 应用程序中拥有多个拓扑?

我试图像这样实现它:

@ApplicationScoped
public class TestToplogy {

    @Produces
    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }

    @Produces
    public Topology buildTopology2() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }
}

或者在同一个项目中使用多个@ApplicationScoped类:

@ApplicationScoped
public class TestToplogy1 {

    @Produces
    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }
}
@ApplicationScoped
public class TestToplogy2 {

    @Produces
    public Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // ...
        return builder.build();
    }
}

两种方法最终都会出现异常

jakarta.enterprise.inject.AmbiguousResolutionException
,因为 Quarkus Kafka Streams 期望通过注入只能解决一个拓扑。

有没有办法在一个 Quarkus 应用程序(jar)中启动并运行多个拓扑?

quarkus apache-kafka-streams quarkus-kafka
1个回答
0
投票

唯一对我有用的方法是 Quarkus 博客上描述的旧方法,它允许创建和配置 Kafka Streams 的多个实例。

它的优点是允许将不同的配置应用于 Kafka Streams 实例,但代价是:

    更多样板代码和生命周期管理,
  • 必须手动注入配置(代替下面示例中显示的硬编码值),
  • 与 Quarkus 框架其余部分的集成较弱(不确定实际影响)。
import io.quarkus.runtime.ShutdownEvent; import io.quarkus.runtime.StartupEvent; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.event.Observes; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @ApplicationScoped public class TopologyBuilderA { private KafkaStreams kafkaStreams; void buildTopologyA(@Observes StartupEvent startupEvent) { Properties properties = new Properties(); // TODO: populate actual values from externalized configuration properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-topology-a"); StreamsBuilder builder = new StreamsBuilder(); // builder.stream(...) kafkaStreams = new KafkaStreams(builder.build(), properties); kafkaStreams.start(); } void stopStream(@Observes ShutdownEvent shutdownEvent) { streams.close(); } }
请注意,各个拓扑之间的 

application.id

 需求(上例中的 
my-app-topology-a
)需要有所不同。

© www.soinside.com 2019 - 2024. All rights reserved.