我是个非常新手,只花了一周的时间。这是我在pyspark中的代码,该代码在具有单个主设备和两个从设备的独立Spark集群上运行。试图运行一个作业会导致读取1100万条记录数据并对其进行一些处理,然后将数据帧转储到oracle表中。它似乎像该程序一样创建了404个分区来完成任务。在控制台或终端上,我可以看到403/404已完成,但是分区404上的最后一个也是最后一个任务要花很长时间才能完成工作。我无法完成工作。谁能告诉我代码的问题。任何人都可以帮助优化Spark的性能,或者可以为我提供指南或其他内容?任何tu语或指南将有所帮助。在此先感谢
# creating a spark session
spark = SparkSession \
.builder \
.appName("pyspark_testing_29012020") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# target table schema and column order
df_target = spark.read.csv("mycsv path", header = True)
df_local_schema = df_target.schema
df_column_order = df_target.columns
# dataframe with input file/dataset values and schema
df_source = spark.read\
.format("csv")\
.option("header", "false")\
.option("inferschema", "true")\
.option("delimiter", ",")\
.schema(df_local_schema)\
.load("csv path")
# dataframe with the target file/dataset values
df_target = spark.read\
.format("jdbc") \
.option("url", "jdbc:oracle:thin:system/[email protected]:0101:orcl") \
.option("dbtable", "mydata") \
.option("user", "system") \
.option("password", "oracle123") \
.option("driver", "oracle.jdbc.driver.OracleDriver")\
.load()
# splitting the target table in to upper and lower sections
df_target_upper = df_target.where(df_target['Key'] < 5) # set A
df_source_upper = df_source.where(df_source['Key'] < 5) # set B
df_source_lower = df_source.where(df_source['Key'] > 4) # set D
df_target_lower = df_target.where(df_target['key'] > 4) # set C
''' now programming for the upper segment of the data '''
# set operation A-B
A_minus_B = df_target_upper.join(df_source_upper,
on=['key1', 'key2', 'key3', 'key4'],
how='left_anti')
A_minus_B = A_minus_B.select(sorted(df_column_order))
# set operation B-A
B_minus_A = df_source_upper.join(df_target_upper,
on=['key1', 'key2','key3','key4'],how = 'left_anti')
B_minus_A = B_minus_A.select(sorted(df_column_order))
# union of A-B and B-A
AmB_union_BmA = A_minus_B.union(B_minus_A)
AmB_union_BmA = AmB_union_BmA.select(sorted(df_column_order))
# A-B left anti B-A to get the uncommon record in both the dataframes
new_df = A_minus_B.join(B_minus_A, on=['key'], how = 'left_anti')
new_df = new_df.select(sorted(df_column_order))
AmB_union_BmA = AmB_union_BmA.select(sorted(df_column_order))
AnB = df_target_upper.join(df_source_upper,
on=['key1', 'key2', 'key3', 'key4'],
how='inner')
df_AnB_without_dupes = dropDupeDfCols(AnB)
new_AnB = df_AnB_without_dupes.select(sorted(df_column_order))
final_df = AmB_union_BmA.union(new_AnB)
final_df.show()
result_df = B_minus_A.union(new_df)
df_result_upper_seg = result_df.union(new_AnB)
''' now programming for the lower segment of the data '''
# set operation C-D
C_minus_D = df_target_lower.join(df_source_lower, on=['key'], how='left_anti')
C_minus_D = C_minus_D.select(sorted(df_column_order))
# set operation D-C
D_minus_C = df_source_lower.join(df_target_lower, on=['key'], how = 'left_anti')
D_minus_C = D_minus_C.select(sorted(df_column_order))
# union of C-D union D-C
CmD_union_DmC = C_minus_D.union(D_minus_C)
CmD_union_DmC = CmD_union_DmC.select(sorted(df_column_order))
# C-D left anti D-C to get the uncommon record in both the dataframes
lower_new_df = C_minus_D.join(D_minus_C, on=['key'], how = 'left_anti')
lower_new_df = lower_new_df.select(sorted(df_column_order))
CmD_union_DmC = CmD_union_DmC.select(sorted(df_column_order))
CnD = df_target_lower.join(df_source_lower,
on=['key'], how='inner')
new_CnD = dropDupeDfCols(CnD)
new_CnD = new_CnD.select(sorted(df_column_order))
lower_final_df = CmD_union_DmC.union(new_CnD)
result_df_lower = D_minus_C.union(lower_new_df)
df_result_lower_seg = result_df_lower.union(new_CnD)
df_final_result .write \
.format("jdbc") \
.mode("overwrite")\
.option("url", "jdbc:oracle:thin:system/[email protected]:1010:orcl") \
.option("dbtable", "mydata") \
.option("user", "system") \
.option("password", "oracle123") \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.save()