我有两个文件。 我有一个文件,我正在创建sparkcontext。
代码是这样的
spark_conf = (SparkConf().setAppName(app_name)
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.task.maxFailures", "14")
.set("spark.port.maxRetries", "50")
.set("spark.yarn.max.executor.failures", "14"))
spark_context = SparkContext(conf=spark_conf)
sqlContext=HiveContext(spark_context)
然后是另一个包含所有代码的文件。把它命名为:function_file.py
它必须具有以下功能:该功能只对数据进行一些操作。
def adjust_name(line):
if line is not None:
if "(" in line:
if "\(" in line:
tem1 = line.split("\(")
return tem1[0]
else:
tem1 = line.split("(")
return tem1[0]
else:
return line
else:
return line
现在我们正在创建adjust_name
函数的udf。
adjust=udf(adjust_name,StringType())
我们在process_sql函数中使用这个udf作为
和另一个功能,完成所有表加载和所有。例如
def process_sql(sqlContext,source_db,processing_db,table_name):
.
.
.df3 = df3.withColumn('org_name',trim(adjust(df3['col_name'])))
return table_name.
现在在create_spark.py文件中我将function_file导入为模块。我正在调用process_sql函数
x= function_file.process_sql(sqlContext,source_db,processing_db,table_name)
所有参数都是事先定义的。但我得到的错误如下:
ValueError:无法一次运行多个SparkContexts;现有的SparkContext()由udf在function_file.py中创建
:Connecting to Spark and creating context with dim_emp_atsc_test_4_sept spark_context = SparkContext(conf=spark_conf)
ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=process_handler.py, master=yarn-client) created by udf at ..
我认为这个问题与HiveContext
和SparkContext
有关。
尝试只使用其中一个或在创建HiveContext
时传递SparkContext
作为构造函数参数。