今天,我在Google卡夫卡(Kafka)州立商店中发现了一件很奇怪的东西,但没有找到该行为的原因。
考虑下面用java编写的状态存储:
private KeyValueStore<String, GenericRecord> userIdToUserRecord;
有two个处理器正在使用此状态存储。
topology.addStateStore(userIdToUserRecord, ALERT_PROCESSOR_NAME, USER_SETTING_PROCESSOR_NAME)
USER_SETTING_PROCESSOR_NAME会将数据放入状态存储
userIdToUserRecord.put("user-12345", record);
ALERT_PROCESSOR_NAME将从状态存储获取数据
userIdToUserRecord.get("user-12345");
向UserSettingProcessor添加源
userSettingTopicName = user-setting-topic;
topology.addSource(sourceName, userSettingTopicName)
.addProcessor(processorName, UserSettingProcessor::new, sourceName);
向AlertEngineProcessor添加源代码
alertTopicName = alert-topic;
topology.addSource(sourceName, alertTopicName)
.addProcessor(processorName, AlertEngineProcessor::new, sourceName);
情况1:使用Java中的Kafka Produce产生记录首先使用Java将记录生成到主题user-setting-topic,它将用户记录添加到状态存储中使用Java将第二个记录生成到主题alert-topic,它将使用用户ID userIdToUserRecord.get(“ user-12345”);
从状态存储中获取记录工作正常,我正在使用kafkaavroproducer来为两个主题生成记录
案例2:首先使用python将记录生成到主题user-setting-topic,它将用户记录添加到状态存储* userIdToUserRecord.put(“ user-100”,GenericRecord);
使用Java将第二个记录生成到主题alert-topic,它将使用用户ID userIdToUserRecord.get(“ user-100”);]]从状态存储中获取记录。
奇怪的是,这里userIdToUserRecord.get(“ user-100”)将返回null
我也检查这种情况我使用python将记录生成到用户设置主题,然后触发了userSettingProcessor处理方法,并在调试模式下进行了检查,并尝试从状态存储userIdToUserRecord.get(“ user-100”)中获取用户记录,在userSettingProcessor中可以正常工作从状态存储获取数据
然后我使用Java生成记录以提醒主题,然后尝试获取userIdToUserRecord.get(“ user-100”),它将返回null
我不知道这种奇怪的行为,任何人都告诉我这种行为。
Python代码:
value_schema = avro.load('user-setting.avsc') value = { "user-id":"user-12345", "client_id":"5cfdd3db-b25a-4e21-a67d-462697096e20", "alert_type":"WORK_ORDER_VOLUME" } print("------------------------Kafka Producer------------------------------") avroProducer = AvroProducer( {'bootstrap.servers': 'localhost:9092', 'schema.registry.url': 'http://localhost:8089'}, default_value_schema=value_schema) avroProducer.produce(topic="user-setting-topic", value=value) print("------------------------Sucess Producer------------------------------") avroProducer.flush()
Java代码:
Schema schema = new Schema.Parser().parse(schemaString);
GenericData.Record record = new GenericData.Record(schema);
record.put("alert_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
record.put("alert_created_at",123449437L);
record.put("alert_type","WORK_ORDER_VOLUME");
record.put("client_id","5cfdd3db-b25a-4e21-a67d-462697096e20");
//record.put("property_key","property_key-"+i);
record.put("alert_data","{\"alert_trigger_info\":{\"jll_value\":1.4,\"jll_category\":\"internal\",\"name\":\"trade_Value\",\"current_value\":40,\"calculated_value\":40.1},\"work_order\":{\"locations\":{\"country_name\":\"value\",\"state_province\":\"value\",\"city\":\"value\"},\"property\":{\"name\":\"property name\"}}}");
return record;
今天,我在Google卡夫卡州立商店中发现了一件很奇怪的东西,但没有找到该行为的原因。考虑下面用Java编写的状态存储:private KeyValueStore
问题是Java生产者和Python生产者(基于C生产者)使用不同的默认哈希函数进行数据分区。您需要为一个(或两个)提供定制的分区,以确保它们使用相同的分区策略。
不幸的是,Kafka协议未指定默认分区哈希函数应该是什么,因此客户端默认可以使用他们想要的任何东西。