import com.google.api.services.bigquery.model.TableRow;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
/**
 * Batch pipeline that reads every row of a BigQuery table and inserts it into an Oracle table
 * through JDBC.
 *
 * <p>The target column list is derived from the Beam schema attached to the BigQuery read, so the
 * Oracle table is expected to expose columns with the same names as the BigQuery fields.
 */
public class BigQueryToOracle {

    /**
     * Builds and runs the pipeline, blocking until it finishes.
     *
     * @param args Beam pipeline options in {@code --name=value} form (for example
     *     {@code --project} and {@code --tempLocation}); the runner is always forced to
     *     {@link DirectRunner}
     */
    public static void main(String[] args) {
        // Parse the command line so that options such as --tempLocation can actually be supplied;
        // PipelineOptionsFactory.create() silently ignored every argument.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        options.setRunner(DirectRunner.class);

        Pipeline pipeline = Pipeline.create(options);

        // readTableRows() yields a PCollection without a Beam schema, so getSchema() below would
        // throw. readTableRowsWithSchema() attaches the table's schema to the collection.
        // NOTE(review): the schema is looked up while the pipeline is being constructed, which
        // presumably requires BigQuery credentials on the launching machine -- confirm.
        PCollection<TableRow> input = pipeline.apply(
                BigQueryIO.readTableRowsWithSchema()
                        .from("your_project_id:your_dataset_id.your_table_id")
                        .withTemplateCompatibility()
                        .withoutValidation());

        Schema schema = input.getSchema();

        // The setter lambda is serialized and shipped to workers. It must capture only plain
        // serializable data; capturing the PCollection itself (as before) cannot be serialized.
        List<String> columnNames = new ArrayList<>(schema.getFieldNames());

        // NOTE(review): the credentials below are placeholders kept from the original; real
        // deployments should load them from pipeline options or a secret store, not source code.
        input.apply(JdbcIO.<TableRow>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                                "oracle.jdbc.driver.OracleDriver",
                                "jdbc:oracle:thin:@//your_oracle_host:your_oracle_port/your_oracle_sid")
                        .withUsername("your_username")
                        .withPassword("your_password"))
                .withStatement(buildInsertStatement(schema))
                .withPreparedStatementSetter((element, statement) -> {
                    // Bind values in the same order the columns appear in the INSERT statement.
                    // JDBC parameter indexes are 1-based.
                    for (int i = 0; i < columnNames.size(); i++) {
                        statement.setObject(i + 1, element.get(columnNames.get(i)));
                    }
                }));

        pipeline.run().waitUntilFinish();
    }

    /**
     * Builds a parameterized {@code INSERT} statement for {@code your_oracle_table} with one column
     * and one {@code ?} placeholder per schema field, in schema order.
     *
     * @param schema schema whose field names become the column list
     * @return SQL of the form {@code INSERT INTO your_oracle_table (a, b) VALUES (?, ?)}
     * @throws IllegalArgumentException if the schema has no fields, since the resulting statement
     *     would not be valid SQL
     */
    private static String buildInsertStatement(Schema schema) {
        List<String> names = schema.getFieldNames();
        if (names.isEmpty()) {
            throw new IllegalArgumentException(
                    "Cannot build an INSERT statement from a schema with no fields");
        }
        String columns = String.join(", ", names);
        String placeholders = String.join(", ", Collections.nCopies(names.size(), "?"));
        return "INSERT INTO your_oracle_table (" + columns + ") VALUES (" + placeholders + ")";
    }
}
代码有什么问题？