无法使用 Spring KafkaProducer 发布 Kafka Avro Schema 消息 - 使用 Python

问题描述 投票:0回答:0

Kafka 配置模式配置

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfigProperties.getBootstrapServer());
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                org.apache.kafka.common.serialization.StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                io.confluent.kafka.serializers.KafkaAvroSerializer.class);
        configProps.put(KAFKA_PROPERTY_SASL_MECHANISM, kafkaConfigProperties.getSaslMechanism());
        configProps.put(KAFKA_PROPERTY_SASL_JAAS_CONFIG, kafkaConfigProperties.getSaslJaasConfig());
        configProps.put(KAFKA_PROPERTY_SECURITY_PROTOCOL, kafkaConfigProperties.getSecurityProtocol());
        configProps.put(KAFKA_PROPERTY_BASIC_AUTH_USER_INFO, kafkaConfigProperties.getRegistryAuthUserInfo());
        configProps.put(KAFKA_PROPERTY_REGISTRY_AUTH_CREDENTIALS_SOURCE, kafkaConfigProperties.getRegistryCredentialSource());
        configProps.put(KAFKA_PROPERTY_SCHEMA_REGISTRY_URL, kafkaConfigProperties.getSchemaRegistryUrl());
       
// Message publish code
        GenericRecord inputRecord = new GenericData.Record(mySchema);
        avroRecord.put("madatory_1", "Customvalue");
        avroRecord.put("mandatory_3", "value2");
        avroRecord.put("user_id", "99999");
        String key = UUID.randomUUID().toString();
        kafkaTemplate.send("topic_1", key, inputRecord);

出现这个错误。 org.apache.kafka.common.InvalidRecordException:日志记录 DefaultRecord(offset=0, timestamp=1677045032257, key=36 bytes, value=556 bytes) 被记录拦截器 io.confluent.cloud.kafka.schemaregistry.validator 拒绝。 CloudRecordSchemaValidator

在上面的代码中。我的架构是为这个主题生成的连接到注册表 url 的 avroSchema。

具有相同的生产者配置属性。它工作正常并且能够在 python 中发布消息。

发布 avro msg 在 python 中有效,但在 java spring kafka 中无效。

spring-boot apache-kafka spring-kafka confluent-platform kafka-producer-api
© www.soinside.com 2019 - 2024. All rights reserved.