完成所有作业后自动关闭Google Dataproc群集

问题描述 投票:5回答:6

所有作业完成后,如何以编程方式自动关闭Google Dataproc群集?

Dataproc provides的创建,监控和管理。但似乎我无法找到如何删除群集。

google-cloud-platform google-cloud-dataproc
6个回答
3
投票

gcloud dataproc CLI界面提供max-idle选项。这会在x个不活动量(即没有正在运行的作业)之后自动杀死Dataproc群集。它可以使用如下:

gcloud dataproc clusters create test-cluster \
    --project my-test-project \
    --zone europe-west1-b \
    --master-machine-type n1-standard-4 \
    --master-boot-disk-size 100 \
    --num-workers 2 \
    --worker-machine-type n1-standard-4 \
    --worker-boot-disk-size 100 \
    --max-idle 1h

1
投票

这取决于语言。就个人而言,我使用Python(pyspark),这里提供的代码对我来说很好:

https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/dataproc/submit_job_to_cluster.py

您可能需要根据自己的目的调整代码,并遵循README文件(https://github.com/GoogleCloudPlatform/python-docs-samples/tree/master/dataproc)中指定的先决条件步骤,例如启用API并在requirements.txt中安装软件包。

基本上,使用函数wait_for_job,你等到作业完成,并且使用delete_cluster,正如名称所示,您之前创建的集群将被删除。我希望这可以帮到你。


0
投票

有几种可编程的方法可以自动关闭集群:

  1. 直接打电话给REST api
  2. Use the gcloud CLI

在您的工作完成执行后,可以使用(调用)其中任何一个。

在这里查看更多:https://cloud.google.com/dataproc/docs/guides/manage-cluster#delete_a_cluster


0
投票

要实现这一目标,您有三种选择:

  1. 在群集创建期间设置--max-idle属性(请参阅https://stackoverflow.com/a/54239034/3227693)。
  2. 使用Dataproc Workflow Templates来管理集群生命周期。所有作业完成后,它可以自动创建集群以执行作业并删除集群。
  3. 使用完整的编排解决方案作为Cloud Composer来管理您的集群和作业生命周期。

0
投票

火花应用程序完成后,您可以删除群集。这里有些例子:

private SparkApplication(String[] args) throws
                                        org.apache.commons.cli.ParseException,
                                        IOException,
                                        InterruptedException {

    // Your spark code here

    if (profile != null && profile.equals("gcp")) {
        DataProcUtil.deleteCluster(clusterName);
    }
}

以下是您通过java删除群集的方法

 public static void deleteCluster(String clusterName) throws IOException, InterruptedException {

    logger.info("Try to delete cluster: {}....", clusterName);

    Process process = new ProcessBuilder("gcloud",
                                         "dataproc",
                                         "clusters",
                                         "delete",
                                         clusterName,
                                         "--async",
                                         "--quiet").start();

    int errCode = process.waitFor();
    boolean hasError = (errCode == 0 ? false : true);
    logger.info("Command executed, any errors? {}", hasError);
    String output;
    if (hasError) {
        output = output(process.getErrorStream());
    }
    else {
        output = output(process.getInputStream());
    }

    logger.info("Output: {}", output);

}

private static String output(InputStream inputStream) throws IOException {
    StringBuilder sb = new StringBuilder();

    try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {

        String line;
        while ((line = br.readLine()) != null) {

            sb.append(line)
              .append(System.getProperty("line.separator"));

        }
    }
    return sb.toString();

}

0
投票

您可以使用Scala代码执行此操作:

  • 创建集群
  • 跑完所有的工作
  • 作业终止时删除群集

为此,您可以使用Scala Future。

如果您有许多工作,您可以并行运行它们:

val gcpJarBucket = "gs://test_dataproc/dataproc/Dataproc.jar"
val jobs = Seq("package.class1", "package.class2")
val projectName: String = "automat-dataproc"
val clusterName: String = "your-cluster-name"

val timeout = 180 minute

// Working directory
implicit val wd = pwd

val future = Future {
  println("Creating the spark cluster...")
  % gcloud ("dataproc", "clusters", "create", clusterName, "--subnet", "default", "--zone", "europe-west1-b", "--master-machine-type", "n1-standard-4", "--master-boot-disk-size", "50", "--num-workers", "3", "--worker-machine-type", "n1-standard-4", "--worker-boot-disk-size", "50", "--project", projectName)
  println("Creating the spark cluster...DONE")
}.flatMap { _ =>
  {
    Future.sequence {
      jobs.map { jobClass =>
        Future {
          println(s"Launching the spark job from the class $jobClass...")
          % gcloud ("dataproc", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$gcpJarBucket")
          println(s"Launching the spark job from the class $jobClass...DONE")
        }
      }
    }

  }
}

Try { Await.ready(future, timeout) }.recover { case exp => println(exp) }
% bash ("-c", s"printf 'Y\n' | gcloud dataproc clusters delete $clusterName")
© www.soinside.com 2019 - 2024. All rights reserved.