我在 Amazon s3 中有 csv 文件,大小为 62mb(114 000 行)。我将其转换为 Spark 数据集,并从中获取前 500 行。代码如下;
DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set=df.load("s3n://"+this.accessId.replace("\"", "")+":"+this.accessToken.replace("\"", "")+"@"+this.bucketName.replace("\"", "")+"/"+this.filePath.replace("\"", "")+"");
set.take(500)
整个操作需要20至30秒。
现在我正在尝试相同的操作,但使用的是 csv,我使用的是包含 119 000 行的 mySQL 表。 MySQL服务器位于亚马逊ec2。代码如下;
String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;
SparkSession spark=StartSpark.getSparkSession();
SQLContext sc = spark.sqlContext();
Dataset<Row> set = sc
.read()
.option("url", url)
.option("dbtable", this.tableName)
.option("driver","com.mysql.jdbc.Driver")
.format("jdbc")
.load();
set.take(500);
这需要 5 到 10 分钟。 我在jvm内运行spark。在这两种情况下使用相同的配置。
我可以使用partitionColumn、numParttition等,但我没有任何数字列,而且还有一个问题是我不知道表的架构。
我的问题不是如何减少所需的时间,因为我知道在理想情况下spark将在集群中运行,但我不明白的是为什么在上述两种情况下会有这么大的时间差异?
这个问题已在 StackOverflow 上多次讨论过:
以及外部来源:
所以重申一下 - 默认情况下
DataFrameReader.jdbc
不会分发数据或读取。它使用单线程、单执行器。
分发阅读:
使用范围与
lowerBound
/ upperBound
:
Properties properties;
Lower
Dataset<Row> set = sc
.read()
.option("partitionColumn", "foo")
.option("numPartitions", "3")
.option("lowerBound", 0)
.option("upperBound", 30)
.option("url", url)
.option("dbtable", this.tableName)
.option("driver","com.mysql.jdbc.Driver")
.format("jdbc")
.load();
predicates
Properties properties;
Dataset<Row> set = sc
.read()
.jdbc(
url, this.tableName,
{"foo < 10", "foo BETWWEN 10 and 20", "foo > 20"},
properties
)
1.下载 mysql 的 JDBC 连接器的副本。我相信你已经拥有了。
wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar
2.按照以下格式创建 db-properties.flat 文件
jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>
3.首先创建一个空表,用于加载数据。
spark-shell --driver-class-path <your path to mysql jar>
import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql
sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db- properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")
val df1 = "(SELECT * FROM your_table_name) as s1"
val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)
df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")