如何使用PreparedStatement在不迭代行的情况下将Spark Dataframe添加到批处理中

问题描述 投票:-2回答:1

[当前,我在使用Kafka Spark流API读取来自kafka主题的实时流数据之后,正在为创建的spark数据帧执行以下插入操作。我需要根据批大小将这些数据加载到DB2登台表中。数据大小是我从该主题消耗的每秒数千个事务。

Class DF_Creation{
.
DB2_CLASS.insert(DB2_Table, final_dataframe, batchSize);
.
}

Class DB2_CLASS{
.
public static void insert(String DB2_Table, Dataset<Row> final_dataframe, int batchSize){
CREATE DB2 Connection..Connection conn = ......
CREATE STATEMENT Statement stmt = conn.createStatement()) {
String truncate = "TRUNCATE TABLE DB2_Table IMMEDIATE";
stmt.execute(truncate);
final_dataframe.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
       String insertQuery = "INSERT INTO " + DB2_Table + " (COL1,COL2,COL3) VALUES (?, ?, ?) ";
       try (Connection conn = CREATE DB2 Connection
             PreparedStatement insertStmt = conn.prepareStatement(insertQuery)) {
            conn.setAutoCommit(false);
            try{
                            int cnt = 0;
                            while (rows.hasNext()) {
                            int idx = 0;
                            Row row = rows.next();
                    insertStmt.setString(++idx, row.getAs("COL1"));
                    insertStmt.setString(++idx, row.getAs("COL2"));
                    insertStmt.setString(++idx, row.getAs("COL3"));
                        insertStmt.addBatch();
                                            cnt++;
                    if (cnt >= batchSize) {
                        insertStmt.executeBatch();
                        conn.commit();
                        insertStmt.clearBatch();
                        cnt = 0;
                    }
                        }}catch{..}
        }
    }
}}              

这在我遍历每一行,读取每一列以创建批处理时,影响了火花作业的性能。有什么方法可以直接创建批处理,而无需遍历行和列。

请提出建议。

谢谢

apache-spark jdbc apache-spark-sql database-connection prepared-statement
1个回答
0
投票

您可以通过spark数据框编写器中的JDBC选项直接将数据框写入目标表。不再需要在逐行遍历整个数据帧的同时创建准备好的语句。 Spark内部处理所有这些操作。

您可以写:

import com.ibm.db2.jcc._

val jdbcUrl = "jdbc:db2://host:port/database_name"

val db2Properties = new Properties()

final_dataframe.write.mode("append").option("driver", "com.ibm.db2.jcc.DB2Driver").jdbc(jdbcUrl, table = "table_name", db2Properties)
© www.soinside.com 2019 - 2024. All rights reserved.