我有一个包含“main.py”文件的 s3 存储库,其中包含我构建的自定义模块(在“Cache”和“Helpers”内):
我的“main.py”文件看起来像这样:
from pyspark.sql import SparkSession
from spark_main import spark_process
from Cache import redis_main
file_root = 'flexible_dates_emr\parquet_examples\pe_1.parquet'
city_pairs = [('TLV', 'NYC', 'NYC', 'TLV'), ('TLV', 'ROM', 'ROM', 'TLV')]
def main():
spark = SparkSession.builder.appName('Test').getOrCreate()
spark_data = spark_process(spark, file_root, city_pairs)
redis_main.redis_update_from_older_file(spark_data)
print(spark_data)
if __name__ == '__main__':
main()
我有一个 EMR 集群,满足项目的所有要求,运行良好,但是当我尝试导入“spark_process”或“redis_main”等模块时,我的任务失败了。
我猜原因是因为它无法识别模块所在的文件。
如何使用这些模块? 谢谢。
将 main.py 作业提交到 EMR 集群时,它不理解从本地文件导入,因为 s3(文件夹)不是包模块;因此,这里需要将依赖文件传递到 main.py。
与
spark-submit
您可以使用 --py-files s3://<PATH_TO_FILE>/spark_main.py
但是,现在您已经有了一个完整的子模块,对于缓存和助手,可以通过压缩来传递它,只需使用 --py-files s3://<PATH_TO_FILE>/Cache.zip
并确保 zip 中也有 __init__.py
。