如何将“客户”表的值与“组合”表的行值进行分类?
我决定创建一个组合表来开发主行(客户表)中的所有组合。
我计划检查客户的行是否与组合表的一行重合,以将其分类为部门 B(组合表)。
我有这个流程,但 Dtabricks 返回错误:
for i,j in select_df.iterrows():
for u,v in dfCombinacionesDias.iterrows():
if((select_df["MONDAY"][i] == registro["LUNES"][u]) and (select_df["TUESDAY"][i] == registro["MARTES"][u]) and (select_df["WEDNESDAY"][i] == registro["MIERCOLES"][u]) and (select_df["THURSDAY"][i] == registro["JUEVES"][u]) and (select_df["FRIDAY"][i] == registro["VIERNES"][u]) and (select_df["SATURDAY"][i] == registro["SABADO"][u]) and (select_df["SUNDAY"][i] == registro["DOMINGO"][u])):
Sector = "B"
else:
Sector = "A"
vSubSeq = "('{}','{}')".format(select_df["IDClient"][i],Sector)
sqlInsertSequence = "Insert into {0}.{1} values {2}".format(dSCHEMA, Table, vSubSeq,vdataDeltaPath)
print(sqlInsertSequence)
dfTables = spark.sql(sqlInsertSequence)
我添加带有不同表格(客户、组合和部门)的图像
我认为我需要一个 for 来逐行循环表(组合表),以便与客户表中的行进行比较(如果有匹配项),我将此值保存在新表(扇区表)中,并且显然会存在其他 for循环客户表。但我想知道一种有助于查找表格进行比较的算法?
我有这个流程,但 Dtabricks 返回错误:
“返回错误”...含糊不清。例如,您正在使用
registro
,它似乎没有在您提供的代码摘录中的任何地方定义。
并且您没有有效地使用 Databricks' 功能。迭代 DataFrames (Spark) 的行效率很低,尤其是使用嵌套循环时。
相反,您可以使用 Spark 的 DataFrame API(来自 Apache Spark API 参考)更有效地获得所需的结果。
作为替代方法,不使用嵌套循环,您可以:
Combinations
DataFrame 的列以匹配 Clients
DataFrame 的列。Clients
DataFrame 和 Combinations
DataFrame。连接后,
Clients
DataFrame 中与 Combinations
DataFrame 匹配的任何行都将被分类为“B
”。没有匹配的行将为“A
”。
from pyspark.sql.functions import col, when
# Assuming you have loaded your data into two DataFrames: df_clients and df_combinations
# Step 1: Rename columns in df_combinations to match df_clients
df_combinations = df_combinations.withColumnRenamed("LUNES", "MONDAY")\
.withColumnRenamed("MARTES", "TUESDAY")\
.withColumnRenamed("MIERCOLES", "WEDNESDAY")\
.withColumnRenamed("JUEVES", "THURSDAY")\
.withColumnRenamed("VIERNES", "FRIDAY")\
.withColumnRenamed("SABADO", "SATURDAY")\
.withColumnRenamed("DOMINGO", "SUNDAY")
# Step 2: Join df_clients with df_combinations
df_joined = df_clients.join(df_combinations, on=["MONDAY", "TUESDAY", "WEDNESDAY", "THURSDAY", "FRIDAY", "SATURDAY", "SUNDAY"], how="left_outer")
# Step 3: Create a new column "Sector" based on whether there is a match in df_combinations
df_result = df_joined.withColumn("Sector", when(col("MONDAY").isNotNull(), "B").otherwise("A"))
# Step 4: If you want to store the result in another table
df_result.select("IDClient", "Sector").write.format("delta").save("/path/to/save/location")
它使用 DataFrame 转换和显式循环上的操作来对 Spark DataFrame 进行操作。