Kafka Mongo Sink issue - monitoring.interceptor.connector-consumer-mongo-sink node disconnected

Problem description (votes: 0, answers: 1)

Key issue: [confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Node -1 disconnected

Kafka is Confluent 7.5.3 (same for Connect and ZooKeeper), and MongoDB is the latest 7.0. Everything is set up with SSL. I am using the kafka-connect-mongodb connector, trying both an older version and the newer 1.11.2 jar, with no luck either way.

   broker:
    image: confluentinc/cp-enterprise-kafka:7.5.3

   connect:
    image: confluentinc/cp-kafka-connect:7.5.3

Connect:

CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.3.jar

The problem is that when the Mongo Sink connector API is called, I get errors like the following:

> [2024-03-07 00:05:50,832] DEBUG Bulk writing 4 document(s) into collection [mydb.products] (com.mongodb.kafka.connect.sink.MongoSinkTask)
> [2024-03-07 00:05:51,061] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
> [2024-03-07 00:05:51,064] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Cancelled in-flight API_VERSIONS request with correlation id 0 due to node -1 being disconnected (elapsed time since creation: 263ms, elapsed time since send: 263ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient)
> [2024-03-07 00:05:51,064] WARN [Producer clientId=confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Bootstrap broker broker.local:19092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
> [2024-03-07 00:05:51,142] INFO Cluster created with settings {hosts=[mongo1:27017, mongo2:27017, mongo3:27017], mode=MULTIPLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,142] INFO Adding discovered server mongo1:27017 to client view of cluster (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,151] INFO Adding discovered server mongo2:27017 to client view of cluster (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,152] INFO Adding discovered server mongo3:27017 to client view of cluster (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,161] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,179] INFO Opened connection [connectionId{localValue:2}] to mongo3:27017 (org.mongodb.driver.connection)
> [2024-03-07 00:05:51,179] INFO Opened connection [connectionId{localValue:3}] to mongo2:27017 (org.mongodb.driver.connection)
> [2024-03-07 00:05:51,179] INFO Opened connection [connectionId{localValue:1}] to mongo1:27017 (org.mongodb.driver.connection)
> [2024-03-07 00:05:51,184] INFO Monitor thread successfully connected to server with description ServerDescription{address=mongo1:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[7, 0, 6]}, minWireVersion=0, maxWireVersion=21, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=3293648, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null, lastWriteDate=null, lastUpdateTimeNanos=704151849030593} (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,184] INFO Monitor thread successfully connected to server with description ServerDescription{address=mongo2:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[7, 0, 6]}, minWireVersion=0, maxWireVersion=21, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=3288036, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null, lastWriteDate=null, lastUpdateTimeNanos=704151849030588} (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,184] INFO Monitor thread successfully connected to server with description ServerDescription{address=mongo3:27017, type=REPLICA_SET_GHOST, state=CONNECTED, ok=true, version=ServerVersion{versionList=[7, 0, 6]}, minWireVersion=0, maxWireVersion=21, maxDocumentSize=16777216, logicalSessionTimeoutMinutes=30, roundTripTimeNanos=3390299, setName='null', canonicalAddress=null, hosts=[], passives=[], arbiters=[], primary='null', tagSet=TagSet{[]}, electionId=null, setVersion=null, lastWriteDate=null, lastUpdateTimeNanos=704151849035558} (org.mongodb.driver.cluster)
> [2024-03-07 00:05:51,733] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
> [2024-03-07 00:05:51,734] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Cancelled in-flight API_VERSIONS request with correlation id 1 due to node -1 being disconnected (elapsed time since creation: 167ms, elapsed time since send: 167ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient)
> [2024-03-07 00:05:51,734] WARN [Producer clientId=confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Bootstrap broker broker.local:19092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

After running the shell script that brings up docker compose, I see a hostname/IP mismatch complaint like this:

Connecting to:          mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&tls=true&tlsCAFile=%2Fdata%2Fssl%2Fmongodb-ca.crt&tlsCertificateKeyFile=%2Fdata%2Fssl%2Fmongodb.pem&appName=mongosh+2.1.5
MongoServerSelectionError: Hostname/IP does not match certificate's altnames: IP: 127.0.0.1 is not in the cert's list:
bduser@willow-spr10:~/dpaul/kafka-docker-ssl/mongo$
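
For reference, which names are actually in the certificate can be checked with openssl; a minimal sketch, reusing the PEM and CA paths from the mongosh output above:

    # List the Subject Alternative Names carried by the mongod certificate.
    openssl x509 -in /data/ssl/mongodb.pem -noout -text | grep -A1 "Subject Alternative Name"
    # Show the issuer (should be the CA in mongodb-ca.crt).
    openssl x509 -in /data/ssl/mongodb-ca.crt -noout -subject -issuer

If 127.0.0.1 (or whatever host mongosh connects to) is not in that SAN list, hostname verification fails exactly as shown above.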

The docker-compose file looks like:


---
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.3
    container_name: zookeeper
    networks:
       kafka:
          aliases:
            - zookeeper.local
       mongo_network:
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: ERROR
      ZOOKEEPER_LOG4J_LOGLEVEL: ERROR
    volumes:
      - zk-data:/var/lib/zookeeper/data
      - zk-txn-logs:/var/lib/zookeeper/log
    
  broker:
    image: confluentinc/cp-enterprise-kafka:7.5.3
    container_name: broker
    networks:
       kafka:
          aliases:
            - broker.local
       mongo_network:
    depends_on:
      - zookeeper
    ports:
      - "19092:19092"
      - "9092:9092"
    environment:
      KAFKA_LOG4J_ROOT_LOGLEVEL: ERROR
      KAFKA_LOG4J_LOGLEVEL: ERROR
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper.local:22181
      KAFKA_LISTENERS: SSL://broker.local:19092
      KAFKA_ADVERTISED_LISTENERS: SSL://broker.local:19092
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.broker.keystore.jks
      KAFKA_SSL_KEYSTORE_CREDENTIALS: broker_keystore_creds
      KAFKA_SSL_KEY_CREDENTIALS: broker_sslkey_creds
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.broker.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: broker_truststore_creds
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      KAFKA_LISTENER_NAME_INTERNAL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      KAFKA_SSL_CLIENT_AUTH: requested
      KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SSL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker.local:19092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper.local:22181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_REPORTER_SECURITY_PROTOCOL: SSL
      CONFLUENT_METRICS_REPORTER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.metrics.truststore.jks
      CONFLUENT_METRICS_REPORTER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.metrics.keystore.jks
      CONFLUENT_METRICS_REPORTER_SSL_TRUSTSTORE_PASSWORD: awesomekafka
      CONFLUENT_METRICS_REPORTER_SSL_KEYSTORE_PASSWORD: awesomekafka
      CONFLUENT_METRICS_REPORTER_SSL_KEY_PASSWORD: awesomekafka
      CONFLUENT_METRICS_ENABLE: "true"
      CONFLUENT_SUPPORT_CUSTOMER_ID: anonymous
    volumes:
      - kafka-data:/var/lib/kafka/data
      - ./secrets:/etc/kafka/secrets

  connect:
    image: confluentinc/cp-kafka-connect:7.5.3
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8083:8083"
    networks:
       kafka:
          aliases:
            - connect.local
       mongo_network:
    environment:
      CONNECT_REST_ACCESS_CONTROL_ALLOW_METHODS: 'GET,POST,PUT,DELETE,OPTIONS'
      CONNECT_REST_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      CONNECT_BOOTSTRAP_SERVERS: SSL://broker.local:19092
      # CONNECT_LISTENERS: 'https://0.0.0.0:8083'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter

      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,com.mongodb.kafka=DEBUG"
      CONNECT_PLUGIN_PATH: '/usr/share/confluent-hub-components/kafka-connect-mongodb/lib/mongo-kafka-connect-1.11.2-confluent.jar,/usr/share/java, /usr/share/confluent-hub-components'
      CONNECT_ZOOKEEPER_CONNECT: zookeeper.local:22181

      # CONNECT_SSL_CLIENT_AUTH: 'required'
      CONNECT_SSL_KEY_PASSWORD: awesomekafka
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: awesomekafka
      CONNECT_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.keystore.jks
      CONNECT_SSL_KEYSTORE_PASSWORD: awesomekafka

      CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
      CONNECT_PRODUCER_BOOTSTRAP_SERVERS: SSL://broker.local:19092
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.truststore.jks
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: awesomekafka
      CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.keystore.jks
      CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: awesomekafka

      CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL    
      CONNECT_CONSUMER_BOOTSTRAP_SERVERS: SSL://broker.local:19092
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.truststore.jks  
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: awesomekafka
      CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.keystore.jks
      CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: awesomekafka
      
      # Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.3.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
    command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
      # command: 
      #   "bash -c echo \"Installing Connector\" ;  confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.1.6 ; echo \"Launching Kafka Connect worker\" ; /etc/confluent/docker/run &sleep infinity"
    volumes:
      - ./kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
      - ./secrets:/etc/kafka/secrets
  
  # MongoDB Replica Set

  mongo1:
    hostname: mongo1
    image: mongo
    container_name: mongo1
    ports:
      - 27017:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--config", "/data/config/ssl.conf", "--bind_ip_all", "--replSet", "dbrs" ]

    networks:
      - mongo_network
    volumes:
      - ./.db/mongo1:/data/db
      - ./wait-for-mongodb.sh:/scripts/wait-for-mongodb.sh
      - ./init.sh:/scripts/init.sh
      - ./ssl:/data/ssl
      - ./config:/data/config
    links:
      - mongo2
      - mongo3

  mongo2:
    hostname: mongo2
    image: mongo
    container_name: mongo2
    ports:
      - 27018:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "dbrs" ]
    networks:
      - mongo_network
    volumes:
      - ./.db/mongo2:/data/db
      - ./ssl:/data/ssl
      - ./wait-for-mongodb.sh:/scripts/wait-for-mongodb.sh

  
  mongo3:
    hostname: mongo3
    image: mongo
    container_name: mongo3
    ports:
      - 27019:27017
    restart: always
    entrypoint: [ "/usr/bin/mongod", "--bind_ip_all", "--replSet", "dbrs" ]
    networks:
      - mongo_network
    volumes:
      - ./.db/mongo3:/data/db
      - ./ssl:/data/ssl
      - ./wait-for-mongodb.sh:/scripts/wait-for-mongodb.sh

  kafka-tools:
    build:
      context: ./kafka-tools
    depends_on:
      - broker
    container_name: kafka-tools
    networks:
       kafka:
          aliases:
            - kafka-tools.local
       mongo_network:
          
    environment:
      KAFKA_LOG4J_ROOT_LOGLEVEL: ERROR
      KAFKA_SECURITY_PROTOCOL: SSL
      KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      KAFKA_HEAP_OPTS: "-Xmx1G -Xms1G"
      KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.kafka-tools.truststore.jks
      KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.kafka-tools.keystore.jks
      KAFKA_SSL_TRUSTSTORE_PASSWORD: awesomekafka
      KAFKA_SSL_KEYSTORE_PASSWORD: awesomekafka
      KAFKA_SSL_KEY_PASSWORD: awesomekafka
    volumes:
      - ./secrets:/etc/kafka/secrets
      - ./config:/etc/kafka/config


volumes:
  zk-data:
    external: true
  zk-txn-logs:
    external: true
  kafka-data:
    external: true

networks:
  kafka:
  localnet:
    attachable: true
  mongo_network:
    driver: bridge
  

Here is what the mongo-sink connector JSON looks like:

> 
> {
>   "name": "mongo-sink",
>   "config": {
>     "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
>     "tasks.max": "1",
>     "topics": "mytopic",
>     "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
>     "database": "mydb",
>     "collection": "products",
>     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
>     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
>     "key.converter.schemas.enable": "false",
>     "value.converter.schemas.enable": "false",
>     "mongo.errors.tolerance": "all",
>     "mongo.errors.log.enable": "true",
>     "errors.log.include.messages": "true",
>     "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy",
>     "name": "mongo-sink"
>   },
>   "tasks": [],
>   "type": "sink"
> }

Note that I tried changing it to the following, without success:

 "connection.uri": "mongodb://mongo1:27017,mongo2:27017,mongo3:27017/?ssl=true",

The MongoDB SSL configuration looks like this:

config/ssl.conf:

> net:
>   ssl:
>     mode: allowSSL
>     PEMKeyFile: "/data/ssl/mongodb.pem"
>     CAFile: "/data/ssl/mongodb-cert.crt"
> storage:
>   dbPath: "/data/db"
> net:
>   bindIp: 127.0.0.1
>   port: 27017
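
Note that, as posted, this file has two top-level net: keys; depending on how mongod parses the YAML, one of the two sections may be ignored. A merged sketch of the same settings (same option names, paths, and mode, nothing else changed):

    net:
      port: 27017
      bindIp: 127.0.0.1
      ssl:
        mode: allowSSL
        PEMKeyFile: "/data/ssl/mongodb.pem"
        CAFile: "/data/ssl/mongodb-cert.crt"
    storage:
      dbPath: "/data/db"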

I am fairly sure the MongoDB replica set is working fine, but the Mongo sink connector task has a problem, as follows:

> [2024-03-07 00:06:21,165] ERROR Error on mongodb operation (com.mongodb.kafka.connect.sink.MongoSinkTask)
> com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=mongo3:27017, type=REPLICA_SET_GHOST, roundTripTime=0.7 ms, state=CONNECTED}, {address=mongo2:27017, type=REPLICA_SET_GHOST, roundTripTime=0.6 ms, state=CONNECTED}, {address=mongo1:27017, type=REPLICA_SET_GHOST, roundTripTime=0.7 ms, state=CONNECTED}]
>         at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
>         at com.mongodb.internal.connection.AbstractMultiServerCluster.getDescription(AbstractMultiServerCluster.java:52)
>         at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:136)
>         at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:94)
>         at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:249)
>         at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:190)
>         at com.mongodb.client.internal.MongoCollectionImpl.executeBulkWrite(MongoCollectionImpl.java:468)
>         at com.mongodb.client.internal.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:448)
>         at com.mongodb.kafka.connect.sink.MongoSinkTask.processSinkRecords(MongoSinkTask.java:177)
>         at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$2(MongoSinkTask.java:117)
>         at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>         at com.mongodb.kafka.connect.sink.MongoSinkTask.lambda$put$3(MongoSinkTask.java:116)
>         at java.base/java.util.HashMap.forEach(HashMap.java:1337)
>         at com.mongodb.kafka.connect.sink.MongoSinkTask.put(MongoSinkTask.java:114)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:593)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:342)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:242)
>         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:211)
>         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
>         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
>         at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> [2024-03-07 00:06:21,168] ERROR Writing 4 document(s) into collection [mydb.products] failed -> remaining retries (3) (com.mongodb.kafka.connect.sink.MongoSinkTask)
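
A hedged reading of this state: type=REPLICA_SET_GHOST with setName='null' means each mongod is running with --replSet but no replica-set configuration has been applied yet, so the driver never finds a primary and times out. One quick check from the host, reusing the TLS files already mounted into the containers:

    # Ask mongo1 whether the replica set has actually been initiated.
    docker exec mongo1 mongosh --tls --tlsAllowInvalidHostnames \
      --tlsCAFile /data/ssl/mongodb-ca.crt \
      --tlsCertificateKeyFile /data/ssl/mongodb.pem \
      --eval 'rs.status()'
    # A "no replset config has been received" / NotYetInitialized error here
    # would explain the REPLICA_SET_GHOST descriptions above.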

I have tested from the Connect node with curl https://broker.local:19092 and curl -v https://mongo1:27017, and the response is positive:

> 
> *   Trying 192.168.xx.x...
> * TCP_NODELAY set
> * Connected to broker.local (192.168.xx.xx) port 19092 (#0)
> * ALPN, offering h2
> * ALPN, offering http/1.1
> * successfully set certificate verify locations:
> *   CAfile: /etc/pki/tls/certs/ca-bundle.crt
>   CApath: none
> * TLSv1.3 (OUT), TLS handshake, Client hello (1):
> * TLSv1.3 (IN), TLS handshake, Server hello (2):
> * TLSv1.3 (IN), TLS handshake, [no content] (0):
> * TLSv1.3 (IN), TLS handshake, Encrypted Extensions (8):
> * TLSv1.3 (IN), TLS handshake, Request CERT (13):
> * TLSv1.3 (IN), TLS handshake, Certificate (11):
> * TLSv1.3 (OUT), TLS alert, unknown CA (560):
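
The "unknown CA" alert at the end only means curl does not trust the broker's CA (it checks against the system bundle), so this mostly proves TCP and TLS reachability. A more targeted check of the SSL listener, assuming the CA used to sign the Kafka certificates is exported somewhere as a PEM file:

    # Show the certificate actually presented on the broker's SSL listener.
    openssl s_client -connect broker.local:19092 -servername broker.local </dev/null 2>/dev/null \
      | openssl x509 -noout -subject -issuer
    # Add -CAfile <kafka-ca.pem> (hypothetical path) to s_client to also verify the chain.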

The configuration parameters initialized in the Connect container docker logs are as follows:

   [2024-03-07 00:26:37,270] INFO ConsumerConfig values:
 allow.auto.create.topics = true
 auto.commit.interval.ms = 5000
 auto.include.jmx.reporter = true
 auto.offset.reset = earliest
 bootstrap.servers = [SSL://broker.local:19092]
 check.crcs = true
 client.dns.lookup = use_all_dns_ips
 client.id = compose-connect-group--offsets
 client.rack =
 connections.max.idle.ms = 540000
 default.api.timeout.ms = 60000
 enable.auto.commit = false
 exclude.internal.topics = true
 fetch.max.bytes = 52428800
 fetch.max.wait.ms = 500
 fetch.min.bytes = 1
 group.id = compose-connect-group
 group.instance.id = null
 heartbeat.interval.ms = 3000
 interceptor.classes = []
 internal.leave.group.on.close = true
 internal.throw.on.fetch.stable.offset.unsupported = false
 isolation.level = read_uncommitted
 key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 max.partition.fetch.bytes = 1048576
 max.poll.interval.ms = 300000
 max.poll.records = 500
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
 receive.buffer.bytes = 65536
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 request.timeout.ms = 30000
 retry.backoff.ms = 100
 sasl.client.callback.handler.class = null
 sasl.jaas.config = null
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 60000
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.login.callback.handler.class = null
 sasl.login.class = null
 sasl.login.connect.timeout.ms = null
 sasl.login.read.timeout.ms = null
 sasl.login.refresh.buffer.seconds = 300
 sasl.login.refresh.min.period.seconds = 60
 sasl.login.refresh.window.factor = 0.8
 sasl.login.refresh.window.jitter = 0.05
 sasl.login.retry.backoff.max.ms = 10000
 sasl.login.retry.backoff.ms = 100
 sasl.mechanism = GSSAPI
 sasl.oauthbearer.clock.skew.seconds = 30
 sasl.oauthbearer.expected.audience = null
 sasl.oauthbearer.expected.issuer = null
 sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
 sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
 sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
 sasl.oauthbearer.jwks.endpoint.url = null
 sasl.oauthbearer.scope.claim.name = scope
 sasl.oauthbearer.sub.claim.name = sub
 sasl.oauthbearer.token.endpoint.url = null
 security.protocol = SSL
 security.providers = null
 send.buffer.bytes = 131072
 session.timeout.ms = 45000
 socket.connection.setup.timeout.max.ms = 30000
 socket.connection.setup.timeout.ms = 10000
 ssl.cipher.suites = null
 ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
 ssl.endpoint.identification.algorithm = https
 ssl.engine.factory.class = null
 ssl.key.password = [hidden]
 ssl.keymanager.algorithm = SunX509
 ssl.keystore.certificate.chain = null
 ssl.keystore.key = null
 ssl.keystore.location = /etc/kafka/secrets/kafka.connect.keystore.jks
 ssl.keystore.password = [hidden]
 ssl.keystore.type = JKS
 ssl.protocol = TLSv1.3
 ssl.provider = null
 ssl.secure.random.implementation = null
 ssl.trustmanager.algorithm = PKIX
 ssl.truststore.certificates = null
 ssl.truststore.location = /etc/kafka/secrets/kafka.connect.truststore.jks
 ssl.truststore.password = [hidden]
 ssl.truststore.type = JKS
 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>  (org.apache.kafka.clients.consumer.ConsumerConfig)
Tags: apache-kafka docker-compose apache-kafka-connect mongo-connector
1 Answer
0 votes

Problem solved... The key issue was the following:

Connecting to:          mongodb://127.0.0.1:27017/?directConnection=true&serverSelectionTimeoutMS=2000&tls=true&tlsCAFile=%2Fdata%2Fssl%2Fmongodb-ca.crt&tlsCertificateKeyFile=%2Fdata%2Fssl%2Fmongodb.pem&appName=mongosh+2.1.5
MongoServerSelectionError: Hostname/IP does not match certificate's altnames: IP: 127.0.0.1 is not in the cert's list:
bduser@willow-spr10:~/dpaul/kafka-docker-ssl/mongo$

It was mainly resolved by creating and initiating the MongoDB replica set with --tlsAllowInvalidHostnames included in the mongosh TLS options:

#!/bin/bash

mongosh --tls  --tlsAllowInvalidHostnames  --tlsCAFile /data/ssl/mongodb-ca.crt --tlsCertificateKeyFile /data/ssl/mongodb.pem <<EOF
var config = {
    "_id": "dbrs",
    "version": 1,
    "members": [
        {
            "_id": 1,
            "host": "mongo1:27017",
            "priority": 3
        }
    ]
};
rs.initiate(config, { force: true });
rs.status();

var config = {
    "_id": "dbrs",
    "version": 1,
    "members": [
        {
            "_id": 2,
            "host": "mongo2:27017",
            "priority": 2
        },
        {
            "_id": 3,
            "host": "mongo3:27017",
            "priority": 1
        }
    ]
};
rs.initiate(config, { force: true });
rs.status();
EOF
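
Editorial note: rs.initiate() normally succeeds only once per replica set, so on an already-initiated set the usual way to bring in the remaining nodes is rs.add(); a hedged sketch of that step, with the same mongosh flags:

    mongosh --tls --tlsAllowInvalidHostnames --tlsCAFile /data/ssl/mongodb-ca.crt --tlsCertificateKeyFile /data/ssl/mongodb.pem <<EOF
    // Add the remaining members to the set initiated above.
    rs.add({ host: "mongo2:27017", priority: 2 });
    rs.add({ host: "mongo3:27017", priority: 1 });
    rs.status();
    EOF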

PS: I still get the Node -1 messages, which remain misleading and poorly worded, and I don't know whether this is a bug in the latest Mongo or Kafka Connect... but for now I am ignoring the warning, since my problem was solved after a long saga :-)

INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-mongo-sink-0] Cancelled in-flight API_VERSIONS request with correlation id 1086 due to node -1 being disconnected (elapsed time since creation: 51ms, elapsed time since send: 51ms, request timeout: 30000ms) (org.apache.kafka.clients.NetworkClient)
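
A possible explanation (hedged, not verified in this setup): the monitoring interceptor creates its own metrics producer, and that producer does not inherit the worker's SSL settings, so it keeps hitting the SSL listener with default (plaintext) client settings and disconnecting. Confluent's examples configure it separately via the confluent.monitoring.interceptor. prefix; as compose environment variables on the connect service that would look roughly like this (truststore/keystore paths and passwords reused from the existing config):

    CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SECURITY_PROTOCOL: SSL
    CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.truststore.jks
    CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_TRUSTSTORE_PASSWORD: awesomekafka
    CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/kafka.connect.keystore.jks
    CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_KEYSTORE_PASSWORD: awesomekafka
    CONNECT_CONSUMER_CONFLUENT_MONITORING_INTERCEPTOR_SSL_KEY_PASSWORD: awesomekafka
    # ...and the same set of variables with CONNECT_PRODUCER_... for the producer-side interceptor.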