我有一个PySpark Sql脚本,需要每天运行,我想把所需的参数传递给脚本,并在脚本中的SQL查询中使用它们。
例如:以下是我想传递给脚本的参数,并在脚本中使用它们。
my_st_dt='2019-02-04'
my_end_dt='2019-02-10'
mth_yyyymm='201902'
my_partition_dt='20190204'
my_table_name='table_1'
my_path='hdfs:///abcd/efgh/ijkl/mnop'
my_query1='''
SELECT * FROM parquet.`{my_file_path}/{my_table}/data`
WHERE {my_partition_name} = {partition} AND (my_date >= '{partition_st_dt}' AND
my_date <= '{partition_end_dt}')
'''.format(my_file_path=my_path,my_table=my_table_name,my_partition_name='my_yyyymm', \
partition=mth_yyyymm,partition_st_dt=my_st_dt, partition_end_dt=my_end_dt)
我的脚本中有几个类似上面的查询。谁能告诉我写这段代码的有效方法,使我不必每次运行时都要编辑脚本?如果除了Python的".format "之外还有其他的选择,那么也请告诉我。先谢谢你了。
我们有两种方法可以做到这一点。
方法一:使用argsparse 并在执行脚本时传递所有需要的变量。
parser = argparse.ArgumentParser(
description="Usage: spark2-submit ens_load_pb_sdps.py <sdp_name>")
parser.add_argument(
"-start_date", "--start_date"
type=date,
help='start date for query')
parser.add_argument(
"end_date", "--end_date"
type=date,
help='start date for query')
parser.add_argument(
"-table_name", "--table_name"
type=date,
help='start date for query')
parser.add_argument(
"-file_path", "--file_path"
type=date,
help='start date for query')
parser.add_argument(
"partition_name", "--partition_name"
type=date,
help='start date for query')
parser.add_argument(
"-partition_value", "--partition_value"
type=date,
help='start date for query')
args = parser.parse_args()
start_date, end_date, table_name, file_path, partition_name, partition_value = args.start_date, args.end_date, args.table_name, args.file_path, args.partition_name, args.partition_value
query_stmt="""
SELECT * FROM parquet.{file_path}/{table_name}/data
WHERE {partition_name} = {partition_value} AND (my_date >= '{start_date}' AND
my_date <= '{end_date}')
""".format(file_path=file_path, table_name=table_name, partition_name=partition_name, \
partition_value=partition_value, start_date=start_date, end_date=end_date)
方法2:使用conf文件另外,在执行脚本的过程中,我们也可以将该值保存在conf json文件中,然后将该json文件加载到python dict中,然后在形成查询时使用它。
{
"start_date": "2019-01-01",
"end_date": "2020-01-01",
"partition_name": "name",
"partition_value": "John Doe",
"file_path": "C://files/dummyPathh/",
"table_name": "dummyTable"
}
加载conf文件的Python代码-
file_path = "C://path/to/config" // path to conf file
json_file = glob.glob(file_path)
with open(json_file, 'r', encoding="utf8") as file:
conf_object = json.load(file)
start_date, end_date, table_name, file_path, partition_name, partition_value = conf_object.start_date, conf_object.end_date, conf_object.table_name, conf_object.file_path, conf_object.partition_name, conf_object.partition_value
query_stmt="""
SELECT * FROM parquet.{file_path}/{table_name}/data
WHERE {partition_name} = {partition_value} AND (my_date >= '{start_date}' AND
my_date <= '{end_date}')
""".format(file_path=file_path, table_name=table_name, partition_name=partition_name, \
partition_value=partition_value, start_date=start_date, end_date=end_date)
在运行不同的查询时,你可以改变conf文件中的值,为了更进一步,你可以制作一个嵌套的dict json conf文件,然后使用它,这样你就不必在每次运行查询时编辑conf文件。