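SFTP sink connector returns "Failed to open new SFTP channel with existing session"
The stack trace for the error is below. We can connect to the SFTP server using the same credentials the connector uses, but every time the connector tries to write to the SFTP server it fails with this error: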
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to open new SFTP channel with existing session.
    at io.confluent.connect.sftp.connection.SftpConnection.newChannelFromSession(SftpConnection.java:108)
    at io.confluent.connect.sftp.sink.storage.SftpOutputStream.createPath(SftpOutputStream.java:48)
    at io.confluent.connect.sftp.sink.storage.SftpOutputStream.<init>(SftpOutputStream.java:44)
    at io.confluent.connect.sftp.sink.storage.SftpSinkStorage.create(SftpSinkStorage.java:87)
    at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriter.<init>(CsvRecordWriter.java:32)
    at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:29)
    at io.confluent.connect.sftp.sink.format.csv.CsvRecordWriterProvider.getRecordWriter(CsvRecordWriterProvider.java:12)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.getWriter(TopicPartitionWriter.java:408)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:454)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:250)
    at io.confluent.connect.sftp.sink.TopicPartitionWriter.writePartitionWhe..
Below is the current configuration of the SFTP connector:
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: update-sftp-sink-connector
  namespace: confluent
spec:
  name: update-sftp-sink-connector
  taskMax: 1
  class: io.confluent.connect.sftp.SftpSinkConnector
  configs:
    topics: updates-topic
    file.delim: "."
    sftp.host: "my.host.name"
    sftp.port: "22"
    sftp.username: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:username}
    sftp.password: ${file:/mnt/secrets/connect-connector-secrets/sftp-creds:password}
    sftp.working.dir: "/updates"
    directory.delim: "/"
    rotate.interval.ms: "120000"
    flush.size: 1
    partition.duration.ms: "120000"
    partitioner.class: io.confluent.connect.storage.partitioner.TimeBasedPartitioner
    format.class: io.confluent.connect.sftp.sink.format.csv.CsvFormat
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    storage.class: io.confluent.connect.sftp.sink.storage.SftpSinkStorage
    locale: en-GB
    timezone: UTC
    timestamp.extractor: Record
  restartPolicy:
    type: OnFailure
    maxRetry: 10
  connectClusterRef:
    name: connect
    namespace: confluent