Kafka Persistence State-Store不能与python和java结合使用

问题描述 投票:1回答:1

今天,我在Google卡夫卡(Kafka)州立商店中发现了一件很奇怪的东西,但没有找到该行为的原因。

考虑下面用java编写的状态存储:

private KeyValueStore<String, GenericRecord> userIdToUserRecord;

two个处理器正在使用此状态存储。

  topology.addStateStore(userIdToUserRecord, ALERT_PROCESSOR_NAME, USER_SETTING_PROCESSOR_NAME)

USER_SETTING_PROCESSOR_NAME会将数据放入状态存储

userIdToUserRecord.put("user-12345", record);

ALERT_PROCESSOR_NAME将从状态存储获取数据

userIdToUserRecord.get("user-12345");

向UserSettingProcessor添加源

userSettingTopicName = user-setting-topic;    
topology.addSource(sourceName, userSettingTopicName)
                    .addProcessor(processorName, UserSettingProcessor::new, sourceName);

向AlertEngineProcessor添加源代码

alertTopicName = alert-topic;
topology.addSource(sourceName, alertTopicName)
                    .addProcessor(processorName, AlertEngineProcessor::new, sourceName);

情况1:使用Java中的Kafka Produce产生记录首先使用Java将记录生成到主题user-setting-topic,它将用户记录添加到状态存储中使用Java将第二个记录生成到主题alert-topic,它将使用用户ID userIdToUserRecord.get(“ user-12345”);

从状态存储中获取记录

工作正常,我正在使用kafkaavroproducer来为两个主题生成记录

案例2:首先使用python将记录生成到主题user-setting-topic,它将用户记录添加到状态存储* userIdToUserRecord.put(“ user-100”,GenericRecord);

使用Java将第二个记录生成到主题alert-topic,它将使用用户ID userIdToUserRecord.get(“ user-100”);]]从状态存储中获取记录。

奇怪的是,这里userIdToUserRecord.get(“ user-100”)将返回null

我也检查这种情况我使用python将记录生成到用户设置主题,然后触发了userSettingProcessor处理方法,并在调试模式下进行了检查,并尝试从状态存储userIdToUserRecord.get(“ user-100”)中获取用户记录,在userSettingProcessor中可以正常工作从状态存储获取数据

然后我使用Java生成记录以提醒主题,然后尝试获取userIdToUserRecord.get(“ user-100”),它将返回null

我不知道这种奇怪的行为,任何人都告诉我这种行为。

Python代码:

value_schema = avro.load('user-setting.avsc')
value = {
    "user-id":"user-12345",
    "client_id":"5cfdd3db-b25a-4e21-a67d-462697096e20",
    "alert_type":"WORK_ORDER_VOLUME"
}

print("------------------------Kafka Producer------------------------------")
avroProducer = AvroProducer(
    {'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8089'},
    default_value_schema=value_schema)
avroProducer.produce(topic="user-setting-topic", value=value)
print("------------------------Sucess Producer------------------------------")
avroProducer.flush() 

Java代码:

 Schema schema = new Schema.Parser().parse(schemaString);

        GenericData.Record record = new GenericData.Record(schema);
        record.put("alert_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
        record.put("alert_created_at",123449437L);
        record.put("alert_type","WORK_ORDER_VOLUME");
        record.put("client_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
        //record.put("property_key","property_key-"+i);

        record.put("alert_data","{\"alert_trigger_info\":{\"jll_value\":1.4,\"jll_category\":\"internal\",\"name\":\"trade_Value\",\"current_value\":40,\"calculated_value\":40.1},\"work_order\":{\"locations\":{\"country_name\":\"value\",\"state_province\":\"value\",\"city\":\"value\"},\"property\":{\"name\":\"property name\"}}}");
        return record;

今天,我在Google卡夫卡州立商店中发现了一件很奇怪的东西,但没有找到该行为的原因。考虑下面用Java编写的状态存储:private KeyValueStore

java python apache-kafka-streams kafka-producer-api confluent-kafka
1个回答
0
投票

问题是Java生产者和Python生产者(基于C生产者)使用不同的默认哈希函数进行数据分区。您需要为一个(或两个)提供定制的分区,以确保它们使用相同的分区策略。

不幸的是,Kafka协议未指定默认分区哈希函数应该是什么,因此客户端默认可以使用他们想要的任何东西。

© www.soinside.com 2019 - 2024. All rights reserved.