需要有关此Apache Spark(pyspark)问题的帮助。.>>
我有一个dataFrame(df1),它具有单列和单行,其中包含max_timestamp
+------------------+
|max_timestamp |
+-------------------+
|2019-10-24 21:18:26|
+-------------------+
我有另一个DataFrame,其中包含2列-EmpId和时间戳
masterData = [(1, '1999-10-24 21:18:23',), (1, '2019-10-24 21:18:26',), (2, '2020-01-24 21:18:26',)] df_masterdata = spark.createDataFrame(masterData, ['dsid', 'txnTime_str']) df_masterdata = df_masterdata.withColumn('txnTime_ts', col('txnTime_str').cast(TimestampType())).drop('txnTime_str') df_masterdata.show(5, False) +----+-------------------+ |dsid|txnTime_ts | +----+-------------------+ |1 |1999-10-24 21:18:23| |1 |2019-10-24 21:18:26| |2 |2020-01-24 21:18:26| +----+-------------------+
对象用于根据条件txnTime_ts
[我要做什么->将列'max_timestamp'添加到第二个DataFrame,并通过比较两个值来过滤记录。
df_masterdata1 = df_masterdata.withColumn('maxTime', maxTS2['TEMP_MAX'])
Pyspark不允许我将maxTS2中的列添加到dataFrame-df_masterdata
错误-
AnalysisException: 'Resolved attribute(s) TEMP_MAX#207255 missing from dsid#207263L,txnTime_ts#207267 in operator !Project [dsid#207263L, txnTime_ts#207267, TEMP_MAX#207255 AS maxTime#207280].;;\n!Project [dsid#207263L, txnTime_ts#207267, TEMP_MAX#207255 AS maxTime#207280]\n+- Project [dsid#207263L, txnTime_ts#207267]\n +- Project [dsid#207263L, txnTime_str#207264, cast(txnTime_str#207264 as timestamp) AS txnTime_ts#207267]\n +- LogicalRDD [dsid#207263L, txnTime_str#207264], false\n'
关于如何解决此问题的任何想法?感谢您的帮助。
需要此Apache Spark(pyspark)问题的帮助。我有一个dataFrame(df1),它具有单列和单行,它包含max_timestamp + ------------- ----- + | max_timestamp | + ---...
如果您实际上有一个具有单个行/列的DF,最有效的方法是从数据帧中提取值,然后对它过滤df_masterdata
。如果您仍然需要在数据框的上下文中执行此操作,则应使用join
,例如:]
df_masterdata1 = df_masterdata.join(df1, df_masterdata.txnTime_ts <= df1.max_timestamp)