Confluent SFTP sink connector returns "Failed to open new SFTP channel with existing session"


The SFTP sink connector returns "Failed to open new SFTP channel with existing session".

The full stack trace is below. We are able to connect to the SFTP server manually using the same credentials the connector uses, but the connector hits this error every time it tries to write to the server:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to open new SFTP channel with existing session.
    at io.confluent.connect.sftp.connection.SftpConnection.newChannelFromSession(SftpConnection.java:108)
    at io.confluent.connect.sftp.sink.storage.SftpOutputStream.createPath(SftpOutputStream.java:48)
    at io.confluent.connect.sftp.sink.storage.SftpOutputStream.<init>(SftpOutputStream.java:44)
    at io.confluent.connect.sftp.sink.storage.SftpSinkStorage.create(SftpSinkStorage.java:87)
    at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriter.<init>(CsvRecordWriter.java:32)
    at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:29)
    at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:12)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:408)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:454)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:250)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.writePartitionWhe..

Here is the current configuration of the SFTP connector:

apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: update-sftp-sink-connector
  namespace: confluent
spec:
  name: update-sftp-sink-connector
  taskMax: 1
  class: io.confluent.connect.sftp.SftpSinkConnector
  configs:
    topics: updates-topic
    file.delim: "."
    sftp.host: "my.host.name"
    sftp.port: "22"
    sftp.username: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:username}
    sftp.password: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:password}
    sftp.working.dir: "/updates"
    directory.delim: "/"
    rotate.interval.ms: "120000"
    flush.size: 1
    partition.duration.ms: "120000"
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    format.class: io.confluent.connect.sftp.sink.format.csv.CsvFormat
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    storage.class: io.confluent.connect.sftp.sink.storage.SftpSinkStorage
    locale: en-GB
    timezone: UTC
    timestamp.extractor: Record
  restartPolicy:
    type: OnFailure
    maxRetry: 10
  connectClusterRef:
    name: connect
    namespace: confluent
Tags: apache-kafka, sftp, apache-kafka-connect, confluent-platform