kafka.common.KafkaException:无法将Zookeeper的经纪人信息从EC2解析为弹性搜索

问题描述 投票:0回答:1

我已经设置了aws MSK,我正尝试将记录从MSK转移到弹性搜索。我能够将数据推入MSK变成json格式。我想陷入弹性搜索。我能够正确设置所有设置。这是我在EC2实例上所做的

wget /usr/local http://packages.confluent.io/archive/3.1/confluent-oss-3.1.2-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-oss-3.1.2-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-3.1.2 /usr/local/confluent

/usr/local/confluent/etc/kafka-connect-elasticsearch

之后,我修改了kafka-connect-elasticsearch并设置了我的弹性搜索网址

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=AWSKafkaTutorialTopic
key.ignore=true
connection.url=https://search-abcdefg-risdfgdfgk-es-ex675zav7k6mmmqodfgdxxipg5cfsi.us-east-1.es.amazonaws.com
type.name=kafka-connect

生产者发送以下消息,如fomrat

{
        "data": {
                "RequestID":    517082653,
                "ContentTypeID":        9,
                "OrgID":        16145,
                "UserID":       4,
                "PromotionStartDateTime":       "2019-12-14T16:06:21Z",
                "PromotionEndDateTime": "2019-12-14T16:16:04Z",
                "SystemStartDatetime":  "2019-12-14T16:17:45.507000000Z"
        },
        "metadata":     {
                "timestamp":    "2019-12-29T10:37:31.502042Z",
                "record-type":  "data",
                "operation":    "insert",
                "partition-key-type":   "schema-table",
                "schema-name":  "dbo",
                "table-name":   "TRFSDIQueue"
        }
}

我对kafka连接如何从这里开始感到困惑?如果是的话,我该如何开始?

我也已经启动了如下所示的架构注册表,这给了我错误。

/usr/local/confluent/bin/schema-registry-start /usr/local/confluent/etc/schema-registry/schema-registry.properties

当我这样做时,我得到以下错误

[2019-12-29 13:49:17,861] ERROR Server died unexpectedly:  (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"listener_security_protocol_map":{"CLIENT":"PLAINTEXT","CLIENT_SECURE":"SSL","REPLICATION":"PLAINTEXT","REPLICATION_SECURE":"SSL"},"endpoints":["CLIENT:/

请帮助。

根据回答的建议,我升级了kafka connect版本,但随后我开始出现错误提示

 ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:63)
io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException: Error initializing kafka store while initializing schema registry
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:210)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.initSchemaRegistry(SchemaRegistryRestApplication.java:61)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:72)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:39)
        at io.confluent.rest.Application.createServer(Application.java:201)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:41)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: Timed out trying to create or validate schema topic configuration
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:168)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:111)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:208)
        ... 5 more
Caused by: java.util.concurrent.TimeoutException
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.createOrVerifySchemaTopic(KafkaStore.java:161)
        ... 7 more
elasticsearch apache-kafka apache-kafka-connect confluent-schema-registry
1个回答
1
投票

首先,Confluent Platform 3.1.2相当老。我建议您获得与Kafka版本一致的版本

您使用位于bin和etc / kafka文件夹下的相应connect-*脚本和属性来启动Kafka Connect

例如,

/usr/local/confluent/bin/connect-standalone \
  /usr/local/confluent/etc/kafka/kafka-connect-standalone.properties \ 
  /usr/local/confluent/etc/kafka-connect-elasticsearch/quickstart.properties

关于Schema Registry,您可以搜索其Github问题,以寻找试图使MSK正常工作的多个人,但是根本问题与MSK不公开PLAINTEXT侦听器以及Schema Registry不支持命名侦听器有关。 (自版本5.x起,这可能已更改)


您也可以尝试在ECS / EKS中使用Connect和Schema Registry容器,而不是在EC2计算机中提取

© www.soinside.com 2019 - 2024. All rights reserved.