How do I automatically shut down a Google Dataproc cluster programmatically once all jobs have completed?
Dataproc provides for easy creation, monitoring, and management of clusters, but I can't seem to find out how to delete one.
The gcloud dataproc CLI offers a --max-idle option. This automatically deletes the Dataproc cluster after a given amount of inactivity (i.e. no running jobs). It can be used as follows:
gcloud dataproc clusters create test-cluster \
--project my-test-project \
--zone europe-west1-b \
--master-machine-type n1-standard-4 \
--master-boot-disk-size 100 \
--num-workers 2 \
--worker-machine-type n1-standard-4 \
--worker-boot-disk-size 100 \
--max-idle 1h
It depends on the language. Personally, I use Python (PySpark), and the code provided there works fine for me:
You may need to adapt the code to your own purposes and follow the prerequisite steps specified in the README file (https://github.com/GoogleCloudPlatform/python-docs-samples/tree/master/dataproc), such as enabling the API and installing the packages listed in requirements.txt.
Basically, with the wait_for_job function you wait until the job has finished, and with delete_cluster, as the name suggests, the cluster you created earlier gets deleted. I hope this helps you.
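The wait-then-delete flow in that sample boils down to polling the job state and tearing the cluster down once the job reaches a terminal state. Here is a minimal sketch of that pattern; the get_job_state and delete_cluster stubs below are illustrative stand-ins for the real Dataproc API calls, not the sample's actual functions:

```python
import time

# Stub: in the real sample this queries the Dataproc API for the job status.
_states = iter(["PENDING", "RUNNING", "RUNNING", "DONE"])

def get_job_state(job_id):
    return next(_states)

def delete_cluster(cluster_name):
    # Stub: in the real sample this issues the cluster-delete API call.
    print(f"Deleting cluster {cluster_name}...")
    return "deleted"

def wait_for_job(job_id, poll_interval=0.01):
    """Block until the job reaches a terminal state; raise on failure."""
    while True:
        state = get_job_state(job_id)
        if state == "ERROR":
            raise RuntimeError(f"Job {job_id} failed")
        if state == "DONE":
            return state
        time.sleep(poll_interval)

state = wait_for_job("my-job")
delete_cluster("test-cluster")
```

The key design point is that deletion only runs after the poll loop has observed a terminal state, so the cluster is never torn down while a job is still running.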
There are several programmatic ways to shut a cluster down automatically, the gcloud CLI among them; any of them can be called after your jobs finish execution.
See more here: https://cloud.google.com/dataproc/docs/guides/manage-cluster#delete_a_cluster
There are a few options to achieve this. One is to set the --max-idle property at cluster creation time (see https://stackoverflow.com/a/54239034/3227693). Another is to delete the cluster yourself once the Spark application completes. Here are some examples:
private SparkApplication(String[] args) throws
        org.apache.commons.cli.ParseException,
        IOException,
        InterruptedException {
    // Your Spark code here
    if (profile != null && profile.equals("gcp")) {
        DataProcUtil.deleteCluster(clusterName);
    }
}
And here is how you can delete the cluster via Java:
public static void deleteCluster(String clusterName) throws IOException, InterruptedException {
    logger.info("Try to delete cluster: {}....", clusterName);
    Process process = new ProcessBuilder("gcloud",
            "dataproc",
            "clusters",
            "delete",
            clusterName,
            "--async",
            "--quiet").start();
    int errCode = process.waitFor();
    boolean hasError = errCode != 0;
    logger.info("Command executed, any errors? {}", hasError);
    String output;
    if (hasError) {
        output = output(process.getErrorStream());
    } else {
        output = output(process.getInputStream());
    }
    logger.info("Output: {}", output);
}
private static String output(InputStream inputStream) throws IOException {
    StringBuilder sb = new StringBuilder();
    try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
        String line;
        while ((line = br.readLine()) != null) {
            sb.append(line)
              .append(System.getProperty("line.separator"));
        }
    }
    return sb.toString();
}
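The same shell-out approach translates directly to Python's subprocess module. Below is a hedged sketch of a generic helper that mirrors the Java version (read stderr on failure, stdout on success); it is demonstrated with a harmless echo, since in practice you would pass it the gcloud dataproc clusters delete command shown above:

```python
import subprocess

def run_command(cmd):
    """Run a command, capture its output, and return (exit_code, output)."""
    process = subprocess.run(cmd, capture_output=True, text=True)
    # Mirror the Java version: stderr on failure, stdout on success.
    output = process.stderr if process.returncode != 0 else process.stdout
    return process.returncode, output

# Demo with a harmless command; for Dataproc you would call e.g.:
# run_command(["gcloud", "dataproc", "clusters", "delete",
#              cluster_name, "--async", "--quiet"])
code, out = run_command(["echo", "hello"])
print(code, out.strip())
```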
You can do this with Scala code: create the cluster, run all your jobs, then delete the cluster. For that you can use Scala Futures.
If you have many jobs, you can run them in parallel:
// Ammonite script: the % operator shells out to external commands
import ammonite.ops._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

val gcpJarBucket = "gs://test_dataproc/dataproc/Dataproc.jar"
val jobs = Seq("package.class1", "package.class2")
val projectName: String = "automat-dataproc"
val clusterName: String = "your-cluster-name"
val timeout = 180.minutes
// Working directory
implicit val wd = pwd
val future = Future {
println("Creating the spark cluster...")
% gcloud ("dataproc", "clusters", "create", clusterName, "--subnet", "default", "--zone", "europe-west1-b", "--master-machine-type", "n1-standard-4", "--master-boot-disk-size", "50", "--num-workers", "3", "--worker-machine-type", "n1-standard-4", "--worker-boot-disk-size", "50", "--project", projectName)
println("Creating the spark cluster...DONE")
}.flatMap { _ =>
{
Future.sequence {
jobs.map { jobClass =>
Future {
println(s"Launching the spark job from the class $jobClass...")
% gcloud ("dataproc", "jobs", "submit", "spark", s"--cluster=$clusterName", s"--class=$jobClass", "--region=global", s"--jars=$gcpJarBucket")
println(s"Launching the spark job from the class $jobClass...DONE")
}
}
}
}
}
Try { Await.ready(future, timeout) }.recover { case exp => println(exp) }
% bash ("-c", s"printf 'Y\n' | gcloud dataproc clusters delete $clusterName")
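The same create-cluster, run-jobs-in-parallel, tear-down orchestration can be sketched in Python with concurrent.futures. The stub functions below stand in for the gcloud calls so the control flow is visible; note the finally block, which guarantees the cluster is deleted even if a job fails:

```python
from concurrent.futures import ThreadPoolExecutor

def create_cluster(name):
    # Stand-in for: gcloud dataproc clusters create ...
    print(f"Creating the spark cluster {name}...")

def submit_job(cluster, job_class):
    # Stand-in for: gcloud dataproc jobs submit spark --cluster=... --class=...
    print(f"Launching the spark job from the class {job_class}...")
    return f"{job_class}: DONE"

def delete_cluster(name):
    # Stand-in for: gcloud dataproc clusters delete ...
    print(f"Deleting the spark cluster {name}...")

jobs = ["package.class1", "package.class2"]
cluster_name = "your-cluster-name"

create_cluster(cluster_name)
try:
    with ThreadPoolExecutor() as pool:
        results = list(pool.map(lambda j: submit_job(cluster_name, j), jobs))
finally:
    # Always tear the cluster down, even if a job raised an exception.
    delete_cluster(cluster_name)
```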