Python apache beam 从 kafka 读取 - 我无法连接到 kafka

问题描述 投票:0回答:1

我想做一个 python apache beam Kafka 客户端,它将从 Kafka 获取流数据(Kafka 返回大量数据,如数百万/数十亿),按键和值过滤数据,并按列表批量返回数据字典。我卡在与 Kafka 的连接上,使用

ReadFromKafka
连接时遇到问题,看起来该方法仅返回
None
但不应该(我使用底部的脚本测试了连接)。我不知道我做错了什么,如果有人帮助我,我将非常感激。我使用
DirectRunner
作为管道运行程序。 (
self.kafka_config.topics
是主题列表)

我当前的管道 apache beam kafka 脚本看起来是这样的:

import logging
from typing import Any, Dict, Iterator, List, Optional

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

from internal_kafka_client import KafkaConfig


class KafkaProcessData(beam.DoFn):
    # pylint: disable = abstract-method
    def __init__(self, filter_key: Any = None, filter_value: Any = None):
        super().__init__()
        self.filter_key = filter_key
        self.filter_value = filter_value

    def process(self, element: Any, *args, **kwargs) -> Iterator[Any]:
        if self.filter_key and self.filter_value:
            if (
                self.filter_key in element
                and element[self.filter_key] == self.filter_value
            ):
                yield element
        yield element


class BatchElements(beam.DoFn):
    # pylint: disable = abstract-method
    def __init__(self, batch_size: int):
        super().__init__()
        self.batch_size = batch_size
        self.buffer = []

    def process(self, element: Any, *args, **kwargs) -> Iterator[List[Any]]:
        self.buffer.append(element)
        if len(self.buffer) >= self.batch_size:
            yield self.buffer
            self.buffer = []


class KafkaClient:
    # pylint: disable = too-few-public-methods
    """
    pipeline_runner choices:
        - DataflowRunner (streaming mode)
        - FlinkRunner (
                streaming mode, locally, not on cluster, haven't tested with cluster
            )
        - DirectRunner (streaming mode)
    """
    _PIPELINE_TYPES = ["DataflowRunner", "FlinkRunner", "DirectRunner"]

    def __init__(
        self,
        kafka_config: KafkaConfig,
        logger: logging.Logger,
        pipeline_runner: str,
        apache_beam_pipeline_options: Optional[Dict] = None,
    ) -> None:
        if not apache_beam_pipeline_options:
            apache_beam_pipeline_options = {}
        if pipeline_runner not in self._PIPELINE_TYPES:
            raise ValueError(
                f"Given pipeline type {pipeline_runner} is not in "
                f"available pipeline list {self._PIPELINE_TYPES}"
            )
        self.pipeline_runner = pipeline_runner
        self.kafka_config = kafka_config
        self.logger = logger
        self.beam_options = PipelineOptions(
            **apache_beam_pipeline_options, save_main_session=True
        )

    def process(self, filter_key: Any = None, filter_value: Any = None) -> List[Dict]:
        kafka_consumer_config = {
            "bootstrap.servers": self.kafka_config.server_address,
            "auto.offset.reset": self.kafka_config.offset_reset,
        }
        if self.kafka_config.group_id:
            kafka_consumer_config["group.id"] = self.kafka_config.group_id

        with beam.Pipeline(self.pipeline_runner, options=self.beam_options) as pipeline:
            # pylint: disable = unsupported-binary-operation
            kafka_data = pipeline | "Read from Kafka" >> ReadFromKafka(
                topics=self.kafka_config.topics,
                consumer_config=kafka_consumer_config,
            )
            processed_data = kafka_data | "Process Data" >> beam.ParDo(
                KafkaProcessData(filter_key=filter_key, filter_value=filter_value)
            )
            processed_data_batches = processed_data | "Batch Data" >> beam.ParDo(
                BatchElements(batch_size=self.kafka_config.batch_size)
            )
            results = pipeline.run()
            results.wait_until_finish()
            return processed_data_batches[0] if processed_data_batches else []

我测试kafka连接的脚本。

from confluent_kafka import Consumer, KafkaError

bootstrap_servers = '<<kafka_address>>:<<kafka_port>>'
topic = '<<topic_name>>'

consumer_config = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

def test_kafka_connection():
    consumer = Consumer(consumer_config)
    consumer.subscribe([topic])
    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print('Kafka error: {}'.format(msg.error()))
                    break
            print('Received message: {}'.format(msg.value().decode('utf-8')))

    except KeyboardInterrupt:
        consumer.close()
        print('Consumer closed')

if __name__ == '__main__':
    test_kafka_connection()

我在谷歌中寻找答案,但根据卡夫卡连接,大多数或全部解决方案看起来与我的相同,所以我不知道我做错了什么,我在这里寻求帮助:)

python-3.x apache-beam
1个回答
0
投票

事实证明,

DirectRunner
将所有数据加载到内存中,并且出现了内存不足异常。为了解决这个问题,我必须在
max_num_records
中添加
ReadFromKafka
参数,并将
enable.auto.commit
设置为
False
var 中的
kafka_consumer_config

© www.soinside.com 2019 - 2024. All rights reserved.