Cannot deserialize instance of `java.lang.String` out of START_OBJECT token (through reference chain: java.util.LinkedHashMap["key.schema"])

Hi, for two weeks I have been stuck on a problem: I am trying to load a CSV file into a Kafka topic using the CSV source connector. First, on Windows 11, I tried

`first.row.as.header:true`

and the source connector was created, but its status stayed FAILED because it could not infer a schema from the input file. Then I tried Ubuntu in a virtual machine, and it failed for the same reason. Now I have removed the header row and am trying to supply my own schema, and I get the following error:

root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-spooldir-00/config  -d @csv-connector.json
HTTP/1.1 500 Internal Server Error
Date: Tue, 14 Nov 2023 10:39:25 GMT
Content-Type: application/json
Content-Length: 299
Server: Jetty(9.4.40.v20210413)

{"error_code":500,"message":"Cannot deserialize instance of `java.lang.String` out of START_OBJECT token\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 447] (through reference chain: java.util.LinkedHashMap[\"key.schema\"])"}root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# 

Now, here is my docker-compose:

root@vineet-virtual-machine:/home/vineet/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero# cat docker-compose.yml 
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.0
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use broker:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    container_name: schema-registry
    ports:
      - "8081:8081"
    depends_on:
      - broker
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:29092

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.2.0
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
    #  ---------------
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    # If you want to use the Confluent Hub installer to d/l component, but make them available
    # when running this offline, spin up the stack once and then run : 
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./data/connect-jars
    volumes:
      - $PWD/data:/data
    # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:2.0.0
        confluent-hub install --no-prompt jcustenborder/kafka-connect-spooldir:2.0.60
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.21.0
    container_name: ksqldb
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_STREAMS_PRODUCER_MAX_BLOCK_MS: 9223372036854775807
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.0
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT_CLUSTER: 'kafka-connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_KSQL_KSQLDB_URL: "http://ksqldb:8088"
      # The advertised URL needs to be the URL on which the browser
      #  can access the KSQL server (e.g. http://localhost:8088/info)
      CONTROL_CENTER_KSQL_KSQLDB_ADVERTISED_URL: "http://localhost:8088"
      # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
      # Useful settings for development/laptop use - modify as needed for Prod
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
      CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 104857600
    command:
      - bash
      - -c 
      - |
        echo "Waiting two minutes for Kafka brokers to start and 
               necessary topics to be available"
        sleep 120  
        /etc/confluent/docker/run

# Other systems
  mysql:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it mysql bash -c 'mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
    # *-----------------------------*
    image: mysql:8.0
    container_name: mysql
    ports:
      - 3306:3306
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw
    volumes:
     - ${PWD}/data/mysql:/docker-entrypoint-initdb.d
     - ${PWD}/data:/data

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.10.1
    container_name: elasticsearch
    hostname: elasticsearch
    ports:
      - 9200:9200
    environment:
      xpack.security.enabled: "false"
      ES_JAVA_OPTS: "-Xms1g -Xmx1g"
      discovery.type: "single-node"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.10.1
    container_name: kibana
    hostname: kibana
    depends_on:
      - elasticsearch
    ports:
      - 5601:5601
    environment:
      xpack.security.enabled: "false"
      discovery.type: "single-node"
    command:
      - bash
      - -c
      - |
        /usr/local/bin/kibana-docker &
        echo "Waiting for Kibana to be ready ⏳"
        while [ $$(curl -H 'kbn-xsrf: true' -s -o /dev/null -w %{http_code} http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) -ne 200 ] ; do 
          echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) " (waiting for 200)"
          sleep 5  
        done

        echo -e "\t" $$(date) " Kibana saved objects request response: " $$(curl -H 'kbn-xsrf: true' -o /dev/null -w %{http_code} -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*) $$(curl -H 'kbn-xsrf: true' -s http://localhost:5601/api/saved_objects/_find?type=index-pattern&search_fields=title&search=*)

        echo -e "\n--\n+> Pre-creating index pattern"
        curl -s -XPOST 'http://localhost:5601/api/saved_objects/index-pattern/mysql-debezium-asgard.demo.orders' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'Content-Type: application/json' \
          -d '{"attributes":{"title":"mysql-debezium-asgard.demo.orders","timeFieldName":"CREATE_TS"}}'

        echo -e "\n--\n+> Setting the index pattern as default"
        curl -s -XPOST 'http://localhost:5601/api/kibana/settings' \
          -H 'kbn-xsrf: nevergonnagiveyouup' \
          -H 'content-type: application/json' \
          -d '{"changes":{"defaultIndex":"mysql-debezium-asgard.demo.orders"}}'

        echo -e "\n--\n+> Opt out of Kibana telemetry"
        curl 'http://localhost:5601/api/telemetry/v2/optIn' \
            -H 'kbn-xsrf: nevergonnagiveyouup' \
            -H 'content-type: application/json' \
            -H 'accept: application/json' \
            --data-binary '{"enabled":false}' \
            --compressed

        sleep infinity

  neo4j:
    image: neo4j:4.2.3
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    entrypoint: 
      - /bin/sh 
      - -c 
      - |
        apk add jq; 
        while [ 1 -eq 1 ];do sleep 60;done

Here is my source connector:

vineet@vineet-virtual-machine:~/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero$ cat csv-connector.json 
{
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "topic": "inventory_spooldir_01",
    "input.path": "/data/unprocessed",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": ".*\\.csv",
    "schema.generation.key.name": "book",
    "schema.generation.value.name": "books",
    "schema.generation.enabled": "false",
    "key.schema": {
        "name": "com.github.jcustenborder.kafka.connect.model.Key",
        "type": "STRUCT",
        "isOptional": false,
        "fieldSchemas": {
            "GUID": {
                "type": "STRING",
                "isOptional": false
            }
        }
    },
    "value.schema": {
        "name": "com.github.jcustenborder.kafka.connect.model.Value",
        "type": "STRUCT",
        "isOptional": false,
        "fieldSchemas": {
            "ID": {
                "type": "STRING",
                "isOptional": true
            },
            "NAME": {
                "type": "STRING",
                "isOptional": true
            },
            "AUTHOR": {
                "type": "STRING",
                "isOptional": true
            },
            "PRICE": {
                "type": "STRING",
                "isOptional": true
            },
            "STOCK": {
                "type": "STRING",
                "isOptional": true
            },
            "PUBLICATION": {
                "type": "STRING",
                "isOptional": true
            },
            "GUID": {
                "type": "STRING",
                "isOptional": false
            }
        }
    }
}

These are the first few of the 1 million rows; I generated the file locally with a Node.js script:

vineet@vineet-virtual-machine:~/Downloads/kafka-connect/demo-scene-master/kafka-connect-zero-to-hero$ head ./data/unprocessed/testdata.csv 
1,Ricky,Lawrence,$2641.54,8394,Tmh,11ae5913-3efe-4526-a66c-9c0e91b704bc
2,Devin,Thomas,$9588.26,5173,Evergreen,6a1e4f8f-e69d-40ae-8ee1-81ea58805f3a
3,Ernest,Henry,$3938.58,1297,Evergreen,dcadba97-81a4-4e17-8f1c-5a8a4f01b777
4,Susie,King,$8660.66,4209,Bhawan,0e3b4283-bcc4-498c-8293-c861625fc045
5,Lina,Jones,$670.80,1613,Bhawan,86edcbab-b599-40e2-b564-340ebe820243
6,Troy,McCoy,$132.60,2956,Tmh,a8a4c6ef-3dc9-4978-b766-97ee7b4444af
7,Ola,White,$2066.71,5027,Tmh,64d3be45-b2f0-47ad-b1c0-3dfc0892d6bc
8,David,Watkins,$3660.59,5276,Bharati,6f738ca5-d0ee-46b9-8aea-41ab3a2ab9a2
9,Effie,Kennedy,$8158.57,9929,Tmh,c5caec0d-0fd5-403f-aa90-759eab8ff45f
10,Elva,Harris,$8996.29,8765,Arihant,0ea64116-6b27-4586-ae04-1580165ab703

Also, I have already given testdata.csv 777 permissions. Thanks.

json apache-kafka apache-kafka-connect
1 Answer

`key.schema` and `value.schema` must be inline, escaped JSON, passed as strings.

Kafka Connect configuration values can only be strings, numbers, or booleans, never nested objects: the REST API deserializes the submitted config into a `Map<String, String>`, so Jackson fails with `Cannot deserialize instance of java.lang.String out of START_OBJECT token` the moment it reaches the object value of `key.schema`.
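
A corrected sketch of `csv-connector.json`, with both schemas collapsed into escaped JSON strings (same fields as in the question; the `schema.generation.*.name` properties are dropped here because generation is disabled and explicit schemas are supplied):

{
    "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
    "topic": "inventory_spooldir_01",
    "input.path": "/data/unprocessed",
    "finished.path": "/data/processed",
    "error.path": "/data/error",
    "input.file.pattern": ".*\\.csv",
    "schema.generation.enabled": "false",
    "key.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Key\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"GUID\":{\"type\":\"STRING\",\"isOptional\":false}}}",
    "value.schema": "{\"name\":\"com.github.jcustenborder.kafka.connect.model.Value\",\"type\":\"STRUCT\",\"isOptional\":false,\"fieldSchemas\":{\"ID\":{\"type\":\"STRING\",\"isOptional\":true},\"NAME\":{\"type\":\"STRING\",\"isOptional\":true},\"AUTHOR\":{\"type\":\"STRING\",\"isOptional\":true},\"PRICE\":{\"type\":\"STRING\",\"isOptional\":true},\"STOCK\":{\"type\":\"STRING\",\"isOptional\":true},\"PUBLICATION\":{\"type\":\"STRING\",\"isOptional\":true},\"GUID\":{\"type\":\"STRING\",\"isOptional\":false}}}"
}

With the file rewritten this way, the same `curl -X PUT ... -d @csv-connector.json` from the question should be accepted. Hand-escaping is error-prone; one alternative is to keep each schema as an ordinary JSON document and let jq do the escaping (the file names `key-schema.json` and `value-schema.json` are hypothetical):

# Build csv-connector.json from two plain schema files; tojson turns
# each parsed schema back into an escaped JSON string value.
jq -n \
  --slurpfile ks key-schema.json \
  --slurpfile vs value-schema.json \
  '{
     "connector.class": "com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector",
     "topic": "inventory_spooldir_01",
     "input.path": "/data/unprocessed",
     "finished.path": "/data/processed",
     "error.path": "/data/error",
     "input.file.pattern": ".*\\.csv",
     "schema.generation.enabled": "false",
     "key.schema": ($ks[0] | tojson),
     "value.schema": ($vs[0] | tojson)
   }' > csv-connector.json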
