[当前,我在使用Kafka Spark流API读取来自kafka主题的实时流数据之后,正在为创建的spark数据帧执行以下插入操作。我需要根据批大小将这些数据加载到DB2登台表中。数据大小是我从该主题消耗的每秒数千个事务。
Class DF_Creation{
.
DB2_CLASS.insert(DB2_Table, final_dataframe, batchSize);
.
}
Class DB2_CLASS{
.
public static void insert(String DB2_Table, Dataset<Row> final_dataframe, int batchSize){
CREATE DB2 Connection..Connection conn = ......
CREATE STATEMENT Statement stmt = conn.createStatement()) {
String truncate = "TRUNCATE TABLE DB2_Table IMMEDIATE";
stmt.execute(truncate);
final_dataframe.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
String insertQuery = "INSERT INTO " + DB2_Table + " (COL1,COL2,COL3) VALUES (?, ?, ?) ";
try (Connection conn = CREATE DB2 Connection
PreparedStatement insertStmt = conn.prepareStatement(insertQuery)) {
conn.setAutoCommit(false);
try{
int cnt = 0;
while (rows.hasNext()) {
int idx = 0;
Row row = rows.next();
insertStmt.setString(++idx, row.getAs("COL1"));
insertStmt.setString(++idx, row.getAs("COL2"));
insertStmt.setString(++idx, row.getAs("COL3"));
insertStmt.addBatch();
cnt++;
if (cnt >= batchSize) {
insertStmt.executeBatch();
conn.commit();
insertStmt.clearBatch();
cnt = 0;
}
}}catch{..}
}
}
}}
这在我遍历每一行,读取每一列以创建批处理时,影响了火花作业的性能。有什么方法可以直接创建批处理,而无需遍历行和列。
请提出建议。
谢谢
您可以通过spark数据框编写器中的JDBC选项直接将数据框写入目标表。不再需要在逐行遍历整个数据帧的同时创建准备好的语句。 Spark内部处理所有这些操作。
您可以写:
import com.ibm.db2.jcc._
val jdbcUrl = "jdbc:db2://host:port/database_name"
val db2Properties = new Properties()
final_dataframe.write.mode("append").option("driver", "com.ibm.db2.jcc.DB2Driver").jdbc(jdbcUrl, table = "table_name", db2Properties)