与KTable从异常中间结果的话题加入KStream

问题描述 投票:0回答:1

我试图加入一个KTable一个KStream。如果没有加入我没有问题,从中间话题读“书属性按ID”。

对于KTable样本信息:

{key: {id: 1}
 value: {id: 1, attribute_name: "weight"}}

对于KStream样本信息:

{key: {id: 1},
 value: {id: 1, book_id: 1, attribute_id: 1, value: 200}}

所需的输出为“最终聚集”的话题:

{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "weight", value: 200}}
{key: {id: 1}, 
 value: {book_id: 1, attribute_name: "number_of_pages", value: 450}}

下面是代码

    KStream<DefaultId, BookAttribute> bookAttributeStream = builder.stream(bookAttributeTopic, Consumed.with(defaultIdSerde, bookAttributeSerde));
    KStream<DefaultId, BookValueInt> bookValueIntStream = builder.stream(bookValueIntTopic, Consumed.with(defaultIdSerde, bookValueIntSerde));

    bookAttributeStream
        .selectKey((k, v) -> k.getId())
        .to("book-attribute-by-id", Produced.with(Serdes.Integer(), bookAttributeSerde));

    KTable<Integer, BookAttribute> bookAttributeByIdTable = builder.table("book-attribute-by-id", Consumed.with(Serdes.Integer(), bookAttributeSerde));

    // when the snippet below is commented out, consuming "book-attribute-by-id" works. 
    bookValueIntStream
        .selectKey((k, v) -> v.getAttribute_id())
        .join(bookAttributeByIdTable, (intValue, attribute) -> {
                System.out.println("intValue: " + intValue);
                System.out.println("attribute: " + attribute);
                return new BookAttributeValue(intValue, attribute);
            });

接合KStream&KTable时例外:

异常的线程 “XXX-StreamThread-1” org.apache.kafka.streams.errors.TopologyBuilderException:无效的拓扑结构建筑:流线[XXX-StreamThread-1]主题未发现:书属性由-ID的组织。 apache.kafka.streams.processor.internals.StreamPartitionAssignor $ CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:792)

apache-kafka apache-kafka-streams
1个回答
2
投票

我想,你正在使用卡夫卡流1.0.0

问题是,你必须为你流创建输入主题。

你的情况议题是:book-attribute-by-id和那些变量的值:bookAttributeTopicbookValueIntTopic

对于加入卡夫卡流有保证,该分区的加盟主题的数目相等。异常被抛出,当其试图获得元数据的话题:book-attribute-by-id

在运行应用程序之前,你必须手动创建qazxsw POI话题

在卡夫卡流的新版本主题的存在是分区数的验证之前检查。

© www.soinside.com 2019 - 2024. All rights reserved.