I have an old Dataflow template; we updated the database security and now I need to stage an extra file to make it work


I've run into a problem that is really bugging me, and I'm writing this post to look for a solution.

In my old architecture everything worked fine:

  • We have some data reachable over JDBC, accessible from a specific subnetwork

  • We use a Cloud Function to extract the data and write it to BigQuery

My new setup:

  • The same database, but with additional security: I now have to use a certificate to connect

  • I have added some parameters to the connection string: useSSL=true&requireSSL=true&verifyServerCertificate=true&serverTimezone=UTC&trustCertificateKeyStoreUrl=file:/extra_files/ca-root.pem

  • The rest of the function's setup is unchanged

The error I get and cannot get past: java.sql.SQLException: Cannot create PoolableConnectionFactory (Could not open file: /extra_files/ca-root.pem [/extra_files/ca-root.pem (No such file or directory)])

According to https://cloud.google.com/dataflow/docs/guides/templates/ssl-certificates it should be that simple, but my setup is a bit more complex.

I have already added the template parameters, but the job still doesn't work: the file does not exist, so I'm missing something and I don't know what.

My job setup: Dataflow - MySQLToBigQuery.java:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;

/**
 * MySQL to BigQuery pipeline.
 */
public class MySQLToBigQuery {
    public static void main(String[] args) {
        // Create pipeline
        CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(CustomPipelineOptions.class);

        run(options);
    }

    /**
     * Runs the pipeline with the supplied options.
     *
     * @param options The execution parameters to the pipeline.
     * @return PipelineResult
     */
    private static PipelineResult run(CustomPipelineOptions options) {
        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        /*
         * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
         *        2) Append TableRow to BigQuery via BigQueryIO
         */
        pipeline
                /*
                 * Step 1: Read records via JDBC and convert to TableRow
                 *         via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
                 */
                .apply(
                        "Read from MySQL",
                        JdbcIO.<TableRow>read()
                                .withDataSourceConfiguration(
                                        JdbcIO.DataSourceConfiguration.create(
                                                StaticValueProvider.of("com.mysql.cj.jdbc.Driver"), options.getConnectionURL())
                                                .withUsername(options.getUsername())
                                                .withPassword(options.getPassword())
                                                .withConnectionProperties(options.getConnectionProperties()))
                                .withQuery(options.getQuery())
                                .withCoder(TableRowJsonCoder.of())
                                .withRowMapper(getResultSetToTableRow()))

                /*
                 * Step 2: Append TableRow to an existing BigQuery table
                 */
                .apply(
                        "Write to BigQuery",
                        BigQueryIO.writeTableRows()
                                .withoutValidation()
                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                                .to(options.getOutputTable()));

        // Execute the pipeline.
        return pipeline.run();
    }

    /**
     * Factory method for {@link ResultSetToTableRow}.
     */
    private static JdbcIO.RowMapper<TableRow> getResultSetToTableRow() {
        return new ResultSetToTableRow();
    }
    /**
     * {@link JdbcIO.RowMapper} implementation to convert JDBC ResultSet rows to UTF-8 encoded JSON.
     */
    private static class ResultSetToTableRow implements JdbcIO.RowMapper<TableRow> {

        @Override
        public TableRow mapRow(ResultSet resultSet) throws Exception {

            ResultSetMetaData metaData = resultSet.getMetaData();

            TableRow outputTableRow = new TableRow();

            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                outputTableRow.set(metaData.getColumnName(i), resultSet.getObject(i));
            }

            return outputTableRow;
        }
    }
}

Dataflow: CustomPipelineOptions.java

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

public interface CustomPipelineOptions extends PipelineOptions {
    @Description(
            "The JDBC connection URL string. " + "for example: jdbc:mysql://some-host:3306/sampledb")
    ValueProvider<String> getConnectionURL();

    void setConnectionURL(ValueProvider<String> connectionURL);

    @Description(
            "JDBC connection property string. " + "for example: unicode=true;characterEncoding=UTF-8")
    ValueProvider<String> getConnectionProperties();

    void setConnectionProperties(ValueProvider<String> connectionProperties);

    @Description("JDBC connection user name. ")
    ValueProvider<String> getUsername();

    void setUsername(ValueProvider<String> username);

    @Description("JDBC connection password. ")
    ValueProvider<String> getPassword();

    void setPassword(ValueProvider<String> password);

    @Description("Source data query string. " + "for example: select * from sampledb.sample_table")
    ValueProvider<String> getQuery();

    void setQuery(ValueProvider<String> query);

    @Description(
            "BigQuery Table spec to write the output to. "
                    + "for example: some-project-id:somedataset.sometable")
    ValueProvider<String> getOutputTable();

    void setOutputTable(ValueProvider<String> outputTable);

    @Description("Temporary directory for BigQuery loading process")
    ValueProvider<String> getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(ValueProvider<String> bigQueryLoadingTemporaryDirectory);
}

My Cloud Function index.js:

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event Event payload.
 * @param {!Object} context Metadata for the event.
 */
exports.startDataflow = async (event, context) => {
    // general constants
    const REGION = process.env.REGION;
    const ZONE = process.env.ZONE;
    const BUCKET = process.env.BUCKET;
    const SUBNET = process.env.SUBNET;
    const NETWORK = process.env.NETWORK;
    const TOPIC_MONITOR = process.env.TOPIC_MONITOR;

    // MySQL config
    const mySqlConfig = {
        host: process.env.MYSQL_HOST,
        port: process.env.MYSQL_PORT,
        username: process.env.MYSQL_USERNAME,
        password: process.env.MYSQL_PASSWORD,
        database: process.env.MYSQL_DATABASE,
        properties: process.env.MYSQL_PROPERTIES,
        timezone: process.env.MYSQL_TIMEZONE
    };
    
    // import modules
    const DataflowUtil = require('./dataflow-util');
    const dataflowUtil = new DataflowUtil();

    const BqUtil = require('./bq-util.js');
    const bqUtil = new BqUtil();

    const QueryUtil = require('./query-util');
    const queryUtil = new QueryUtil(BUCKET);


    const SchedulerUtil = require('./scheduler-util');
    const schedulerUtil = new SchedulerUtil();

    const userData = Buffer.from(event.data, 'base64').toString();
    console.log(`Input data received: ${userData}`);

    const inputData = JSON.parse(userData);

    async function main() {
        // find Project ID
        console.log('Finding current project ID.');
        const projectId = await dataflowUtil.getProjectId();
        
        // create Big Query staging and destination table
        let stagingTableExists = await bqUtil.tableExists(inputData.stagingTable);
        if (stagingTableExists) {
            console.log('Deleting Big Query staging table.');
            await bqUtil.deleteTable(inputData.stagingTable);
            stagingTableExists = false;
        }

        const destinationTableExists = await bqUtil.tableExists(inputData.destinationTable);
        if (!stagingTableExists || !destinationTableExists) {
            // extract the BigQuery schema from the input data, or from MySQL
            let fields;
            if (typeof inputData.schema !== "undefined") {
                console.log('Extracting Big Query schema from input data.');
                fields = inputData.schema;
            } else {
                console.log('Extracting Big Query schema from MySQL.');
                fields = await bqUtil.extractFieldsFromMySql(inputData.columns, inputData.sourceTable, mySqlConfig);
            }
            }

            if (!stagingTableExists) {
                console.log('Creating Big Query staging table.');
                await bqUtil.createTable(inputData.stagingTable, fields);
            }
   
            if (!destinationTableExists) {
                console.log('Creating Big Query destination table.');
                await bqUtil.createTable(inputData.destinationTable, fields);
            }
        }

        // assemble source MySQL query
        const queryFields = inputData.columns.join("`, `");
        let query = `SELECT \`${queryFields}\` FROM \`${inputData.sourceTable}\``;
        
        // get the latest offset in order to compose the query
        if (typeof inputData.limitColumn !== 'undefined' && inputData.limitColumn.length > 0) {
            console.log('Reading current offset from Cloud Storage.');
            var limitStart = await queryUtil.lastOffset(inputData.jobName, inputData.limitColumn, '0000-00-00 00:00:00');
            var limitEnd = queryUtil.flexibleUTC(-5);
            var extendedLimitEnd = queryUtil.flexibleUTC(+5);
            query += ` WHERE \`${inputData.limitColumn}\` >= '${limitStart}' AND \`${inputData.limitColumn}\` <= '${extendedLimitEnd}'`;
        }
        console.log(`MySQL query: ${query}`);

        // run dataflow job
        console.log('Starting Dataflow job from template.');
        const dataflowConfig = {
            projectId: projectId,
            region: REGION,
            jobName: inputData.jobName,
            templatePath: `gs://${BUCKET}/templates/MySQLToBigQuery`,
            connection: mySqlConfig,
            query: query,
            outputTable: inputData.stagingTable,
            bqLoadTempLocation: `gs://${BUCKET}/bq-temp`,
            tempLocation: `gs://${BUCKET}/temp`,
            network: NETWORK,
            subnetwork: SUBNET,
            zone: ZONE,
            machineType: inputData.machineType || 'n1-standard-1',
        };
        await dataflowUtil.startJobFromTemplate(dataflowConfig).then((dataflowJobId) => {
            // setup Cloud Scheduler monitoring cron
            console.log('Setting up Cloud Scheduler cron for monitoring Dataflow job.');
            inputData.dataflowJobId = dataflowJobId; // pass the Dataflow job ID to the monitoring function
            const config = {
                projectId: projectId,
                region: REGION,
                jobName: `monitor-dataflow-${dataflowJobId}`,
                description: 'Triggers a Cloud Function for monitoring a Dataflow job.',
                topic: TOPIC_MONITOR,
                payload: inputData,
                schedule: '* * * * *',
                timezone: "UTC"
            };
            
            return schedulerUtil.createJob(config);
        }).then((data)=>{
            // save new offset
            if (typeof limitEnd !== "undefined" && typeof limitStart !== "undefined") {
                console.log('Saving new offset in Cloud Storage.');
                return queryUtil.saveOffset(inputData.jobName, inputData.limitColumn, limitEnd, limitStart);
            } else {
                return null;
            }
        }).then((data) => {
            console.log('Done starting dataflow job.');
        });
    }

    await main();
};

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event Event payload.
 * @param {!Object} context Metadata for the event.
 */
exports.monitorDataflow = async (event, context) => {

    // general constants
    const REGION = process.env.REGION;
    const BUCKET = process.env.BUCKET;

    // import modules
    const DataflowUtil = require('./dataflow-util');
    const dataflowUtil = new DataflowUtil();

    const SchedulerUtil = require('./scheduler-util');
    const schedulerUtil = new SchedulerUtil();

    const BqUtil = require('./bq-util');
    const bqUtil = new BqUtil();

    const QueryUtil = require('./query-util');
    const queryUtil = new QueryUtil(BUCKET);

    const userData = Buffer.from(event.data, 'base64').toString();
    console.log(`Input data received: ${userData}`);

    const inputData = JSON.parse(userData);

    /**
     * Returns the MERGE query
     * 
     * @param {string} stagingTable 
     * @param {string} destinationTable 
     * @param {string} columns 
     * @param {string} pkColumn 
     */
    function assembleMergeQuery(stagingTable, destinationTable, columns, pkColumn) {

        let updateStatement = [];
        columns.forEach(element => {
            updateStatement.push(`D.\`${element}\` = S.\`${element}\``);
        });
    
        const query = `MERGE \`${destinationTable.replace(':', '.')}\` D
            USING \`${stagingTable.replace(':', '.')}\` S ON S.\`${pkColumn}\` = D.\`${pkColumn}\`
            WHEN NOT MATCHED THEN
                INSERT (\`${columns.join("`, `")}\`) VALUES (\`${columns.join("`, `")}\`)
            WHEN MATCHED THEN
                UPDATE SET ${updateStatement.join(", ")}`;
    
        return query;
    }

    async function main() {
        const projectId = await dataflowUtil.getProjectId();

        const status = await dataflowUtil.getJobStatus(projectId, REGION, inputData.dataflowJobId);
        if (status === 'JOB_STATE_DONE') {
            console.log('Deleting Cloud Scheduler job.');
            await schedulerUtil.deleteJob(inputData.csJobName);
           
            console.log('Running Big Query MERGE query.');
            const query = assembleMergeQuery(inputData.stagingTable, inputData.destinationTable, inputData.columns, inputData.pkColumn);
            let queryJobId = await bqUtil.runQuery(query, projectId);
            console.log(`Big Query query ${queryJobId} has been started. Done.`);
        } else if(status === 'JOB_STATE_FAILED' || status === 'JOB_STATE_CANCELLED'){
            console.log(`Deleting Cloud Scheduler job for failed/canceled dataflow job (${status}).`);
            await schedulerUtil.deleteJob(inputData.csJobName);
            
            console.log('Rolling back offset.');
            await queryUtil.rollbackOffset(inputData.jobName, inputData.limitColumn);
            console.log('Done.');
        } else {
            console.log('Job is still in progress (' + status + '). Exiting.');
        }
    }

    await main();
};
google-cloud-functions google-cloud-dataflow
1 Answer

The process described at https://cloud.google.com/dataflow/docs/guides/templates/ssl-certificates only applies to Google-provided templates.

Are you using the Google-provided MySQL to BigQuery template? If so, this should work. To double-check, you can also look at the worker logs in the Dataflow console; there should be a log entry indicating that the file has been staged.

If you are using your own custom template, you have to implement a JvmInitializer like this and make sure it is on the classpath. In the JvmInitializer's beforeProcessing method you can copy the certificate from GCS and save it locally. The JvmInitializer is executed on every worker before data processing starts, so the certificate will be present on the worker before any connection to the database is created. A minimal sketch follows below.
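A minimal sketch, assuming the certificate is stored at a hypothetical GCS location (gs://my-bucket/certs/ca-root.pem) and that the AutoService annotation processor is on the classpath so the initializer is picked up via ServiceLoader (otherwise list the class in META-INF/services/org.apache.beam.sdk.harness.JvmInitializer):

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;

import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

/**
 * Copies the CA certificate from GCS to the local path referenced by
 * trustCertificateKeyStoreUrl, before any data is processed on the worker.
 */
@AutoService(JvmInitializer.class)
public class CertificateJvmInitializer implements JvmInitializer {

    // Hypothetical locations: adjust to your bucket and connection string.
    private static final String GCS_CERT_PATH = "gs://my-bucket/certs/ca-root.pem";
    private static final String LOCAL_CERT_PATH = "/extra_files/ca-root.pem";

    @Override
    public void beforeProcessing(PipelineOptions options) {
        try {
            // Register the gs:// filesystem with the worker's pipeline options.
            FileSystems.setDefaultPipelineOptions(options);

            // Assumes the worker is allowed to create this directory.
            Path local = Paths.get(LOCAL_CERT_PATH);
            Files.createDirectories(local.getParent());

            // Stream the certificate from GCS and write it to the local path.
            try (InputStream in = Channels.newInputStream(
                    FileSystems.open(FileSystems.matchSingleFileSpec(GCS_CERT_PATH).resourceId()))) {
                Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING);
            }
        } catch (Exception e) {
            throw new RuntimeException("Could not stage CA certificate on worker", e);
        }
    }
}

If you don't want to hardcode the GCS path, you could also expose it as an extra ValueProvider option on CustomPipelineOptions and read it from the PipelineOptions passed into beforeProcessing.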
