是否可以在 Dataflow 中的 BigQuery 上执行脚本?

问题描述 投票:0回答:3

我有两个表:一个包含要处理的数据的表和一个跟踪已处理数据的跟踪表。

因此,例如,下面的跟踪表将表明我们已经处理了那两天的数据。

date
---
3/15/23
3/16/23

如果我只是直接在 BigQuery 或其他数据库中运行它,我会做这样的事情(在伪代码中)

last_processed_date = max(date) from tracking table
date_to_process = last_processed_date + 1

select * from main table where date = date_to_process

insert into tracking table values(date_to_process)

问题是,由于这是一个“脚本”而不是单个表达式,我认为我无法使用 Dataflow 使用 BigQueryIO 连接器来运行它。我想知道是否有我遗漏的东西可以使这成为可能。

google-bigquery google-cloud-dataflow apache-beam
3个回答
0
投票

这是一个示例 Node.js 代码,它使用 @google-cloud/bigquery 库来完成您想要做的事情:

const { BigQuery } = require('@google-cloud/bigquery');

async function processData() {
  // Instantiate a client
  const bigquery = new BigQuery();

  // Define the name of the tables
  const mainTable = 'my_dataset.my_main_table';
  const trackingTable = 'my_dataset.my_tracking_table';

  // Get the last processed date from the tracking table
  const [row] = await bigquery.query(`SELECT MAX(date) as last_processed_date FROM ${trackingTable}`);
  const lastProcessedDate = row.last_processed_date;

  // Calculate the date to process
  const dateToProcess = new Date(lastProcessedDate);
  dateToProcess.setDate(dateToProcess.getDate() + 1);

  // Query the main table for the data to process
  const query = `SELECT * FROM ${mainTable} WHERE date = @date_to_process`;
  const options = {
    queryParameters: {
      date_to_process: dateToProcess,
    },
  };
  const [results] = await bigquery.createQueryJob({ query, options }).then((job) => job.getQueryResults());

  // Insert the date to the tracking table
  await bigquery.dataset('my_dataset').table('my_tracking_table').insert({ date: dateToProcess });
}

processData();

此代码使用@google-cloud/bigquery 库中的 BigQuery 类与 BigQuery 交互。它首先查询跟踪表以获取最后处理的日期并计算要处理的日期。然后,它使用参数化查询查询主表以获取要处理的数据,以避免 SQL 注入。最后,它将日期插入到跟踪表中以将其标记为已处理。


0
投票

一个可能的选择是数据流 sql 作业 - 请参阅此处

https://cloud.google.com/dataflow/docs/guides/sql/parameterized-queries#using_timestamps_in_parameterized_queries

可以从您的主数据流调用该作业,也可以通过编程方式调用,另请参阅此

https://cloud.google.com/dataflow/docs/guides/sql/parameterized-queries#using_timestamps_in_parameterized_queries


0
投票

我认为最好在

Dataflow
之外执行前 2 个查询:

last_processed_date = max(date) from tracking table
date_to_process = last_processed_date + 1

然后将

date_to_process
作为管道选项传递给
Dataflow
作业。

在您的工作中,您的输入连接器将是一个

BigQueryIO.read
,它通过日期参数执行以下查询:

select * from main table where date = date_to_process

输出连接器将是一个

BigQuery.write
.

解决方案 1:

您可以使用

Shell
cli 和
gcloud
创建一个
bq
脚本来执行第一个查询并增加值。

然后使用之前计算的日期启动

Dataflow
作业。

例子:

# Execute the first query
last_processed_date="bq query --quiet --use_legacy_sql=false 'SELECCT max(date) from tracking table' | awk '{if(NR>1)print}'"

date_to_process=$((++last_processed_date))

# Launch the Dataflow job, example with Python
python -m your_folder.main \
        --runner=DataflowRunner \
        --region=europe-west1 \
        --date_to_process=$date_to_process \
       ....

解决方案2:

如果您在项目中使用像

Airflow
这样的管道编排器,您可以应用解决方案 1 中提供的相同逻辑,但使用 Python 代码和运算符

- A first operator BigQueryInsertJobOperator that executes query, get the result and pass it with xcom to a second operator
- The second operator BeamRunPythonPipelineOperator that launches the Dataflow job and passing the date as pipeline argument
© www.soinside.com 2019 - 2024. All rights reserved.