我有一个汇合的 JDBC 源连接器,我将其用于 MSSQL 数据库,问题是它在读取 100 行后立即抛出以下错误。我很困惑,因为我没有在同一连接器的任何其他数据库中遇到过这个问题。
Error: WorkerSinkTask{id=Sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:196)
java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1535)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.kafka.common.utils.Time.timer(Time.java:79)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:321)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1454)
[2022-12-14 14:35:43,647] INFO [Sink|task-0] Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:161)
尝试过的解决方案:
我已尝试按照几个论坛中的建议增加批量大小,但仍然存在相同的问题。
java.lang.OutOfMemoryError:Java 堆空间
这意味着您的批量大小对于 Connect 工作线程来说太大。增加批次会使问题变得更糟,因为这意味着需要更多内存。
Connect 的默认最大堆为
2G
...您可以使用环境变量增加工作内存,例如,将大小加倍。
export KAFKA_HEAP_OPTS="-Xms256M -Xmx4G"
bin/connect-distributed.sh ...
否则,您需要减少批量大小。
此外,您还可以尝试使用 Debezium,它将准确捕获 DELETE 和 UPDATE 查询,而 JDBC 源则无法捕获。
如果我能解决问题,我也会遇到同样的事件。