Kafka Streams - Unable to search the state store


I have a Kafka Streams application that uses two state stores. I run into a problem when running this application on OpenShift against a Strimzi cluster (kafka:0.29.0-kafka-3.1.0).

Specifically, when a bp-addr record arrives, the lookup in bp-state-store succeeds, but the lookup in person-state-store always returns null.

When a person-addr record arrives on the stream and tries to retrieve entries from bp-state-store, it also gets null.

I tried both .get() with the exact key and .prefixScan(). In both cases I get this strange behavior.

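I have double-checked the keys, the entries, and the state of the state stores, and they all look fine. What is more, my unit tests and a local Docker Kafka cluster (cp-kafka) both work correctly.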

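I also checked the ACL permissions on the Strimzi cluster, and they look correct too (I added my user as a super user to verify this). I tried this in two different OpenShift namespaces, each built from scratch.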

Does anyone have an idea or a hint about what else I could check?

    private StreamsBuilder buildTopology(KafkaStreamsProperty kafkaStreamsProperty,
                                         SpecificAvroSerde<JoinedPersonAddrV2> joinedPASerde,
                                         SpecificAvroSerde<JoinedBpAddrV2> joinedBASerde,
                                         SpecificAvroSerde<ObjectUpdateEvent> updateSerde
    ) {
        StreamsBuilder builder = new StreamsBuilder();

        final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaStreamsProperty.getSchemaRegistryUrl());

        joinedBASerde.configure(serdeConfig, false);
        joinedPASerde.configure(serdeConfig, false);
        updateSerde.configure(serdeConfig, false);

        KStream<String, JoinedPersonAddrV2> personAddrKStream = builder.stream(kafkaStreamsProperty.getPersonInputTopic(), Consumed.with(Serdes.String(), joinedPASerde));
        KStream<String, JoinedBpAddrV2> bpAddrKStream = builder.stream(kafkaStreamsProperty.getBpInputTopic(), Consumed.with(Serdes.String(), joinedBASerde));

        StoreBuilder<KeyValueStore<String, JoinedBpAddrV2>> bpStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(BP_STORE_NAME),
                Serdes.String(),
                joinedBASerde
        );

        StoreBuilder<KeyValueStore<String, JoinedPersonAddrV2>> personStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(PERSON_STORE_NAME),
                Serdes.String(),
                joinedPASerde
        );

        StoreBuilder<KeyValueStore<String, Long>> debounceStoreBuilder = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(DEBOUNCE_STORE),
                Serdes.String(),
                Serdes.Long()
        );

        builder.addStateStore(bpStoreBuilder);
        builder.addStateStore(personStoreBuilder);
        builder.addStateStore(debounceStoreBuilder);

        KStream<String, BpAddrOrPersonAddrV2> mergedStream = bpAddrKStream
                .mapValues(value -> new BpAddrOrPersonAddrV2(null, value))
                .merge(personAddrKStream.mapValues(value -> new BpAddrOrPersonAddrV2(value, null)));

        mergedStream.process(() -> new ContextualProcessor<String, BpAddrOrPersonAddrV2, String, ObjectUpdateEvent>() {
            @Override
            public void process(Record<String, BpAddrOrPersonAddrV2> record) {
                // assign value to a variable
                BpAddrOrPersonAddrV2 eitherValue = record.value();
                boolean isHashUpdated = false;

                // get state stores
                KeyValueStore<String, JoinedBpAddrV2> bpAddrStateStore = context().getStateStore(BP_STORE_NAME);
                KeyValueStore<String, JoinedPersonAddrV2> personAddrStateStore = context().getStateStore(PERSON_STORE_NAME);

                // assign single Values
                JoinedBpAddrV2 bpAddr = eitherValue.getBpAddr();
                JoinedPersonAddrV2 personAddr = eitherValue.getPersonAddr();

                // process if bp-addr record
                if (bpAddr != null) {
                    String[] splittedKey = record.key().split(":");
                    String personIdAddrIdKey = String.format("%s:%s", splittedKey[1], splittedKey[2]);

                    boolean isEmittent = isEmittent(bpAddr);

                    if (isRegisteredOwnerId(bpAddr, splittedKey[1]) || isEmittent) {
                        // key order: personId, addrId, bpId
                        String revertedKey = String.format("%s:%s:%s", splittedKey[1], splittedKey[2], splittedKey[0]);

                        // get stored value from bp-addr state store
                        JoinedBpAddrV2 storedValue = bpAddrStateStore.get(revertedKey);
                        log.info("bpaddr entry storedValue: {}", storedValue);
                        if (storedValue == null) {
                            log.debug("{}, New BpAddr Table Entry for: {}, hash: {}", BP_STORE_NAME, record.key(), bpAddr.getHash());
                            bpAddrStateStore.put(revertedKey, bpAddr);
                            isHashUpdated = true;
                        } else if (!Objects.equals(storedValue.getHash(), bpAddr.getHash())) {
                            log.debug("{}, Update BpAddr Entry for key: {}, old hash: {}, new hash: {}", BP_STORE_NAME, record.key(), storedValue.getHash(), bpAddr.getHash());
                            bpAddrStateStore.put(revertedKey, bpAddr);
                            isHashUpdated = true;
                        } else {
                            log.debug("{}, No hash change do nothing for BpAddr key:{} hash value:{}", BP_STORE_NAME, record.key(), storedValue.getHash());
                        }
                    }

                    if (isHashUpdated) {
                        log.info("SEARCHING IN PERSON - ADDR STATE STORE WITH A KEY: {}", personIdAddrIdKey);

                        JoinedPersonAddrV2 matching = personAddrStateStore.get(personIdAddrIdKey);
                        log.info("MATCHING: {}", matching);

                        KeyValueIterator<String, JoinedPersonAddrV2> matchedPersonAddrIterator = personAddrStateStore.prefixScan(personIdAddrIdKey, new StringSerializer());

                        if (matchedPersonAddrIterator.hasNext()) {
                            while (matchedPersonAddrIterator.hasNext()) {
                                JoinedPersonAddrV2 matchedPersonAddr = matchedPersonAddrIterator.next().value;
                                // if the bp-addr is an emittent and no matching personAddr was found, look up by personId alone using a prefix scan
                                // because addrId is not propagated on bp, so bp and person may have a different domiAddr
                                if (matchedPersonAddr == null && isEmittent) {
                                    String personId = splittedKey[1];
                                    // do a prefix scan for a personId
                                    KeyValueIterator<String, JoinedPersonAddrV2> joinedPersonIterator = personAddrStateStore.prefixScan(personId, new StringSerializer());
                                    while (joinedPersonIterator.hasNext()) {
                                        JoinedPersonAddrV2 item = joinedPersonIterator.next().value;
                                        context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(item, bpAddr)));
                                    }
                                    // "standard" match forward the record
                                } else if (matchedPersonAddr != null) {
                                    context().forward(record.withKey(personIdAddrIdKey).withValue(createObjectUpdateEvent(matchedPersonAddr, bpAddr)));
                                }
                            }
                        }
                    }

                } else if (personAddr != null) { // process if person-addr record is coming

                    if (record.value() != null) {
                        JoinedPersonAddrV2 storedValue = personAddrStateStore.get(record.key());
                        log.info("personaddr entry storedValue: {}", storedValue);

                        if (storedValue == null) {
                            log.debug("{}, New PersonAddr Table entry for: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                            personAddrStateStore.put(record.key(), personAddr);
                            isHashUpdated = true;
                        } else if (!storedValue.getHash().equals(personAddr.getHash())) {
                            log.debug("{}, Update PersonAddr Table entry for: {}, old hash: {}, new hash: {}", PERSON_STORE_NAME, record.key(), storedValue.getHash(), personAddr.getHash());
                            personAddrStateStore.put(record.key(), personAddr);
                            isHashUpdated = true;
                        } else {
                            log.debug("{}, No hash change do nothing for PersonAddr key: {}, hash: {}", PERSON_STORE_NAME, record.key(), personAddr.getHash());
                        }
                    } else {
                        log.debug("Got NULL JoinedPersonAddr Object - skip! --> key: {}", record.key());
                    }

                    if (isHashUpdated) {
                        try {

                            // look for matches in bpAddr store
                            log.info("SEARCHING IN BP - ADDR STATE STORE WITH A KEY: {}", record.key());
                            KeyValueIterator<String, JoinedBpAddrV2> bpAddrIterator = bpAddrStateStore.prefixScan(record.key(), new StringSerializer());

                            if (bpAddrIterator.hasNext()) {
                                log.trace("Found records for key {} record in {}", record.key(), BP_STORE_NAME);
                                // got matches for given personId:addrId update all found records
                                while (bpAddrIterator.hasNext()) {
                                    JoinedBpAddrV2 bpAddrObject = bpAddrIterator.next().value;
                                    context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrObject)));
                                }
                            } else {
                                log.trace("Found NO records for key {} record in {}", record.key(), BP_STORE_NAME);
                                String[] splittedKey = record.key().split(":");
                                final String ZERO = "0";

                                // only when addr != 0 and is emittent
                                if (personAddr.getIsEmittent() && !splittedKey[1].equals(ZERO)) {
                                    String personIdNullAddrKey = String.format("%s:%s", splittedKey[0], ZERO);
                                    // look for records in bp-addr state store with key personId:0
                                    KeyValueIterator<String, JoinedBpAddrV2> bpNullAddrIterator = bpAddrStateStore.prefixScan(personIdNullAddrKey, new StringSerializer());
                                    if (bpNullAddrIterator.hasNext()) {
                                        log.trace("Found records for key {} record in {}", personIdNullAddrKey, BP_STORE_NAME);
                                        while (bpNullAddrIterator.hasNext()) {
                                            JoinedBpAddrV2 bpAddrV2 = bpNullAddrIterator.next().value;
                                            context().forward(record.withKey(record.key()).withValue(createObjectUpdateEvent(personAddr, bpAddrV2)));
                                        }
                                    }
                                } else {
                                    log.trace("Found NO matched sending PERSON_ONLY records");
                                    // no match in bpAddr state store send PERSON_ONLY event
                                    context().forward(record.withValue(createObjectUpdateEvent(personAddr, null)));
                                }
                            }
                        } catch (Exception e) {
                            log.error("EXCEPTION : {}", e.getMessage());
                        }
                    }
                }
            }
        }, BP_STORE_NAME, PERSON_STORE_NAME)
                .process(() -> new DebounceTransformer<>(DEBOUNCE_STORE, 5000), DEBOUNCE_STORE)
                .peek((key, value) -> log.debug("Produced Update Event key: {}, hash: {}", key, value.getHash()))
                .to(kafkaStreamsProperty.getOutputTopic(), Produced.with(Serdes.String(), updateSerde));

        return builder;
    }

The topology looks like this:

Topology: Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [person.addr.join.topic])
      --> KSTREAM-MAPVALUES-0000000003
    Source: KSTREAM-SOURCE-0000000001 (topics: [bp.addr.join.topic])
      --> KSTREAM-MAPVALUES-0000000002
    Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
      --> KSTREAM-MERGE-0000000004
      <-- KSTREAM-SOURCE-0000000001
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
      --> KSTREAM-MERGE-0000000004
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MERGE-0000000004 (stores: [])
      --> KSTREAM-PROCESSOR-0000000005
      <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
    Processor: KSTREAM-PROCESSOR-0000000005 (stores: [bp-state-store, person-state-store])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-MERGE-0000000004
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [debounce-store])
      --> KSTREAM-PEEK-0000000007
      <-- KSTREAM-PROCESSOR-0000000005
    Processor: KSTREAM-PEEK-0000000007 (stores: [])
      --> KSTREAM-SINK-0000000008
      <-- KSTREAM-PROCESSOR-0000000006
    Sink: KSTREAM-SINK-0000000008 (topic: update.event.topic)
      <-- KSTREAM-PEEK-0000000007
apache-kafka apache-kafka-streams strimzi
1 Answer

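The "problem" was that the Strimzi cluster I was using had partitioned topics. The situation: if you have topics A and B with 10 partitions each and send messages to them using the default partitioner, a BpAddr record and its matching PersonAddr record can end up in different partitions, because their keys differ (bpId:personId:addrId versus personId:addrId) and the default partitioner hashes the whole key.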

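This means that the entries in the state stores (which are partitioned as well) are also created in different partitions. So when an update record arrives and the application wants to look up the matching entry in the state store, the lookup only works if the PersonAddr and BpAddr records are always in the same partition.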
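
To make the effect concrete, here is a minimal sketch in plain Java (no Kafka dependency) that models a state store as one in-memory map per partition. Everything in it is an assumption for illustration: the class and method names are hypothetical, and String.hashCode() only stands in for the murmur2 hash that Kafka's default partitioner applies to the serialized key. The key formats follow the question's code (bpId:personId:addrId and personId:addrId).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionedStoreDemo {

    // Stand-in for the producer's key hashing; Kafka's default partitioner
    // uses murmur2 on the serialized key, not String.hashCode().
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // One in-memory map per partition, mimicking a state store that is
    // sharded across stream tasks.
    static List<Map<String, String>> newStores(int numPartitions) {
        List<Map<String, String>> stores = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
        return stores;
    }

    // A person-addr record is stored by the task that owns its partition.
    static void putPerson(List<Map<String, String>> personStores, String personKey, String value) {
        personStores.get(partitionFor(personKey, personStores.size())).put(personKey, value);
    }

    // A bp-addr record (bpId:personId:addrId) is processed by the task that
    // owns its own partition and can only see that task's shard.
    static String lookupPersonFromBpRecord(List<Map<String, String>> personStores, String bpKey) {
        String[] parts = bpKey.split(":");
        String personKey = parts[1] + ":" + parts[2];
        int task = partitionFor(bpKey, personStores.size());
        return personStores.get(task).get(personKey);
    }

    public static void main(String[] args) {
        List<Map<String, String>> ten = newStores(10);
        putPerson(ten, "2:3", "person-addr");
        System.out.println("10 partitions: " + lookupPersonFromBpRecord(ten, "1:2:3"));

        List<Map<String, String>> one = newStores(1);
        putPerson(one, "2:3", "person-addr");
        System.out.println("1 partition:   " + lookupPersonFromBpRecord(one, "1:2:3"));
    }
}
```

In this sketch the lookup with 10 partitions returns null because the two keys hash to different shards, while with a single partition every key maps to partition 0 and the same lookup finds the entry.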

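I solved this by writing a custom partitioner for the producer that writes the PersonAddr and BpAddr records, which ensures that matching records land in the same partition of their respective topics.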
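
The answer does not show the partitioner itself, so the following is only a sketch of the core idea under stated assumptions: derive the partition from the part of the key that both record types share. Here that is assumed to be the personId, which also keeps the personId-only and personId:0 prefix scans from the question on the same partition. The class and method names are hypothetical, and String.hashCode() again stands in for a real hash. In an actual producer this logic would live inside an implementation of org.apache.kafka.clients.producer.Partitioner, registered through the partitioner.class producer setting, and both topics would need the same number of partitions.

```java
final class PersonIdPartitionSketch {

    // bp-addr keys look like bpId:personId:addrId,
    // person-addr keys look like personId:addrId.
    static String personIdOf(String key) {
        String[] parts = key.split(":");
        if (parts.length == 3) {
            return parts[1];
        }
        if (parts.length == 2) {
            return parts[0];
        }
        throw new IllegalArgumentException("Unexpected key format: " + key);
    }

    // Partition by the shared personId only, so that both record types for
    // the same person map to the same partition number.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(personIdOf(key).hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        System.out.println("bp-addr     1:2:3 -> " + partitionFor("1:2:3", numPartitions));
        System.out.println("person-addr 2:3   -> " + partitionFor("2:3", numPartitions));
        System.out.println("person-addr 2:0   -> " + partitionFor("2:0", numPartitions));
    }
}
```

All three keys above belong to personId 2 and therefore resolve to the same partition number, which is the property the state store lookups rely on.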
