使用参数自动完成PySpark SQL脚本

问题描述 投票:0回答:1

我有一个PySpark Sql脚本,需要每天运行,我想把所需的参数传递给脚本,并在脚本中的SQL查询中使用它们。

例如:以下是我想传递给脚本的参数,并在脚本中使用它们。

my_st_dt='2019-02-04'
my_end_dt='2019-02-10'
mth_yyyymm='201902'
my_partition_dt='20190204'
my_table_name='table_1'
my_path='hdfs:///abcd/efgh/ijkl/mnop'

my_query1='''
SELECT * FROM parquet.`{my_file_path}/{my_table}/data`
WHERE {my_partition_name} = {partition} AND (my_date >= '{partition_st_dt}' AND 
       my_date <= '{partition_end_dt}') 
'''.format(my_file_path=my_path,my_table=my_table_name,my_partition_name='my_yyyymm', \
           partition=mth_yyyymm,partition_st_dt=my_st_dt, partition_end_dt=my_end_dt)

我的脚本中有几个类似上面的查询。谁能告诉我写这段代码的有效方法,使我不必每次运行时都要编辑脚本?如果除了Python的".format "之外还有其他的选择,那么也请告诉我。先谢谢你了。

pyspark-sql
1个回答
0
投票

我们有两种方法可以做到这一点。

方法一:使用argsparse 并在执行脚本时传递所有需要的变量。

parser = argparse.ArgumentParser(
        description="Usage: spark2-submit ens_load_pb_sdps.py <sdp_name>")
parser.add_argument(
    "-start_date", "--start_date"
    type=date,
    help='start date for query')
parser.add_argument(
    "end_date", "--end_date"
    type=date,
    help='start date for query')
parser.add_argument(
    "-table_name", "--table_name"
    type=date,
    help='start date for query')
  parser.add_argument(
    "-file_path", "--file_path"
    type=date,
    help='start date for query')
parser.add_argument(
    "partition_name", "--partition_name"
    type=date,
    help='start date for query')
parser.add_argument(
    "-partition_value", "--partition_value"
    type=date,
    help='start date for query')

args = parser.parse_args()

start_date, end_date, table_name, file_path, partition_name, partition_value = args.start_date, args.end_date, args.table_name, args.file_path, args.partition_name, args.partition_value

query_stmt="""
SELECT * FROM parquet.{file_path}/{table_name}/data
WHERE {partition_name} = {partition_value} AND (my_date >= '{start_date}' AND 
       my_date <= '{end_date}')
""".format(file_path=file_path, table_name=table_name, partition_name=partition_name, \
           partition_value=partition_value, start_date=start_date, end_date=end_date)

方法2:使用conf文件另外,在执行脚本的过程中,我们也可以将该值保存在conf json文件中,然后将该json文件加载到python dict中,然后在形成查询时使用它。

{
  "start_date": "2019-01-01",
  "end_date": "2020-01-01",
  "partition_name": "name",
  "partition_value": "John Doe",
  "file_path": "C://files/dummyPathh/",
  "table_name": "dummyTable"
}

加载conf文件的Python代码-

file_path = "C://path/to/config" // path to conf file
json_file = glob.glob(file_path)
with open(json_file, 'r', encoding="utf8") as file:
    conf_object = json.load(file)

start_date, end_date, table_name, file_path, partition_name, partition_value = conf_object.start_date, conf_object.end_date, conf_object.table_name, conf_object.file_path, conf_object.partition_name, conf_object.partition_value

query_stmt="""
SELECT * FROM parquet.{file_path}/{table_name}/data
WHERE {partition_name} = {partition_value} AND (my_date >= '{start_date}' AND 
        my_date <= '{end_date}')
""".format(file_path=file_path, table_name=table_name, partition_name=partition_name, \
            partition_value=partition_value, start_date=start_date, end_date=end_date)

在运行不同的查询时,你可以改变conf文件中的值,为了更进一步,你可以制作一个嵌套的dict json conf文件,然后使用它,这样你就不必在每次运行查询时编辑conf文件。

© www.soinside.com 2019 - 2024. All rights reserved.