我使用 Jackson 从 POJO 生成 Avro 模式并将其序列化。到目前为止,在杰克逊的帮助下效果很好。
但是,我无法使用缓存模式注册表将此模式上传到模式注册表。
测试设置与 Testcontainer 见这里:
SchemaRegistryClient schemaRegistryClient( final String url ) {
return new CachedSchemaRegistryClient( url, 100 );
}
public byte[] serialize( final Optional<Object> root ) {
if ( root.isEmpty() ) {
return new byte[0];
}
final Object rootObject = root.get();
final String subjectName = rootObject.getClass().getSimpleName();
try {
// Jackson Avro Schema
final AvroSchema schema = getAvroSchema( rootObject );
final io.confluent.kafka.schemaregistry.avro.AvroSchema avroSchema = new io.confluent.kafka.schemaregistry.avro.AvroSchema( schema.getAvroSchema(),
1 );
schemaRegistryClient.register( subjectName, avroSchema, false );
return avroMapper.writer( schema ).writeValueAsBytes( rootObject );
} catch ( final IOException | RuntimeException e ) {
throw new SerializationException( "Error serializing Avro message", e );
} catch ( final RestClientException e ) {
throw toKafkaException( e );
}
}
错误是:
java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:900)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1688)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589)
at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:561)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:549)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:290)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:397)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:370)
有人知道这方面的经验吗?
编辑
Client 是 7.3.3,Container 是 confluentinc/cp-schema-registry:7.3.3.
有趣的是网址。重用了 KafkaContainer 的想法,但也许我需要另一个端口?
第一次尝试:
public String getClientUrl() {
return String.format( "http://%s:%s", getHost(), getMappedPort( SCHEMA_REGISTRY_PORT ) );
}
错误:
Caused by: java.net.SocketException: Unexpected end of file from server
第二次尝试:
public String getClientUrl() {
return String.format( "http://%s:%s", getHost(), getExposedPorts().get( 0 ) );
}
错误:
Caused by: java.net.ConnectException: Connection refused
Avro Schema 看起来像这样:
{
"type": "record",
"name": "TestDto",
"namespace": "test.model",
"fields": [
{
"name": "id",
"type": [
"null",
{
"type": "long",
"java-class": "java.lang.Long"
}
]
},
{
"name": "labels",
"type": [
"null",
{
"type": "array",
"items": "string",
"java-class": "java.util.Set"
}
]
}
]
}
我有点不明白为什么所有的 java 类型都在里面,但我想那是另一个话题了。
接下来的尝试,是以融合用户 avro 为例:
final io.confluent.kafka.schemaregistry.avro.AvroSchema schema1 = new io.confluent.kafka.schemaregistry.avro.AvroSchema( new Schema.Parser().parse( "{\n"
+ " \"namespace\": \"io.confluent.developer\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"User\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ " \"name\": \"name\",\n"
+ " \"type\": \"string\",\n"
+ " \"avro.java.string\": \"String\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"age\",\n"
+ " \"type\": \"int\"\n"
+ " }\n"
+ " ]\n"
+ "}"));
schemaRegistryClient.register( subjectName, schema1, false );
Json Schema 寄存器现在也不起作用。所以寻找模式注册表的一般问题。
试过:
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
Caused by: java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
Schema Registry Testcontainer 的环境错误 改变自
withEnv( "SCHEMA_REGISTRY_HOST_NAME", "localhost" );
到
withEnv( "SCHEMA_REGISTRY_HOST_NAME", "cp-schema-registry" );