Spark应用程序完成回调

问题描述 投票:6回答:4

我有一个bash脚本,每晚都会启动我的独立EC2 Spark集群并执行一个应用程序。我希望在应用程序完成后注意到我可以停止群集。

我想知道是否存在基于spark应用程序状态的某种回调。

我很新兴,所以另外一个解决这个问题的提示将不胜感激。

谢谢。

更新:

使用http://<master-host>:8080/metrics/master/jsonhttp://<master-host>:8080/metrics/applications/json提供的json,我可以获得应用程序的状态(WAITING,RUNNING,FINISHED),但是我无法获得驱动程序的状态,它将告诉您执行是否失败。我确信指标必须有一个特定的配置才能显示,但我找不到它。

为了获得这种状态,我废弃了http://<master-host>:8080中提供的Web UI,以找到执行我的应用程序的驱动程序并获取其状态。

scala amazon-ec2 hdfs apache-spark
4个回答
2
投票
spark-submit --status $submitID --master $master 2>&1 | grep "success"

1
投票

免责声明:此示例需要更改代码,具有一些服务布局假设,并使用一些内部Spark类。

在阅读了隐藏的rest-apis并尝试包装SparkSubmit类以获取Future对象之后,我找到了SparkListener类。无论您需要什么粒度,它都有onJobStart / End,onApplicationStart / End等。

以下是应用程序主要方法中Jobs的概念证明:

//... build spark conf
val sparkContext = new SparkContext(sparkConf)
//programmatically register listener
sparkContext.addSparkListener(new SparkListener {

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    println(s"[ ${jobStart.jobId} ] Job started.")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"[ ${jobEnd.jobId} ] Job completed with Result : ${jobEnd.jobResult}")
    //(our other services already rely on ActiveMQ)
    val connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616")
    val connection = connectionFactory.createConnection
    connection.setClientID("Client_" + Math.random())
    connection.start

    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val sendQueue = session.createQueue("job_queue.spark_job_completed")

    val producer = session.createProducer(sendQueue)
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)

    val textMessage = session.createTextMessage( s"""{\"jobId\" : \"${jobEnd.jobId}\", \"jobResult\" : \"${jobEnd.jobResult}\"}""")

    producer.send(textMessage)

    connection.close
  }

  //api just sends the time :/
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
    println(s"[ ${applicationEnd.time} ] Application Completed.")
  }
})
// ... do spark work

我们的团队需要在Spark 1.5.2中完成Spark作业/应用程序时通知外部应用程序。此外,如果没有大量的端口转发,Spark UI就不容易获得,因此它现在可以与先前存在的监视工具集成。

资料来源:

  1. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/exercises/spark-exercise-custom-scheduler-listener.html
  2. https://myadventuresincoding.wordpress.com/2011/10/15/jms-how-to-do-synchronous-messaging-with-activemq-in-scala/
  3. http://arturmkrtchyan.com/apache-spark-hidden-rest-api

1
投票

我使用shell / bash脚本来提交spark应用程序,所以这里是我如何使用bash do while循环获取应用程序警报的示例,你应该在shell中进行spark提交。

Spark-submit ........

一旦你做了火花提交,睡了5秒钟

sleep 5s

然后通过下面的do while循环开始检查应用程序的状态,替换为您的应用程序名称: -

 current_status=$(yarn application --list | grep <your_application_name> | sort -n | tail -1 |awk -F' ' '{print $6}')
application_number=$(yarn application --list | grep <your_application_name> | sort -n | tail -1 |awk -F' ' '{print $1}')

while true; do
  current_status=$(yarn application -status $application_number | sed -n '10p' | awk -F':' '{print $2}')


if [ ${current_status} == "RUNNING" ];then
continue
else
current_status_2=$(yarn application -status $application_number  | sed -n '11p' | awk -F':' '{print $2}')

    if [ ${current_status_2} == "SUCCEEDED" ];then
    echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@  SPARK APPLICATION SUCCEEDED WITH $application_number ">> /log_folder/logfile`date +"%Y_%m_%d"`.log

    elif [ ${current_status_2} == "FAILED" ];then
    echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@  SPARK APPLICATION FAILED WITH $application_number">> /log_folder/logfile`date +"%Y_%m_%d"`.log

    elif [ ${current_status_2} == "KILLED" ];then
    echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@  SPARK APPLICATION KILLED WITH $application_number ">> /log_folder/logfile`date +"%Y_%m_%d"`.log

    break;
    fi

fi
sleep 5s
  check=$(yarn application -status $application_number | sed -n '11p' | awk -F':' '{print $2}')
if [ ${check} == "UNDEFINED" ];then
continue
else
break;
fi

done

0
投票

通过访问Spark的内部指标系统,可以解决这个问题。

在terminal命令下面获取当前运行的spark应用程序的指标

curl -X GET "http://<spark master>:4040/metrics/json/"

此命令可以从脚本执行,如果没有正在运行的应用程序,您可以生成警报。

© www.soinside.com 2019 - 2024. All rights reserved.