如何使用Python API列出所有数据流作业

问题描述 投票:0回答:3

我的用例涉及获取项目中存在的所有流数据流作业的作业 ID 并取消它。更新我的数据流作业的源并重新运行它。

我正在尝试使用 python 来实现这一点。直到现在我还没有遇到任何有用的文档。 我想到使用 python 的库子进程来执行 gcloud 命令作为解决方法。但我再次无法存储结果并使用它。

有人可以指导我什么是最好的方法吗?

python google-cloud-platform google-cloud-dataflow
3个回答
5
投票

除了直接使用其余 API 之外,您还可以在 google-api-python-client 中使用为 API 生成的 Python 绑定。对于简单的调用,它不会增加那么多的价值,但是当传递许多参数时,它比原始 HTTP 库更容易使用。

使用该库,作业列表调用将如下所示

from googleapiclient.discovery import build
import google.auth
credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
df_service = build('dataflow', 'v1b3', credentials=credentials)
response = df_service.projects().locations().jobs().list(
  project_id=project_id,
  location='<region>').execute()

3
投票

您可以像这样直接使用Dataflow Rest api

    from google.auth.transport.requests import AuthorizedSession
    import google.auth

    base_url = 'https://dataflow.googleapis.com/v1b3/projects/'

    credentials, project_id = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
    project_id = 'PROJECT_ID'
    location = 'europe-west1'
    authed_session = AuthorizedSession(credentials)
    response = authed_session.request('GET', f'{base_url}{project_id}/locations/{location}/jobs')
    print(response.json())

您必须导入 google-auth 依赖项。

您还可以添加查询参数

?filter=ACTIVE
以仅获取与您的流作业匹配的活动数据流。


1
投票

这是您拥有的完整三位一体选项:

  1. 直接使用 REST API(answer@guillaume blaquiere)
  2. 使用 API 发现服务(answer by @danielm)
  3. 使用 google-cloud-dataflow-client python SDK 库(我自己回答)

安装客户端库:

pip install google-cloud-dataflow-client

然后

from google.cloud.dataflow_v1beta3 import JobsV1Beta3Client, ListJobsRequest

client = JobsV1Beta3Client()
request = ListJobsRequest(
    project_id="<PROJECT_ID>",
    location="<LOCATION>", # won't work without location
    filter="ACTIVE", # remove if you also want finished
)
jobs = client.list_jobs(request=request)
for job in jobs:
    print(job)

看起来您只能找到过去 30 天的工作。

© www.soinside.com 2019 - 2024. All rights reserved.