Apache Flink is not printing any new streaming data

Problem description

I am doing real-time processing of users' access-log data. It basically tracks users' attendance at the office.

Whenever a user swipes at a door, a log event goes into Kafka. After consuming that event, I check whether it is the user's first visit. If it is, the attendance time is marked; if the user's data already exists in Kafka, I simply increment a count, e.g. the number of times the user has visited. That is the simple use case I am working on.
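Conceptually, the per-event logic I am after looks roughly like this (a plain-Python sketch with an in-memory dict standing in for real state; the function and variable names are just for illustration):

attendance = {}  # name -> {'first_seen': access_time, 'visits': visit count}

def handle_swipe(event):
    # event is a dict like {'name': ..., 'door': ..., 'access_time': ...}
    name = event['name']
    if name not in attendance:
        # first swipe of this user: mark the attendance time
        attendance[name] = {'first_seen': event['access_time'], 'visits': 1}
    else:
        # user already seen before: just increment the visit count
        attendance[name]['visits'] += 1
    return attendance[name]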

I am very new to Flink, so I have put together some basic Flink code in Python.

import json
import os

from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types
from pyflink.table import StreamTableEnvironment, EnvironmentSettings


def my_map(obj):
    json_obj = json.loads(json.loads(obj))
    return json.dumps(json_obj["name"])


def kafkaread():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    jars_path = "/home/vishal/flink/flink-pipeline/flink-env3.10/flink_test_1/"
    jar_files = [
        "file:///" + jars_path + "flink-sql-connector-kafka-3.1.0-1.18.jar",
    ]
    jar_files_str = ";".join(jar_files)

    env.add_jars(jar_files_str)

    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.get_config().set('table.exec.source.idle-timeout', '1000 ms')

   
    # With the table below I take only the latest data that is coming into Kafka (the new swipe events)
    create_table_sql = """
        CREATE TABLE access_logs_source (
                        `name` STRING,
                        `door` STRING,
                        `access_time` TIMESTAMP
                      ) WITH (
                        'connector' = 'kafka',
                        'topic' = 'access_logs_topic_5',
                        'properties.bootstrap.servers' = 'localhost:9092',
                        'properties.group.id' = 'flink-group-access_logs_5',
                        'scan.startup.mode' = 'latest-offset',
                        'format' = 'json'
                      );
      """
    # This table fetches all the data already present in Kafka along with the latest data,
    # so I can check whether the user from the latest event is already present or not
    historical_table_sql = """
        CREATE TABLE access_logs_historical (
                        `name` STRING,
                        `door` STRING,
                        `access_time` TIMESTAMP
                      ) WITH (
                        'connector' = 'kafka',
                        'topic' = 'access_logs_topic_5',
                        'properties.bootstrap.servers' = 'localhost:9092',
                        'properties.group.id' = 'flink-group-access_logs_5',
                        'scan.startup.mode' = 'earliest-offset',
                        'format' = 'json'
                      );
      """

    # Execute the SQL command to create the table
    t_env.execute_sql(create_table_sql)
    
    t_env.execute_sql(historical_table_sql)

    # Define the SQL command to select data from the created table
    select_sql = "SELECT name FROM access_logs_source"

    # Execute the SQL command to select data from the table and print the result
    result_table = t_env.execute_sql(select_sql)
    # result_table.print()
    result_rows = result_table.collect()
    # name from the new log event of the user
    name_to_filter = ''
    for row in result_rows:
        # print(row[0])

        name_to_filter = row[0]
        # break
        print('-------', name_to_filter)
        # result_rows.close()

        # check whether this user's data is already present in the Kafka topic
        check_query = """
          SELECT * from access_logs_historical where name = '{0}'
        """
        check_query = check_query.format(name_to_filter)
        print('++++++++++ ', check_query)
        # t_env.execute_sql(check_query)
        table = t_env.sql_query(check_query)
        table_result = table.execute()
        table_result.print()
    

    # Execute the Flink job
    # t_env.execute("Create and Select Table Job")

if __name__ == '__main__':
    kafkaread()

But the problem is that when data with a different name comes into Kafka, it does not print anything.

Suppose I send this event to Kafka: my_data = { 'name': 'Bhuvi19', 'door': 'door220', 'access_time': '2024-03-05 13:08:00' }
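For reference, an event like this can be pushed to the topic with a small producer script, for example with kafka-python (this is only an illustration, not necessarily the producer I actually use):

import json
from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
producer.send('access_logs_topic_5',
              {'name': 'Bhuvi19', 'door': 'door220', 'access_time': '2024-03-05 13:08:00'})
producer.flush()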

Whenever I send data with the name "Bhuvi19", it prints the data; but whenever I send data with a different name such as "Bhuvi20", it shows nothing. My expectation is that whatever new data I send to Kafka, the process runs again and performs the checks I described, but for new data it just seems to hang there.

Could you help me figure out how to achieve this?

Thanks.

apache-kafka apache-flink flink-streaming flink-sql pyflink
1 Answer

I'm not that experienced with PyFlink SQL, but I've done quite a lot of stream processing lately. Looking at

select_sql = "SELECT name FROM access_logs_source"

you get the names that you later use for filtering. You fetch all the names with result_table.collect(), but inside the loop itself you never do a .collect() on the per-name query.

From here on it's more of a guess, but it may be that you only ever get data for the first name. To fix it, using the same t_env, you have to execute (and collect) each query inside the loop, as in the sketch below.
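Something along these lines is what I mean (an untested sketch that keeps your two table definitions and only changes the loop; note that access_logs_historical still reads an unbounded Kafka topic, so the inner collect() only sees rows as they arrive and will keep waiting for more unless you bound the scan, e.g. with 'scan.bounded.mode' = 'latest-offset'):

select_sql = "SELECT name FROM access_logs_source"
result_rows = t_env.execute_sql(select_sql).collect()

for row in result_rows:
    name_to_filter = row[0]
    print('------- new swipe for', name_to_filter)

    # build and run the per-name lookup with the SAME t_env, inside the loop
    check_query = "SELECT * FROM access_logs_historical WHERE name = '{0}'".format(name_to_filter)
    check_result = t_env.execute_sql(check_query)

    # actually iterate (collect) the per-name result instead of only printing it
    with check_result.collect() as matches:
        for match in matches:
            print('already seen:', match)
            break  # stop after the first match so the outer loop can continue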
