Error registering a Jackson Avro schema with the Confluent Schema Registry client


I use Jackson to generate an Avro schema from a POJO and to serialize it. So far this works fine with Jackson.

However, I cannot upload this schema to the Schema Registry with the CachedSchemaRegistryClient.

For the test setup with Testcontainers, see here.

   SchemaRegistryClient schemaRegistryClient( final String url ) {
      // 100 = identityMapCapacity (max schemas cached per subject)
      return new CachedSchemaRegistryClient( url, 100 );
   }
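As a side note, a schema-independent probe separates connection problems from schema problems: getAllSubjects() issues a plain GET /subjects against the same URL. A minimal sketch (the URL here is a placeholder for the one the container provides):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistrySmokeTest {
   public static void main( final String[] args ) throws Exception {
      // Hypothetical probe URL -- in the test this comes from getClientUrl().
      final String url = "http://localhost:8081";
      final SchemaRegistryClient client = new CachedSchemaRegistryClient( url, 100 );
      // GET /subjects: fails the same way if the connection itself is broken,
      // independent of any Jackson-generated schema.
      System.out.println( "Subjects: " + client.getAllSubjects() );
   }
}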

   public byte[] serialize( final Optional<Object> root ) {
      if ( root.isEmpty() ) {
         return new byte[0];
      }
      final Object rootObject = root.get();
      // Subject name is the simple class name, e.g. "TestDto"
      final String subjectName = rootObject.getClass().getSimpleName();

      try {
         // Generate the Avro schema from the POJO via Jackson
         final AvroSchema schema = getAvroSchema( rootObject );
         // Wrap it in Confluent's AvroSchema for the registry client
         final io.confluent.kafka.schemaregistry.avro.AvroSchema avroSchema =
               new io.confluent.kafka.schemaregistry.avro.AvroSchema( schema.getAvroSchema(), 1 );
         schemaRegistryClient.register( subjectName, avroSchema, false );
         return avroMapper.writer( schema ).writeValueAsBytes( rootObject );
      } catch ( final IOException | RuntimeException e ) {
         throw new SerializationException( "Error serializing Avro message", e );
      } catch ( final RestClientException e ) {
         throw toKafkaException( e );
      }
   }
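A hypothetical call site, with TestDto as shown further down (its constructor is assumed here for illustration):

// Illustrative only -- TestDto's constructor is an assumption.
final TestDto dto = new TestDto( 1L, Set.of( "label-a" ) );
final byte[] payload = serializer.serialize( Optional.of( dto ) );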

The error is:

java.net.SocketException: Unexpected end of file from server
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:900)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1688)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:561)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:549)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:290)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:397)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:370)

Does anyone have experience with this?

EDIT

The client is 7.3.3, the container is confluentinc/cp-schema-registry:7.3.3.

The interesting part is the URL. I reused the idea from KafkaContainer, but maybe I need a different port?

First attempt:

   public String getClientUrl() {
      // getMappedPort() returns the random host port Testcontainers bound 8081 to
      return String.format( "http://%s:%s", getHost(), getMappedPort( SCHEMA_REGISTRY_PORT ) );
   }

Error:

Caused by: java.net.SocketException: Unexpected end of file from server

Second attempt:

   public String getClientUrl() {
      // getExposedPorts() returns the container-internal port list,
      // which is not bound on the host side
      return String.format( "http://%s:%s", getHost(), getExposedPorts().get( 0 ) );
   }

Error:

Caused by: java.net.ConnectException: Connection refused
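For what it's worth, Testcontainers publishes every exposed container port on a random host port, so getExposedPorts() only gives the container-internal side (8081 here), which nothing listens on from the host. That makes the Connection refused in the second attempt expected; only the first attempt's URL can be correct. A minimal sketch of the distinction (names assumed from the snippets above):

// Container-internal port: reachable only from inside the Docker network.
final int containerPort = SCHEMA_REGISTRY_PORT;                 // 8081
// Host-side port: randomly assigned by Testcontainers at startup.
final int hostPort = container.getMappedPort( containerPort );  // e.g. 55123
final String clientUrl = String.format( "http://%s:%d", container.getHost(), hostPort );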

The Avro schema looks like this:

{
  "type": "record",
  "name": "TestDto",
  "namespace": "test.model",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        {
          "type": "long",
          "java-class": "java.lang.Long"
        }
      ]
    },
    {
      "name": "labels",
      "type": [
        "null",
        {
          "type": "array",
          "items": "string",
          "java-class": "java.util.Set"
        }
      ]
    }
  ]
}

I don't quite understand why all those Java types are in there, but I suppose that's another topic.
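For reference on the java-class properties: jackson-dataformat-avro embeds them as type hints so it can map the Avro types back to the original Java types (java.util.Set, java.lang.Long) when reading; they do not come from the Schema Registry. A minimal sketch of schema generation with Jackson's AvroMapper, presumably roughly what getAvroSchema( rootObject ) above does:

import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;

public class SchemaDump {
   public static void main( final String[] args ) throws Exception {
      // Derive the Avro schema from the POJO class and print it,
      // including the java-class hints added by Jackson.
      final AvroMapper avroMapper = new AvroMapper();
      final AvroSchema schema = avroMapper.schemaFor( TestDto.class );
      System.out.println( schema.getAvroSchema().toString( true ) );
   }
}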

Next attempt, using the Confluent User Avro example:

      final io.confluent.kafka.schemaregistry.avro.AvroSchema schema1 = new io.confluent.kafka.schemaregistry.avro.AvroSchema( new Schema.Parser().parse( "{\n"
            + "  \"namespace\": \"io.confluent.developer\",\n"
            + "  \"type\": \"record\",\n"
            + "  \"name\": \"User\",\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"string\",\n"
            + "      \"avro.java.string\": \"String\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"int\"\n"
            + "    }\n"
            + "  ]\n"
            + "}"));
schemaRegistryClient.register( subjectName, schema1, false );

Registering a JSON schema doesn't work now either, so I'm looking for a general problem with the Schema Registry.

Tried:

spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

Caused by: java.net.SocketException: Unexpected end of file from server
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
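For completeness: the Confluent serializers only learn the registry location through the schema.registry.url client property. A sketch of wiring the Testcontainers URL into Spring Boot (the property key is standard Spring Kafka; getClientUrl() is the method from above, and the container field name is an assumption):

// Hypothetical test wiring: pass the container's URL to all Kafka clients.
@DynamicPropertySource
static void schemaRegistryProperties( final DynamicPropertyRegistry registry ) {
   registry.add( "spring.kafka.properties.schema.registry.url",
         schemaRegistryContainer::getClientUrl );
}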
1 Answer

The environment of the Schema Registry Testcontainer was wrong. Changed from:

withEnv( "SCHEMA_REGISTRY_HOST_NAME", "localhost" );

withEnv( "SCHEMA_REGISTRY_HOST_NAME", "cp-schema-registry" );