同时连接两个 Kafka 流的 Table API

问题描述 投票:0回答:1

我有一个 Kafka 生产者,它从两个大文件中读取数据并以具有相同结构的 JSON 格式发送它们:

def create_sample_json(row_id, data_file): return {'row_id':int(row_id), 'row_data': data_file}

生产者将每个文件分成小块,并从每个块创建 JSON 格式,最后在 for 循环中发送它们。

发送这两个文件的过程通过多线程同时发生。

我想从这些流(s1.row_id == s2.row_id)中进行join,并最终在我的生产者在 Kafka 上发送数据时进行一些流处理。因为生产者从多个来源生成大量数据,我迫不及待地要把它们全部消耗掉,而且必须同时完成。

我不确定 Table API 是否是一个好方法,但这是迄今为止我的 pyflink 代码:

from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.table import  EnvironmentSettings
from pyflink.table.expressions import col
from pyflink.table.table_environment import StreamTableEnvironment

KAFKA_SERVERS = 'localhost:9092'

def log_processing():
  env = StreamExecutionEnvironment.get_execution_environment()
  env.add_jars("file:///flink_jar/kafka-clients-3.3.2.jar")
  env.add_jars("file:///flink_jar/flink-connector-kafka-1.16.1.jar")
  env.add_jars("file:///flink_jar/flink-sql-connector-kafka-1.16.1.jar")

  settings = EnvironmentSettings.new_instance() \
    .in_streaming_mode() \
    .build()

  t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)

  t1 = f"""
         CREATE TEMPORARY TABLE table1(
            row_id INT,
            row_data STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'datatopic',
          'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
          'properties.group.id' = 'MY_GRP',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """

 t2 = f"""
        CREATE TEMPORARY TABLE table2(
            row_id INT,
            row_data STRING
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'datatopic',
          'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
          'properties.group.id' = 'MY_GRP',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """
 p1 = t_env.execute_sql(t1)
 p2 = t_env.execute_sql(t2)

//请告诉我下一步应该做什么:

// 问题:

// 1) 我是否需要单独使用我的消费者类中的数据,然后将它们插入到这些表中,或者将从我们在这里实现的内容中消费数据(因为我传递了连接器、主题、bootstartap.servers 的名称)等...)?

// 2)如果是这样:

2.1) 如何在 Python 中从这些流中进行连接?

2.2)当我的 Producer 会发送数千条消息时,如何阻止之前的数据?我想确保不要进行重复查询。

// 3) 如果没有,我该怎么办?

非常感谢。

python apache-kafka apache-flink pyflink
1个回答
0
投票

// 1) 我是否需要单独使用我的消费者类中的数据,然后将它们插入到这些表中,或者将从我们在这里实现的内容中消费数据(因为我传递了连接器、主题、bootstartap.servers 的名称)等...)?

后一个,数据将由我们实现的“kafka”表连接器消耗。并且您需要定义一个Sink表作为您插入的目标,Sink表可以是一个带有您想要输出的主题的kafka连接器表。

2.1) 如何在 Python 中从这些流中进行连接?

您可以编写 SQL 连接 table1 和 table2,然后在 Python 中插入到您的接收器表中

2.2)当我的 Producer 会发送数千条消息时,如何阻止之前的数据?我想确保不要进行重复查询。

您可以在“加入”之前或“插入”之前过滤这些消息,在您的情况下,“WHERE”子句就足够了

© www.soinside.com 2019 - 2024. All rights reserved.