GCP Dataflow ReadFromKafka 创建大量连接

问题描述 投票:0回答:1

我们正在使用 Python 创建数据流作业以从 Kafka(Amazon MSK,6 个代理,5 个分区主题)读取数据。数据流作业部署在具有 Cloud NAT(单个公共 IP)的 VPC 中,并且该 IP 在 AWS 端完全允许。

我打开了

commit_offset_in_finalize=True
并设置了
group.id
。也残了
enable.auto.commit
.

在工作日志中,我可以看到一直有以下警告产生:

[Consumer clientId=consumer-Reader-2_offset_consumer_452577593_my-group-id-695, groupId=Reader-2_offset_consumer_452577593_my-group-id] Connection to node -3 (b-3-public.some-cluster-name.amazonaws.com/XXX.XXX.XXX.XXX:YYYY) could not be established. Broker may not be available.

[Consumer clientId=consumer-Reader-2_offset_consumer_1356187250_my-group-id-640, groupId=Reader-2_offset_consumer_1356187250_my-group-id] Bootstrap broker b-3-public.some-cluster-name.amazonaws.com:YYYY(id: -3 rack: null) disconnected

org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before the position for partition my-topic-4 could be determined

还有错误:

org.apache.kafka.common.errors.TimeoutException: Timeout of 300000ms expired before successfully committing offsets {my-topic-1=OffsetAndMetadata{offset=13610611, leaderEpoch=null, metadata=''}}

事件不多,例如 5/秒,所以根本没有负载。

我登录到托管我的工作的 VM 并运行

toolbox
看看发生了什么。

我注意到一直在创建连接日志以到达 Kafka。具有以下所有参数的是 100-200 个已建立的连接。 Earlir 它超过 300-400 百和 SYN_SENT 连接总共堆积到 2000 个连接,使工作机器根本无法连接到 Kafka。

知道是什么导致了这么多连接吗?

流水线如下所示:

with Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        |   'Read record from Kafka' >> ReadFromKafka(
                consumer_config={
                    'bootstrap.servers': bootstrap_servers,
                    'group.id': 'my-group-id',
                    'default.api.timeout.ms' : '300000',
                    'enable.auto.commit' : 'false',
                    'security.protocol': 'SSL',
                    'ssl.truststore.location': truststore_location,
                    'ssl.truststore.password': truststore_password,
                    'ssl.keystore.location': keystore_location,
                    'ssl.keystore.password': keystore_password,
                    'ssl.key.password': key_password
                },
                topics=['my-topic'],
                with_metadata=True,
                commit_offset_in_finalize=True
            )

        |   'Format message element to name tuple' >> ParDo(
                FormatMessageElement(logger, corrupted_events_table, bq_table_name)
            )

        |   'Get events row' >> ParDo(
                BigQueryEventRow(logger)
            )

        |   'Write events to BigQuery' >> io.WriteToBigQuery(
                table=bq_table_name,
                dataset=bq_dataset,
                project=project,
                schema=event_table_schema,
                write_disposition=io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED,
                additional_bq_parameters=additional_bq_parameters,
                insert_retry_strategy=RetryStrategy.RETRY_ALWAYS
            )
    )

这里是开始参数(删除标准参数):

python3 streaming_job.py \
(...)
    --runner DataflowRunner \
    --experiments=use_runner_v2 \
    --number_of_worker_harness_threads=1 \
    --experiments=no_use_multiple_sdk_containers \
    --sdk_container_image=${DOCKER_IMAGE} \
    --sdk_harness_container_image_overrides=".*java.*,${DOCKER_IMAGE_JAVA}"

    
gcloud dataflow jobs run streaming_job \
(...)
    --worker-machine-type=n2-standard-4 \
    --num-workers=1 \
    --max-workers=10

我尝试修改:

number_of_worker_harness_threads
- 更少的线程,更少的连接

no_use_multiple_sdk_containers
- 每个工人一个 SDK 容器,工人的连接更少

更多资源——更多SDK容器,更多连接

default.api.timeout.ms
- 通过增加它,超时次数减少了

并以上述参数结束。仍然有 100-200 个连接,当其他阶段无事可做时,ReadFromKafka 正在疯狂工作

python google-cloud-platform apache-kafka google-cloud-dataflow apache-beam
1个回答
0
投票

我遇到了类似的问题(TimeoutException:在确定分区 my-topic-4 的位置之前超时 300000ms 已过期)并通过删除“--experiments=use_runner_v2”选项解决了这个问题。

© www.soinside.com 2019 - 2024. All rights reserved.