快速入门中的Ksql avro格式给出错误

问题描述 投票:0回答:1

嘿,我正在做KSQL快速入门示例。问题是我想使用avro格式生成数据,并且将错误列表底部列出。

本教程位于https://docs.ksqldb.io/en/latest/tutorials/basics-docker/

重复问题

git clone https://github.com/confluentinc/ksql.git
cd ksql

git checkout 5.5.0-post

cd docs/tutorials/
docker-compose up -d

如果您是docker ps,您应该看到以下容器正在运行:

confluentinc/ksqldb-examples:5.5.0 
confluentinc/cp-ksql-server:5.4.0
confluentinc/cp-schema-registry:5.4.0,
confluentinc/cp-enterprise-kafka:5.4.0
confluentinc/cp-zookeeper:5.4.0

如果我以这样的定界格式运行示例,代码将起作用:

docker run --network tutorials_default --rm --name datagen-users \
    confluentinc/ksqldb-examples:5.5.0 \
    ksql-datagen \
        bootstrap-server=kafka:39092 \
        quickstart=users \
        format=delimited \
        topic=users \
        msgRate=1 

该示例使用avro格式,我想使用该格式。此处的示例:

docker run --network tutorials_default --rm --name datagen-users \
    confluentinc/ksqldb-examples:5.5.0 \
    ksql-datagen \
        bootstrap-server=kafka:39092 \
        quickstart=users \
        format=avro \
        topic=users \
        msgRate=1 

当我使用avro格式时,出现以下错误:

[2020-06-06 15:46:47,632] INFO AvroDataConfig values: 
    connect.meta.data = true
    enhanced.avro.schema.support = false
    schemas.cache.config = 1000
 (io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,650] INFO JsonSchemaDataConfig values: 
    decimal.format = BASE64
    schemas.cache.size = 1000
 (io.confluent.connect.json.JsonSchemaDataConfig:179)
[2020-06-06 15:46:47,651] INFO JsonSchemaDataConfig values: 
    decimal.format = BASE64
    schemas.cache.size = 1000
 (io.confluent.connect.json.JsonSchemaDataConfig:179)
[2020-06-06 15:46:47,654] INFO ProtobufDataConfig values: 
    schemas.cache.config = 1000
 (io.confluent.connect.protobuf.ProtobufDataConfig:179)
[2020-06-06 15:46:47,672] INFO KsqlConfig values: 
    ksql.access.validator.enable = auto
    ksql.authorization.cache.expiry.time.secs = 30
    ksql.authorization.cache.max.entries = 10000
    ksql.connect.url = http://localhost:8083
    ksql.connect.worker.config = 
    ksql.extension.dir = ext
    ksql.hidden.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.insert.into.values.enabled = true
    ksql.internal.topic.min.insync.replicas = 1
    ksql.internal.topic.replicas = 1
    ksql.metric.reporters = []
    ksql.metrics.extension = null
    ksql.metrics.tags.custom = 
    ksql.new.api.enabled = false
    ksql.output.topic.name.prefix = 
    ksql.persistence.wrap.single.values = true
    ksql.persistent.prefix = query_
    ksql.pull.queries.enable = true
    ksql.query.persistent.active.limit = 2147483647
    ksql.query.pull.enable.standby.reads = false
    ksql.query.pull.max.allowed.offset.lag = 9223372036854775807
    ksql.readonly.topics = [_confluent.*, __confluent.*, _schemas, __consumer_offsets, __transaction_state, connect-configs, connect-offsets, connect-status, connect-statuses]
    ksql.schema.registry.url = http://localhost:8081
    ksql.security.extension.class = null
    ksql.service.id = default_
    ksql.sink.window.change.log.additional.retention = 1000000
    ksql.streams.shutdown.timeout.ms = 300000
    ksql.transient.prefix = transient_
    ksql.udf.collect.metrics = false
    ksql.udf.enable.security.manager = true
    ksql.udfs.enabled = true
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
 (io.confluent.ksql.util.KsqlConfig:347)
[2020-06-06 15:46:47,720] INFO AvroDataConfig values: 
    connect.meta.data = true
    enhanced.avro.schema.support = false
    schemas.cache.config = 1
 (io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,752] INFO ProcessingLogConfig values: 
    ksql.logging.processing.rows.include = false
    ksql.logging.processing.stream.auto.create = false
    ksql.logging.processing.stream.name = KSQL_PROCESSING_LOG
    ksql.logging.processing.topic.auto.create = false
    ksql.logging.processing.topic.name = 
    ksql.logging.processing.topic.partitions = 1
    ksql.logging.processing.topic.replication.factor = 1
 (io.confluent.ksql.logging.processing.ProcessingLogConfig:347)
[2020-06-06 15:46:47,767] INFO AvroConverterConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,770] INFO KafkaAvroSerializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,771] INFO KafkaAvroDeserializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    specific.avro.reader = false
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,771] INFO AvroDataConfig values: 
    connect.meta.data = false
    enhanced.avro.schema.support = false
    schemas.cache.config = 1000
 (io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,772] INFO AvroConverterConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,772] INFO KafkaAvroSerializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,772] INFO KafkaAvroDeserializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    specific.avro.reader = false
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,773] INFO AvroDataConfig values: 
    connect.meta.data = false
    enhanced.avro.schema.support = false
    schemas.cache.config = 1000
 (io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,774] INFO AvroConverterConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,775] INFO KafkaAvroSerializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,775] INFO KafkaAvroDeserializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    specific.avro.reader = false
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,776] INFO AvroDataConfig values: 
    connect.meta.data = false
    enhanced.avro.schema.support = false
    schemas.cache.config = 1000
 (io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,776] INFO AvroConverterConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.connect.avro.AvroConverterConfig:179)
[2020-06-06 15:46:47,776] INFO KafkaAvroSerializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroSerializerConfig:179)
[2020-06-06 15:46:47,777] INFO KafkaAvroDeserializerConfig values: 
    bearer.auth.token = [hidden]
    proxy.port = -1
    schema.reflection = false
    auto.register.schemas = true
    max.schemas.per.subject = 1000
    basic.auth.credentials.source = URL
    specific.avro.reader = false
    value.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
    schema.registry.url = [http://localhost:8081]
    basic.auth.user.info = [hidden]
    proxy.host = 
    schema.registry.basic.auth.user.info = [hidden]
    bearer.auth.credentials.source = STATIC_TOKEN
    key.subject.name.strategy = class io.confluent.kafka.serializers.subject.TopicNameStrategy
 (io.confluent.kafka.serializers.KafkaAvroDeserializerConfig:179)
[2020-06-06 15:46:47,777] INFO AvroDataConfig values: 
    connect.meta.data = false
    enhanced.avro.schema.support = false
    schemas.cache.config = 1000
 (io.confluent.connect.avro.AvroDataConfig:347)
[2020-06-06 15:46:47,817] WARN The configuration 'ksql.schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig:355)
[2020-06-06 15:46:47,817] WARN The configuration 'ksql.schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig:355)
[2020-06-06 15:46:48,038] ERROR Failed to send HTTP request to endpoint: http://localhost:8081/subjects/users-value/versions (io.confluent.kafka.schemaregistry.client.rest.RestService:268)
java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:264)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:138)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:281)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at io.confluent.ksql.datagen.DataGenProducer.produceOne(DataGenProducer.java:122)
    at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:91)
    at io.confluent.ksql.datagen.DataGen.lambda$getProducerTask$1(DataGen.java:111)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: users
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic users :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:281)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at io.confluent.ksql.datagen.DataGenProducer.produceOne(DataGenProducer.java:122)
    at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:91)
    at io.confluent.ksql.datagen.DataGen.lambda$getProducerTask$1(DataGen.java:111)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:463)
    at sun.net.www.http.HttpClient.openServer(HttpClient.java:558)
    at sun.net.www.http.HttpClient.<init>(HttpClient.java:242)
    at sun.net.www.http.HttpClient.New(HttpClient.java:339)
    at sun.net.www.http.HttpClient.New(HttpClient.java:357)
    at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1226)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1162)
    at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1056)
    at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:990)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1340)
    at sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1315)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:264)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:138)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:281)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at io.confluent.ksql.datagen.DataGenProducer.produceOne(DataGenProducer.java:122)
    at io.confluent.ksql.datagen.DataGenProducer.populateTopic(DataGenProducer.java:91)
    at io.confluent.ksql.datagen.DataGen.lambda$getProducerTask$1(DataGen.java:111)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
apache-kafka avro ksqldb
1个回答
0
投票

[似乎docker文件未正确公开Schema Registry的端口,请尝试如下添加ports映射:

schema-registry:
    image: <something>
    depends_on:
      - zookeeper
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
© www.soinside.com 2019 - 2024. All rights reserved.