我们正在使用 Python 创建数据流作业以从 Kafka(Amazon MSK,6 个代理,5 个分区主题)读取数据。数据流作业部署在具有 Cloud NAT(单个公共 IP)的 VPC 中,并且该 IP 在 AWS 端完全允许。
我打开了
commit_offset_in_finalize=True
并设置了group.id
。也残了enable.auto.commit
.
在工作日志中,我可以看到一直有以下警告产生:
[Consumer clientId=consumer-Reader-2_offset_consumer_452577593_my-group-id-695, groupId=Reader-2_offset_consumer_452577593_my-group-id] Connection to node -3 (b-3-public.some-cluster-name.amazonaws.com/XXX.XXX.XXX.XXX:YYYY) could not be established. Broker may not be available.
[Consumer clientId=consumer-Reader-2_offset_consumer_1356187250_my-group-id-640, groupId=Reader-2_offset_consumer_1356187250_my-group-id] Bootstrap broker b-3-public.some-cluster-name.amazonaws.com:YYYY(id: -3 rack: null) disconnected
org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before the position for partition my-topic-4 could be determined
还有错误:
org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before successfully committing offsets {my-topic-1=OffsetAndMetadata{offset=13610611, leaderEpoch=null, metadata=''}}
事件不多,例如 5/秒,所以根本没有负载。
我登录到托管我的工作的 VM 并运行
toolbox
看看发生了什么。
我注意到一直在创建连接日志以到达 Kafka。具有以下所有参数的是 100-200 个已建立的连接。 Earlir 它超过 300-400 百和 SYN_SENT 连接总共堆积到 2000 个连接,使工作机器根本无法连接到 Kafka。
知道是什么导致了这么多连接吗?
流水线如下所示:
with Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| 'Read record from Kafka' >> ReadFromKafka(
consumer_config={
'bootstrap.servers': bootstrap_servers,
'group.id': 'my-group-id',
'default.api.timeout.ms' : '300000',
'enable.auto.commit' : 'false',
'security.protocol': 'SSL',
'ssl.truststore.location': truststore_location,
'ssl.truststore.password': truststore_password,
'ssl.keystore.location': keystore_location,
'ssl.keystore.password': keystore_password,
'ssl.key.password': key_password
},
topics=['my-topic'],
with_metadata=True,
commit_offset_in_finalize=True
)
| 'Format message element to name tuple' >> ParDo(
FormatMessageElement(logger, corrupted_events_table, bq_table_name)
)
| 'Get events row' >> ParDo(
BigQueryEventRow(logger)
)
| 'Write events to BigQuery' >> io.WriteToBigQuery(
table=bq_table_name,
dataset=bq_dataset,
project=project,
schema=event_table_schema,
write_disposition=io.BigQueryDisposition.WRITE_APPEND,
create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED,
additional_bq_parameters=additional_bq_parameters,
insert_retry_strategy=RetryStrategy.RETRY_ALWAYS
)
)
这里是开始参数(删除标准参数):
python3 streaming_job.py \
(...)
--runner DataflowRunner \
--experiments=use_runner_v2 \
--number_of_worker_harness_threads=1 \
--experiments=no_use_multiple_sdk_containers \
--sdk_container_image=${DOCKER_IMAGE} \
--sdk_harness_container_image_overrides=".*java.*,${DOCKER_IMAGE_JAVA}"
gcloud dataflow jobs run streaming_job \
(...)
--worker-machine-type=n2-standard-4 \
--num-workers=1 \
--max-workers=10
我尝试修改:
number_of_worker_harness_threads
- 更少的线程,更少的连接
no_use_multiple_sdk_containers
- 每个工人一个 SDK 容器,工人的连接更少
更多资源——更多SDK容器,更多连接
default.api.timeout.ms
- 通过增加它,超时次数减少了
并以上述参数结束。仍然有 100-200 个连接,当其他阶段无事可做时,ReadFromKafka 正在疯狂工作
我遇到了类似的问题(TimeoutException:在确定分区 my-topic-4 的位置之前超时 300000ms 已过期)并通过删除“--experiments=use_runner_v2”选项解决了这个问题。