How to merge RDD map result columns back into the same dataframe


I am doing a transformation in my pyspark code and creating new columns. I observed that source_df is being replaced by the new columns. Is it possible to merge the new columns with the existing dataframe columns?

source_df=source_df.rdd.map(lambda x:winner_calc(x["org_attributes_dict"])).toDF()

Output 
+---------+----------+------------+
|winner_bn| winner_hj|winner_value|
+---------+----------+------------+
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
| No Match|         2|           1|
+---------+----------+------------+


Sharing sample code, since I cannot share the actual code. As you can see in the final output, the actual dataframe is being overwritten with the new value "H" for all rows. I want to add it as a new column to the existing dataframe instead of overwriting it.

import sys,os
import concurrent.futures
from concurrent.futures import *
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.context import SparkConf
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime
from pyspark.sql.functions import array
from pyspark.sql.functions import sha2, concat_ws
from pyspark.sql.functions import  udf
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
#from pyspark.sql.functions import StringType
from pyspark.sql.functions import row_number,lit,col,expr
from pyspark.sql.window import Window
import requests
import json
import traceback
import base64
import pandas as pd 
import pyspark.sql.types as T
from pyspark.sql import functions as F


# helper used in the rdd.map below: returns the single-element tuple ('H',)
def val():
    return (tuple('H'))

###############################

class JobBase(object):
    spark=None
    def __start_spark_glue_context(self):
        conf = SparkConf().setAppName("python_thread")
        self.sc = SparkContext(conf=conf)
        self.glueContext = GlueContext(self.sc)
        self.spark = self.glueContext.spark_session
            
    def execute(self):
        self.__start_spark_glue_context()
        d =[{"account_number": 1, "v1": 100830, "v2": 1000},
                {"account_number": 2, "v1": 2000, "v2": 2},
                {"account_number": 3, "v1": 555, "v2": 55}]

        df = self.spark.createDataFrame(d)
        df.show()
        try:
            df=df.rdd.map(lambda x :val()).toDF()
        except Exception as exp:
            exception_type, exception_value, exception_traceback = sys.exc_info()
            traceback_string = traceback.format_exception(exception_type, exception_value, exception_traceback)
            err_msg = json.dumps({
                "errorType": exception_type.__name__,
                "errorMessage": str(exception_value),
                "stackTrace": traceback_string})
            print(err_msg)
        
        df.show()

        
def main():
    job = JobBase()
    job.execute() 
    

if __name__ == '__main__':
    main()







Output 


+--------------+------+----+
|account_number|    v1|  v2|
+--------------+------+----+
|             1|100830|1000|
|             2|  2000|   2|
|             3|   555|  55|
+--------------+------+----+


+---+
| _1|
+---+
|  H|
|  H|
|  H|
+---+
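
To make the goal concrete, here is a rough sketch of the kind of merge I am after, reusing the sample df and val() from the code above (the column name "new_col" is just a placeholder, not something from my real job):

# Sketch only: keep account_number, v1 and v2 and add the mapped value as an
# extra field, instead of replacing the whole dataframe.
from pyspark.sql import Row

merged = df.rdd.map(
    lambda x: Row(**x.asDict(), new_col=val()[0])  # original fields + the new value
).toDF()
merged.show()

I am not sure whether something along these lines is the right way to do it, or whether there is a cleaner approach.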



Tags: pyspark, rdd
1 Answer

Import the lit() function and replace the line

df=df.rdd.map(lambda x :val()).toDF()

with

df = df.withColumn("v3", lit("H"))

Note: You can pass any constant value to lit() to suit your requirement.

Output:

+--------------+------+----+---+
|account_number|    v1|  v2| v3|
+--------------+------+----+---+
|             1|100830|1000|  H|
|             2|  2000|   2|  H|
|             3|   555|  55|  H|
+--------------+------+----+---+
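
If the new columns come from a function of an existing column, as with winner_calc in the original post, lit() on its own is not enough, since it only takes constant values. A minimal sketch of one common alternative, assuming winner_calc accepts the value of org_attributes_dict and returns the three values (winner_bn, winner_hj, winner_value); the field names and types below are guesses and may need adjusting:

from pyspark.sql import functions as F
from pyspark.sql import types as T

# Assumed return shape of winner_calc: a 3-tuple (winner_bn, winner_hj, winner_value).
# Adjust the field types to whatever winner_calc actually returns.
winner_schema = T.StructType([
    T.StructField("winner_bn", T.StringType()),
    T.StructField("winner_hj", T.StringType()),
    T.StructField("winner_value", T.StringType()),
])

winner_udf = F.udf(lambda d: winner_calc(d), winner_schema)

source_df = (
    source_df
    .withColumn("winner", winner_udf(F.col("org_attributes_dict")))
    .select("*", "winner.*")   # keep the existing columns and expand the struct
    .drop("winner")
)

Expanding a struct column this way keeps everything in the DataFrame API, so the existing columns are never dropped.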