我正在尝试使用 Quarkus 构建 Kafka Streams 应用程序。我想知道如何在同一个 Quarkus 应用程序中拥有多个拓扑?
我试图像这样实现它:
@ApplicationScoped
public class TestToplogy {
@Produces
public Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
// ...
return builder.build();
}
@Produces
public Topology buildTopology2() {
final StreamsBuilder builder = new StreamsBuilder();
// ...
return builder.build();
}
}
或者在同一个项目中使用多个@ApplicationScoped类:
@ApplicationScoped
public class TestToplogy1 {
@Produces
public Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
// ...
return builder.build();
}
}
@ApplicationScoped
public class TestToplogy2 {
@Produces
public Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();
// ...
return builder.build();
}
}
两种方法最终都会出现异常
jakarta.enterprise.inject.AmbiguousResolutionException
,因为 Quarkus Kafka Streams 期望通过注入只能解决一个拓扑。
有没有办法在一个 Quarkus 应用程序(jar)中启动并运行多个拓扑?
唯一对我有用的方法是 Quarkus 博客上描述的旧方法,它允许创建和配置 Kafka Streams 的多个实例。
它的优点是允许将不同的配置应用于 Kafka Streams 实例,但代价是:
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
@ApplicationScoped
public class TopologyBuilderA {
private KafkaStreams kafkaStreams;
void buildTopologyA(@Observes StartupEvent startupEvent) {
Properties properties = new Properties();
// TODO: populate actual values from externalized configuration
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-topology-a");
StreamsBuilder builder = new StreamsBuilder();
// builder.stream(...)
kafkaStreams = new KafkaStreams(builder.build(), properties);
kafkaStreams.start();
}
void stopStream(@Observes ShutdownEvent shutdownEvent) {
streams.close();
}
}
请注意,各个拓扑之间的 application.id
需求(上例中的
my-app-topology-a
)需要有所不同。