Kafka 配置模式配置
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServer());
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
configProps.put(KAFKA_PROPERTY_SASL_MECHANISM, kafkaConfigProperties.getSaslMechanism());
configProps.put(KAFKA_PROPERTY_SASL_JAAS_CONFIG, kafkaConfigProperties.getSaslJaasConfig());
configProps.put(KAFKA_PROPERTY_SECURITY_PROTOCOL, kafkaConfigProperties.getSecurityProtocol());
configProps.put(KAFKA_PROPERTY_BASIC_AUTH_USER_INFO, kafkaConfigProperties.getRegistryAuthUserInfo());
configProps.put(KAFKA_PROPERTY_REGISTRY_AUTH_CREDENTIALS_SOURCE, kafkaConfigProperties.getRegistryCredentialSource());
configProps.put(KAFKA_PROPERTY_SCHEMA_REGISTRY_URL, kafkaConfigProperties.getSchemaRegistryUrl());
// Message publish code
GenericRecord inputRecord = new GenericData.Record(mySchema);
avroRecord.put("madatory_1", "Customvalue");
avroRecord.put("mandatory_3", "value2");
avroRecord.put("user_id", "99999");
String key = UUID.randomUUID().toString();
kafkaTemplate.send("topic_1", key, inputRecord);
出现这个错误。 org.apache.kafka.common.InvalidRecordException:日志记录 DefaultRecord(offset=0, timestamp=1677045032257, key=36 bytes, value=556 bytes) 被记录拦截器 io.confluent.cloud.kafka.schemaregistry.validator 拒绝。 CloudRecordSchemaValidator
在上面的代码中。我的架构是为这个主题生成的连接到注册表 url 的 avroSchema。
具有相同的生产者配置属性。它工作正常并且能够在 python 中发布消息。
发布 avro msg 在 python 中有效,但在 java spring kafka 中无效。