我有一个bash脚本,每晚都会启动我的独立EC2 Spark集群并执行一个应用程序。我希望在应用程序完成后注意到我可以停止群集。
我想知道是否存在基于spark应用程序状态的某种回调。
我很新兴,所以另外一个解决这个问题的提示将不胜感激。
谢谢。
更新:
使用http://<master-host>:8080/metrics/master/json
或http://<master-host>:8080/metrics/applications/json
提供的json,我可以获得应用程序的状态(WAITING,RUNNING,FINISHED),但是我无法获得驱动程序的状态,它将告诉您执行是否失败。我确信指标必须有一个特定的配置才能显示,但我找不到它。
为了获得这种状态,我废弃了http://<master-host>:8080
中提供的Web UI,以找到执行我的应用程序的驱动程序并获取其状态。
spark-submit --status $submitID --master $master 2>&1 | grep "success"
免责声明:此示例需要更改代码,具有一些服务布局假设,并使用一些内部Spark类。
在阅读了隐藏的rest-apis并尝试包装SparkSubmit类以获取Future对象之后,我找到了SparkListener类。无论您需要什么粒度,它都有onJobStart / End,onApplicationStart / End等。
以下是应用程序主要方法中Jobs的概念证明:
//... build spark conf
val sparkContext = new SparkContext(sparkConf)
//programmatically register listener
sparkContext.addSparkListener(new SparkListener {
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
println(s"[ ${jobStart.jobId} ] Job started.")
}
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
println(s"[ ${jobEnd.jobId} ] Job completed with Result : ${jobEnd.jobResult}")
//(our other services already rely on ActiveMQ)
val connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616")
val connection = connectionFactory.createConnection
connection.setClientID("Client_" + Math.random())
connection.start
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val sendQueue = session.createQueue("job_queue.spark_job_completed")
val producer = session.createProducer(sendQueue)
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
val textMessage = session.createTextMessage( s"""{\"jobId\" : \"${jobEnd.jobId}\", \"jobResult\" : \"${jobEnd.jobResult}\"}""")
producer.send(textMessage)
connection.close
}
//api just sends the time :/
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
println(s"[ ${applicationEnd.time} ] Application Completed.")
}
})
// ... do spark work
我们的团队需要在Spark 1.5.2中完成Spark作业/应用程序时通知外部应用程序。此外,如果没有大量的端口转发,Spark UI就不容易获得,因此它现在可以与先前存在的监视工具集成。
资料来源:
我使用shell / bash脚本来提交spark应用程序,所以这里是我如何使用bash do while循环获取应用程序警报的示例,你应该在shell中进行spark提交。
Spark-submit ........
一旦你做了火花提交,睡了5秒钟
sleep 5s
然后通过下面的do while循环开始检查应用程序的状态,替换为您的应用程序名称: -
current_status=$(yarn application --list | grep <your_application_name> | sort -n | tail -1 |awk -F' ' '{print $6}')
application_number=$(yarn application --list | grep <your_application_name> | sort -n | tail -1 |awk -F' ' '{print $1}')
while true; do
current_status=$(yarn application -status $application_number | sed -n '10p' | awk -F':' '{print $2}')
if [ ${current_status} == "RUNNING" ];then
continue
else
current_status_2=$(yarn application -status $application_number | sed -n '11p' | awk -F':' '{print $2}')
if [ ${current_status_2} == "SUCCEEDED" ];then
echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@ SPARK APPLICATION SUCCEEDED WITH $application_number ">> /log_folder/logfile`date +"%Y_%m_%d"`.log
elif [ ${current_status_2} == "FAILED" ];then
echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@ SPARK APPLICATION FAILED WITH $application_number">> /log_folder/logfile`date +"%Y_%m_%d"`.log
elif [ ${current_status_2} == "KILLED" ];then
echo "`date "+%Y_%m_%d-%H:%M:%S"`@@@ SPARK APPLICATION KILLED WITH $application_number ">> /log_folder/logfile`date +"%Y_%m_%d"`.log
break;
fi
fi
sleep 5s
check=$(yarn application -status $application_number | sed -n '11p' | awk -F':' '{print $2}')
if [ ${check} == "UNDEFINED" ];then
continue
else
break;
fi
done
通过访问Spark的内部指标系统,可以解决这个问题。
在terminal命令下面获取当前运行的spark应用程序的指标
curl -X GET "http://<spark master>:4040/metrics/json/"
此命令可以从脚本执行,如果没有正在运行的应用程序,您可以生成警报。