将Spark DataFrame写入Redis时如何提高速度?

问题描述 投票:0回答:1

我正在开发基于Flask的图书推荐API,并且发现要管理多个请求,我需要预先计算相似度矩阵并将其存储在将来的查询中。该矩阵是使用PySpark基于约150万个具有图书ID,名称和元数据的数据库条目创建的,其结果可以用该模式描述(ij用于图书索引,dot用于相似性元数据):

StructType(List(StructField(i,IntegerType,true),StructField(j,IntegerType,true),StructField(dot,DoubleType,true)))

最初,我打算使用spark-redis连接器将其存储在Redis上。但是,以下命令的运行速度似乎很慢(即使初始书籍数据库的查询大小限制为非常适度的40k批处理):

similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").option("key.column", "i").save()

Spark将初始任务划分为9个阶段中的3个阶段,大约花了6个小时。奇怪的是,Spark执行者使用的存储内存非常低,大约20kb。典型的阶段活动阶段由Spark Application UI进行了描述:

org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)

是否有可能以某种方式加快此过程?我的Spark会话是通过以下方式设置的:

SUBMIT_ARGS = "  --driver-memory 2G --executor-memory 2G --executor-cores 4 --packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = SparkConf().set("spark.jars", "spark-redis/target/spark-redis_2.11-2.4.3-SNAPSHOT-jar-with-dependencies.jar").set("spark.executor.memory", "4g")
sc = SparkContext('local','example', conf=conf) 
sql_sc = SQLContext(sc)
python apache-spark pyspark redis data-science
1个回答
1
投票

您可以尝试使用Append保存模式以避免检查表中是否已存在数据:

similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").mode('append').option("key.column", "i").save()

另外,您可能想要更改

sc = SparkContext('local','example', conf=conf) 

sc = SparkContext('local[*]','example', conf=conf) 

以利用计算机上的所有内核。

BTW,在Redis中使用i作为键是否正确?它不是由ij组成吗?

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.