如何使用 Kafka connect 与模式注册表反序列化来自 Kafka 的 Divolte AVRO 记录?

问题描述 投票:0回答:1

如何使用 Kafka-connect 消费和反序列化 Divolte 收集器写入 Kafka 的 Avro 记录?我想将序列化的 AVRO 记录转回 JSON 事件并将它们放入运动数据流中。

我正在使用 AWS labs kinesis streams sink 插件,我的配置设置如下。我最初尝试在裸模式下使用 Divolte 并导入无注册的 Avro 转换器(在下面的工作配置中注释掉)但我无法让它工作,因为它返回错误“不是 Avro 文件”。

然后我将 Divolte 切换到融合模式,以便使用融合注册表/转换器'io.confluent.connect.avro.AvroConverter',如下所示(未注释)。这会引发错误:

kafka-connect| Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

这是出乎意料的,因为我在融合模式下使用 divolte,而解串器是融合解串器。以下是工作人员配置:

bootstrap.servers=broker:29092

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081/subjects/Kafka-key/versions/1
value.converter.schema.registry.url=http://localhost:8081/subjects/Kafka-key/versions/1

#key.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
#value.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
#key.converter.schema.path=/opt/kafka/config/DefaultEventRecord.avsc
#value.converter.schema.path=/opt/kafka/config/DefaultEventRecord.avsc

key.converter.schemas.enable=false
value.converter.schemas.enable=false

#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true

offset.storage.file.filename=offset.log
schemas.enable=false
plugin.path=/opt/kafka/plugins/

我的 divolte 收集器配置似乎是正确的,我添加了 1 的 confluent_id,对应于模式发布到模式注册表时的默认 divolte 模式 ID:

divolte {
  global {
    server {
      host = 0.0.0.0
      host = ${?DIVOLTE_HOST}
      port = 8290
      port = ${?DIVOLTE_PORT}
      use_x_forwarded_for = false
      use_x_forwarded_for = ${?DIVOLTE_USE_XFORWARDED_FOR}
      serve_static_resources = true
      serve_static_resources = ${?DIVOLTE_SERVICE_STATIC_RESOURCES}
      debug_requests = false
    }

    mapper {
      buffer_size = 1048576
      threads = 1
      duplicate_memory_size = 1000000
      user_agent_parser {
        type = non_updating
        cache_size = 1000
      }
    }

    kafka {
      enabled = false
      enabled = ${?DIVOLTE_KAFKA_ENABLED}
      threads = 2
      buffer_size = 1048576
      producer = {
        bootstrap.servers = ["localhost:9092"]
        bootstrap.servers = ${?DIVOLTE_KAFKA_BROKER_LIST}
        client.id = divolte.collector
        client.id = ${?DIVOLTE_KAFKA_CLIENT_ID}
        acks = 1
        retries = 0
        compression.type = lz4
        max.in.flight.requests.per.connection = 1

        sasl.jaas.config = ""
        sasl.jaas.config = ${?KAFKA_SASL_JAAS_CONFIG}

        security.protocol = PLAINTEXT
        security.protocol = ${?KAFKA_SECURITY_PROTOCOL}
        sasl.mechanism = GSSAPI
        sasl.kerberos.service.name = kafka
      }
    }
  }

  sources {
    browser1 = {
      type = browser
      event_suffix = event
      party_cookie = _dvp
      party_cookie = ${?DIVOLTE_PARTY_COOKIE}
      party_timeout = 730 days
      party_timeout = ${?DIVOLTE_PARTY_TIMEOUT}
      session_cookie = _dvs
      session_cookie = ${?DIVOLTE_SESSION_COOKIE}
      session_timeout = 30 minutes
      session_timeout = ${?DIVOLTE_SESSION_TIMEOUT}
      cookie_domain = ''
      cookie_domain = ${?DIVOLTE_COOKIE_DOMAIN}

      javascript {
        name = divolte.js
        name = ${?DIVOLTE_JAVASCRIPT_NAME}
        logging = false
        logging = ${?DIVOLTE_JAVASCRIPT_LOGGING}
        debug = false
        debug = ${?DIVOLTE_JAVASCRIPT_DEBUG}
        auto_page_view_event = true
        auto_page_view_event = ${?DIVOLTE_JAVASCRIPT_AUTO_PAGE_VIEW_EVENT}
      }
    }
  }

  sinks {
    kafka1 {
      type = kafka
      mode = confluent
      topic = clickstream
      topic = ${?DIVOLTE_KAFKA_TOPIC}
    }
  }

  mappings {
    a_mapping = {
    //schema_file = /opt/divolte/conf/DefaultEventRecord.avsc
    //mapping_script_file = schema-mapping.groovy
    confluent_id = 1
    sources = [browser1]
    sinks = [kafka1]
    }
  }
}

从 Divolte 配置可以看出,我没有声明架构或架构映射,因此使用默认值。所以在这种情况下,我将 DefaultEventRecord.avsc 模式文件注册为模式注册表:

{"schema":"{\"namespace\":\"io.divolte.record\",\"type\":\"record\",\"name\":\"DefaultEventRecord\",\"fields\":[{\"name\":\"detectedDuplicate\",\"type\":\"boolean\"},{\"name\":\"detectedCorruption\",\"type\":\"boolean\"},{\"name\":\"firstInSession\",\"type\":\"boolean\"},{\"name\":\"timestamp\",\"type\":\"long\"},{\"name\":\"clientTimestamp\",\"type\":\"long\"},{\"name\":\"remoteHost\",\"type\":\"string\"},{\"name\":\"referer\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"location\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"viewportPixelWidth\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"viewportPixelHeight\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"screenPixelWidth\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"screenPixelHeight\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"partyId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"sessionId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"pageViewId\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"eventType\",\"type\":\"string\",\"default\":\"unknown\"},{\"name\":\"userAgentString\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentFamily\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentVendor\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentType\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentVersion\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentDeviceCategory\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsFamily\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsVersion\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"userAgentOsVendor\",\"type\":[\"null\",\"string\"],\"default\":null}]}"} 

并使用命令将其发布到模式注册表

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @divolte-schema-v1.avsc http://localhost:8081/subjects/Kafka-key/versions

并将 URL 包含在工作人员配置中。

如果有人能指出我正确的方向,我将不胜感激。由于 Divolte 收集器旨在写入 Kafka,甚至具有内置的汇合模式,我希望使用 Kafka connect 从 Kafka 消费并使用 Kafka-connect 的内置转换器进行反序列化应该非常简单,但显然我有矛盾的东西。我的架构文件是否正确?我是否使用了正确的转换器来撤消 Divolte 的 AVRO 序列化?我是否正确配置了 Divolte 以在汇合模式下工作,汇合 ID 是否正确?文档对此不是很具体(我设置了 ID = 1,这是我在启动后将第一个模式发布到模式注册表时返回的 ID)。

apache-kafka deserialization avro confluent-schema-registry divolte
1个回答
0
投票

我认为你可能错误配置了 Kafka Connect。

internal.*.converter
属性不应被触及。事实上,它们已被弃用,应该被删除......

schema.registry.url
值不应该指任何版本,只
http://localhost:8081

schemas.enable
本身什么都不做。这只是
*.converter=JSONConverter
的一个值,如
value.converter=...JSONConverter
只有这样 才能设置
value.converter.schemas.enable=*
。 Avro always 有一个模式,不能被“禁用”


您还将架构发布到错误的路径。

Kafka-key
用于名为 Kafkatopic
key
模式。

您的 Divolte 配置说

topic = clickstream
,例如,不是
Kafka
...另外,如果键和值实际上是不同的有效负载,例如 subjects/clickstream-key
subjects/clickstream-value
,您需要 POST 两个
different
模式.当您执行 HTTP 请求时,您将获得该模式 ID 的响应,您需要将其作为
confluent_id
.

放入您的配置中

无论如何,我建议暂时跳过 Kafka Connect。您可以简单地使用

kafka-avro-console-consumer
调试写入 Kafka 的事件。如果可行,那么您应该使用 Connect 中的
AvroConverter
。更具体地说,“无注册”不使用模式 ID,因此设置
confluent_id = 1
没有意义。

另外,键和值转换器不需要相同。您可以将

--print-keys=true
添加到控制台消费者以将两者反序列化为 Avro。否则,我怀疑你的错误是因为键不是 Avro ...

© www.soinside.com 2019 - 2024. All rights reserved.