Kafka Elasticsearch connector - 'Flush timeout expired with unflushed records:'

Problem description

I have a strange problem with the kafka -> elasticsearch connector. The first time I started it everything worked great: new data arrived in Elasticsearch and I verified it through the Kibana dashboard. But when I produced new data into Kafka with the same producer application and tried to start the connector again, no new data showed up in Elasticsearch. Instead I now get errors like this:

[2018-02-04 21:38:04,987] ERROR WorkerSinkTask{id=log-platform-elastic-0} Commit of offsets threw an unexpected exception for sequence number 14: null (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
org.apache.kafka.connect.errors.ConnectException: Flush timeout expired with unflushed records: 15805

I'm running the connector with the following command:

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties log-platform-elastic.properties

connect-avro-standalone.properties

bootstrap.servers=kafka-0.kafka-hs:9093,kafka-1.kafka-hs:9093,kafka-2.kafka-hs:9093
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
# producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
# consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
#rest.host.name=
rest.port=8084
#rest.advertised.host.name=
#rest.advertised.port=
plugin.path=/usr/share/java

log-platform-elastic.properties

name=log-platform-elastic
key.converter=org.apache.kafka.connect.storage.StringConverter
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=member_sync_log, order_history_sync_log # ... and many others
key.ignore=true
connection.url=http://elasticsearch:9200
type.name=log

I have checked connectivity to the Kafka brokers, Elasticsearch and the schema-registry (at this point the schema-registry and the connector run on the same host), and everything is fine. The Kafka brokers listen on port 9093, and I can read data from the topics with kafka-avro-console-consumer. Any help would be appreciated!
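For anyone reproducing this, a quick sanity check (a sketch based on the setup above, not part of the original question) is to ask the Connect REST API on the rest.port configured in the worker properties for the connector's task state, and to tail one of the topics directly; the host names and topic are the ones from this setup:

# Connector and task state via the Connect REST API (rest.port=8084 above;
# standalone mode exposes the same endpoints as distributed mode)
curl http://localhost:8084/connectors/log-platform-elastic/status

# Read a few records from one of the topics to confirm data is really there
kafka-avro-console-consumer \
  --bootstrap-server kafka-0.kafka-hs:9093 \
  --property schema.registry.url=http://localhost:8081 \
  --topic member_sync_log \
  --from-beginning --max-messages 5

If the task state is FAILED with the flush-timeout exception, the records are reaching the connector but the sink cannot push them to Elasticsearch fast enough.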

elasticsearch apache-kafka apache-kafka-connect confluent
1 Answer

Just set flush.timeout.ms to a value greater than 10000 (the default, i.e. 10 seconds).

From the documentation:

flush.timeout.ms
The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail.

Type: long
Default: 10000
Importance: low

See documentation
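Concretely, that means adding a line like the one below to log-platform-elastic.properties. The value 60000 is only an example; batch.size and max.buffered.records are related sink-connector settings you may also want to lower if Elasticsearch cannot keep up with the default bulk-request sizes:

# Sketch: raise the flush timeout (60000 is an example value, not a recommendation)
flush.timeout.ms=60000
# Optionally shrink the batches so each bulk request completes within the timeout
batch.size=500
max.buffered.records=10000

Restart the connector after changing the file; in standalone mode the properties are only read at startup.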
