安排一个python文件每周自动运行

问题描述 投票:0回答:1

我的Python代码涉及从Azure Blob存储中获取文件,对该文件进行更改,然后将其发送回Azure存储。我有一个工作代码来完成这项任务。现在我需要安排它每周在特定时间运行。最好使用的工具是什么,我可以在其中简单地上传我的 ipynb 文件并安排它?我尝试使用 Power Automate,但它没有上传 ipynb 文件的选项。有人可以帮忙吗?另外,我可以获得有关如何使用该工具来实现我想要的目标的步骤吗?我尝试研究所有选项,但没有找到适合我的任务的完美教程。使用 VS Code 研究了 Azure Data Functions,但不知道步骤。

python scheduled-tasks azure-automation ipynb
1个回答
0
投票

您可以使用 Azure Batch 服务,当您的 Python 代码从 Blob 存储中选取文件进行更改并将其发送回同一存储时。

将您的 Python 代码文件添加到存储帐户中,我们的 Azure 批处理服务将从中选择它作为资源,并在您选择的可以部署的池中运行它。然后,您可以使用作业调度程序来安排根据您的时间运行代码的作业。 Azure 批处理服务使用更少的计算来运行代码。

在存储帐户容器中添加 python 脚本,并为上传的 python 脚本 blob 创建 SAS 令牌 URL:-

enter image description here

将此存储帐户链接到您的批处理服务:-

enter image description here

使用 Linux 操作系统创建池,然后创建作业计划:-

enter image description here

在计划中选择您要运行作业的时间:-

enter image description here

选择作业配置(更新)> 从上面创建的池中添加池 ID。并在命令行中添加以下命令:-

/bin/bash -c "pip install azure-storage-blob && python scheduler.py"

enter image description here

Resource select Http url > in Value add the Storage blob with SAS Token URL > In File path add scheduler.py

enter image description here

scheduler.py

from azure.storage.blob import BlobServiceClient

# Azure Storage account details
account_name = 'silistrg89'
account_key = 'xxxxxxxx'
container_name = 'data'
blob_name = 'Hello test.txt'

# Connect to the Azure Blob service
blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=account_key)

# Get a specific blob
blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

# Download the blob content
blob_data = blob_client.download_blob()
blob_text = blob_data.readall()

# Modify the content (for example, appending a string)
modified_text = blob_text + b"\nModified content appended."

# Upload the modified content back to Azure Blob storage
blob_client.upload_blob(modified_text, overwrite=True)

print("File modified and uploaded successfully.")

已创建工作计划:-

enter image description here

如果您想在 Azure 函数中实现此功能,您需要创建一个计时器触发器函数,请参阅此处。在计时器触发器中,您需要每周使用一次 CRON 表达式到特定时间。

并在您的 function_app.py 中使用此代码:-

import logging
import azure.functions as func
from azure.storage.blob import BlobServiceClient

app = func.FunctionApp()

@app.schedule(schedule="0 * * * * *", arg_name="myTimer", run_on_startup=True,
              use_monitor=False) 
def timer_trigger(myTimer: func.TimerRequest) -> None:
    # Azure Storage account details
    account_name = 'silistrg89'
    account_key = 'GLw7xxxxxxxxxx'
    container_name = 'data'
    blob_name = 'Hello test.txt'

    # Connect to the Azure Blob service
    blob_service_client = BlobServiceClient(account_url=f"https://{account_name}.blob.core.windows.net", credential=account_key)

    # Get a specific blob
    blob_client = blob_service_client.get_blob_client(container=container_name, blob=blob_name)

    # Download the blob content
    blob_data = blob_client.download_blob()
    blob_text = blob_data.readall()

    # Modify the content (for example, appending a string)
    modified_text = blob_text + b"\nModified content appended."

    # Upload the modified content back to Azure Blob storage
    blob_client.upload_blob(modified_text, overwrite=True)

    print("File modified and uploaded successfully.")

本地输出:-

enter image description here

将其部署在您的 Azure Function App 中,该函数将在 CRON 表达式指定的时间自动触发:-

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.