运行 Elasticsearch 接收器(sink)连接器时收到警告:Encountered an illegal document error -> Response status: 'BAD_REQUEST'

问题描述 投票:0回答:1

我尝试使用 Elasticsearch 接收器(sink)连接器将 Confluent 审计日志写入 Elasticsearch。具体做法是:先为 Kafka Connect 创建一个 docker-compose 文件,再向其提交 Elasticsearch 接收器连接器的配置,之后日志消息会被写入 Elasticsearch 集群。但在整个流程运行时,Kafka Connect 日志中出现了如下警告:

[2023-03-17 15:47:28,630] WARN [elastic-sink|task-0] Encountered an illegal document error -> Response status: 'BAD_REQUEST',

2023-03-17 21:17:28 Index: 'confluent-audit-log-events', 2023-03-17 21:17:28 Document ID: 'confluent-audit-log-events+1+36008'. 2023-03-17 21:17:28 Ignoring and will not index record. (io.confluent.connect.elasticsearch.ElasticsearchClient:649)

我不知道为什么会出现这个警告:是不是我在 Elasticsearch 接收器连接器的配置中遗漏了某些配置项?这会不会影响数据写入 Elasticsearch?

有人可以帮我吗?

为了供您参考,我附上了 docker-compose 和 elastic search 配置

用于连接器的 docker-compose

---
# Docker Compose definition for one Kafka Connect worker container.
# "bootstrap", "url" and the user/password values are placeholders; replace
# them with real endpoints and credentials before use.
version: '3.5'
services:
  connect:
    # container_name was declared twice in the original; duplicate mapping
    # keys are invalid YAML, so only one declaration is kept.
    container_name: kafka-connect
    image: confluentinc/cp-kafka-connect:7.2.4
    ports:
      - "8083:8083"
      - "5005:5005"
    volumes:
      # Mounted at /data, which CONNECT_PLUGIN_PATH below points at.
      - ./data:/data
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "bootstrap"
      CONNECT_GROUP_ID: "connect-7.3.1"
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs-7.3.1
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets-7.3.1
      CONNECT_STATUS_STORAGE_TOPIC: connect-status-7.3.1
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      # Worker-level default converters (string keys, Avro values).
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      # CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "url"
      # CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "username:secret"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: /data
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.1.jar
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: All
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='password';"
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      # The original value had the JAAS string pasted into itself, which closed
      # the double-quoted scalar early and broke YAML parsing. It now mirrors
      # the worker-level CONNECT_SASL_JAAS_CONFIG form.
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='password';"
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_TOPIC_CREATION_ENABLE: "true"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"

elasticsearch 接收器连接器配置文件

# Submit the "elastic-sink" connector configuration to the Kafka Connect REST
# endpoint on localhost:8083 with an HTTP PUT, then pretty-print the JSON
# response with jq. Credentials and URLs in the payload are placeholders.
curl --request PUT \
     --header "Content-Type: application/json" \
     --data '{
               "tasks.max": "1",
               "connector.class": "ElasticsearchSinkConnector",
               "topics": "confluent-audit-log-events",
               "key.converter": "org.apache.kafka.connect.storage.StringConverter",
               "value.converter": "org.apache.kafka.connect.json.JsonConverter",
               "value.converter.schemas.enable": "false",
               "consumer.override.bootstrap.servers": "bootstrap",
               "consumer.override.sasl.mechanism": "PLAIN",
               "consumer.override.security.protocol": "SASL_SSL",
               "consumer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";",
               "consumer.override.client.dns.lookup": "use_all_dns_ips",
               "schema.ignore": "true", 
               "key.ignore": "true",
               "connection.url": "elasticsearch url",
               "connection.username": "user",
               "connection.password": "password",
               "errors.log.enable": true,
               "errors.log.include.messages": true,
               "write.method": "UPSERT",
               "max.retries": "1",  
               "retry.backoff.ms": "1000",
               "batch.size": "100", 
               "behavior.on.malformed.documents": "WARN",
               "behavior.on.null.values": "IGNORE",
               "linger.ms": "10000"
          }' \
     http://localhost:8083/connectors/elastic-sink/config | jq '.'
 
elasticsearch apache-kafka apache-kafka-connect
1个回答
0
投票

您必须检查elasticsearch上的映射配置。如果可能的话,请将elasticsearch上的索引模板发送给我

© www.soinside.com 2019 - 2024. All rights reserved.