我尝试使用 Elasticsearch 接收器连接器(sink connector)将 Confluent 审计日志写入 Elasticsearch。就我而言,我为 Kafka Connect 创建了一个 docker-compose 文件,然后在其上附加 Elasticsearch 接收器连接器的配置,之后日志消息会被写入 Elasticsearch 集群。但是在运行整个流程时,我在 Kafka Connect 日志中收到如下警告:
[2023-03-17 15:47:28,630] WARN [elastic-sink|task-0] Encountered an illegal document error -> Response status: 'BAD_REQUEST',
索引: 'confluence-audit-log-events', 文档 ID:“confluence-audit-log-events+1+36008”。 忽略并且不会索引记录。 (io.confluence.connect.elasticsearch.ElasticsearchClient:649)
我不知道这个警告为什么会出现:是不是我在 Elasticsearch 接收器连接器配置中遗漏了某项配置?它是否会影响数据写入 Elasticsearch?
有人可以帮我吗?
供您参考,我附上了 docker-compose 文件和 Elasticsearch 接收器连接器的配置。
用于连接器的 docker-compose
---
version: '3.5'
services:
  connect:
    # NOTE: the original snippet declared `container_name` twice; duplicate
    # YAML keys are invalid (most parsers silently keep the last one), so
    # only one declaration is kept here.
    container_name: kafka-connect
    image: confluentinc/cp-kafka-connect:7.2.4
    ports:
      # Quoted so YAML never misreads a mapping as a sexagesimal integer.
      - "8083:8083"   # Connect REST API
      - "5005:5005"
    volumes:
      - ./data:/data  # connector plugins live here (see CONNECT_PLUGIN_PATH)
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "bootstrap"   # placeholder — set real broker host:port list
      CONNECT_GROUP_ID: "connect-7.3.1"
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs-7.3.1
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets-7.3.1
      CONNECT_STATUS_STORAGE_TOPIC: connect-status-7.3.1
      # Quoted: env-var values should be strings, not YAML integers.
      CONNECT_REPLICATION_FACTOR: "3"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      #CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "url"   # placeholder — real Schema Registry URL
      #CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: "username:secret"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: /data
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      #CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.1.jar
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: All
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='password';"
      CONNECT_SASL_MECHANISM: PLAIN
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      # FIX: the original value had the JAAS string pasted twice with nested,
      # unbalanced double quotes — that is not a parseable JAAS config and
      # would make the consumer's SASL login fail. One well-formed copy only.
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='password';"
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
      CONNECT_TOPIC_CREATION_ENABLE: "true"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
elasticsearch 接收器连接器配置文件
# Registers/updates the `elastic-sink` connector by PUT-ing its JSON config
# to the Kafka Connect REST API (PUT .../connectors/<name>/config creates the
# connector if it does not exist). Output is pretty-printed with jq.
#
# Notes for the reviewer:
# - `value.converter` here (JsonConverter, schemas.enable=false) overrides
#   the worker's Avro default for this connector only.
# - `behavior.on.malformed.documents: WARN` is exactly what produces the
#   "Encountered an illegal document error ... Ignoring and will not index
#   record" log line: documents Elasticsearch rejects with BAD_REQUEST are
#   logged and skipped, not retried. NOTE(review): BAD_REQUEST on index
#   usually points to a field/mapping conflict in the target index —
#   confirm against the index template on the Elasticsearch side.
# - "connection.url", "connection.username"/"connection.password" and the
#   consumer.override.* credentials are placeholders — fill in real values.
# - `consumer.override.*` works only because the worker sets
#   CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY=All.
curl -X PUT \
-H "Content-Type: application/json" \
--data '{
"tasks.max": "1",
"connector.class": "ElasticsearchSinkConnector",
"topics": "confluent-audit-log-events",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"consumer.override.bootstrap.servers": "bootstrap",
"consumer.override.sasl.mechanism": "PLAIN",
"consumer.override.security.protocol": "SASL_SSL",
"consumer.override.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";",
"consumer.override.client.dns.lookup": "use_all_dns_ips",
"schema.ignore": "true",
"key.ignore": "true",
"connection.url": "elasticsearch url",
"connection.username": "user",
"connection.password": "password",
"errors.log.enable": true,
"errors.log.include.messages": true,
"write.method": "UPSERT",
"max.retries": "1",
"retry.backoff.ms": "1000",
"batch.size": "100",
"behavior.on.malformed.documents": "WARN",
"behavior.on.null.values": "IGNORE",
"linger.ms": "10000"
}' \
http://localhost:8083/connectors/elastic-sink/config | jq .
您需要检查 Elasticsearch 端的索引映射(mapping)配置——BAD_REQUEST 通常意味着写入的文档字段与目标索引的映射发生了冲突。如果可能的话,请把 Elasticsearch 上对应的索引模板(index template)发出来。