与 csv 文件相比,将 mysql 表转换为 Spark 数据集非常慢

问题描述 投票:0回答:2

我在 Amazon s3 中有 csv 文件,大小为 62mb(114 000 行)。我将其转换为 Spark 数据集,并从中获取前 500 行。代码如下;

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set=df.load("s3n://"+this.accessId.replace("\"", "")+":"+this.accessToken.replace("\"", "")+"@"+this.bucketName.replace("\"", "")+"/"+this.filePath.replace("\"", "")+"");

 set.take(500)

整个操作需要20至30秒。

现在我正在尝试相同的操作,但使用的是 csv,我使用的是包含 119 000 行的 mySQL 表。 MySQL服务器位于亚马逊ec2。代码如下;

String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;
    
SparkSession spark=StartSpark.getSparkSession();

SQLContext sc = spark.sqlContext();

Dataset<Row> set = sc
            .read()
            .option("url", url)
            .option("dbtable", this.tableName)
            .option("driver","com.mysql.jdbc.Driver")
            .format("jdbc")
            .load();
set.take(500);

这需要 5 到 10 分钟。 我在jvm内运行spark。在这两种情况下使用相同的配置。

我可以使用partitionColumn、numParttition等,但我没有任何数字列,而且还有一个问题是我不知道表的架构。

我的问题不是如何减少所需的时间,因为我知道在理想情况下spark将在集群中运行,但我不明白的是为什么在上述两种情况下会有这么大的时间差异?

java mysql apache-spark jdbc amazon-s3
2个回答
10
投票

这个问题已在 StackOverflow 上多次讨论过:

以及外部来源:

所以重申一下 - 默认情况下

DataFrameReader.jdbc
不会分发数据或读取。它使用单线程、单执行器。

分发阅读:

  • 使用范围与

    lowerBound
    /
    upperBound
    :

    Properties properties;
    Lower
    
    Dataset<Row> set = sc
        .read()
        .option("partitionColumn", "foo")
        .option("numPartitions", "3")
        .option("lowerBound", 0)
        .option("upperBound", 30)
        .option("url", url)
        .option("dbtable", this.tableName)
        .option("driver","com.mysql.jdbc.Driver")
        .format("jdbc")
        .load();
    
  • predicates

    Properties properties;
    Dataset<Row> set = sc
        .read()
        .jdbc(
            url, this.tableName,
            {"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"},
            properties
        )
    

-2
投票

请按照以下步骤操作

1.下载 mysql 的 JDBC 连接器的副本。我相信你已经拥有了。

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

2.按照以下格式创建 db-properties.flat 文件

jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>

3.首先创建一个空表,用于加载数据。

使用驱动程序类调用 Spark shell

spark-shell --driver-class-path  <your path to mysql jar>

然后导入所有需要的包

import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

启动 hive 上下文或 sql 上下文

val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql

设置一些属性

sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

从文件加载 mysql 数据库属性

val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db-        properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")

创建一个查询以从表中读取数据并将其传递给 #sqlcontext 的 read 方法。这是你可以管理你的 where 子句的地方

val df1 = "(SELECT  * FROM your_table_name) as s1" 

通过jdbcurl,选择query和db属性读取方法

val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)

写到你的桌子上

df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")
© www.soinside.com 2019 - 2024. All rights reserved.