我一直在试验和谷歌搜索几个小时,没有运气。
我有一个火花流媒体应用程序,在本地火花群中运行良好。现在我需要在cloudera 5.4.4上部署它。我需要能够启动它,让它在后台持续运行,并能够阻止它。
我试过这个:
$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs
但它只是无休止地印刷这些线条。
15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
问题1:因为它是一个流媒体应用程序,它需要连续运行。那么如何在“后台”模式下运行呢?我可以找到在纱线上提交火花作业的所有例子似乎都假设应用程序会做一些工作并终止,因此您希望在前台运行它。但流媒体并非如此。
接下来......此时应用程序似乎无法正常运行。我认为这可能是我的错误或配置错误,所以我试着查看日志以查看发生了什么:
$ yarn logs -applicationId application_1438092860895_012
但它告诉我:
/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.
所以 问题2:如果应用程序正在运行,为什么它没有日志文件?
所以最终我不得不杀了它:
$ yarn application -kill application_1438092860895_012
这会带来 问题3:假设我最终可以在后台启动并运行应用程序,“yarn application -kill”是阻止它的首选方式吗?
我终于找到了安全关闭火花流工作的方法。
- 编写套接字服务器线程等待停止流式上下文
包xxx.xxx.xxx
import java.io. {BufferedReader,InputStreamReader}
import java.net。{ServerSocket,Socket}
import org.apache.spark.streaming.StreamingContext
对象KillServer {
class NetworkService(port:Int,ssc:StreamingContext)扩展Runnable {
val serverSocket = new ServerSocket(port)
def run(){
Thread.currentThread()。setName(“Zhuangdy |在端口等待正常停止”+端口)
while(true){
val socket = serverSocket.accept()
(new Handler(socket,ssc))。run()
}
}
}
class Handler(socket:Socket,ssc:StreamingContext)extends Runnable {
def run(){
val reader = new InputStreamReader(socket.getInputStream)
val br = new BufferedReader(reader)
if(br.readLine()==“kill”){
ssc.stop(true,true)
}
br.close();
}
}
def run(port:Int,ssc:StreamingContext):Unit = {
(新的NetworkService(port,ssc))。运行
}
}
在你的 main
启动流上下文的方法,添加以下代码
ssc.start()
KillServer.run(11212,ssc)
ssc.awaitTermination()
写spark-submit将作业提交给yarn,并将输出直接输出到稍后将使用的文件
spark-submit --class“com.Mainclass”\
--conf“spark.streaming.stopGracefullyOnShutdown = true”\
--master yarn-cluster --queue“root”\
- 部署模式集群
--executor-cores 4 --num-executors 8 --executor-memory 3G \
hdfs:///xxx.jar>输出2>&1&
- 最后,安全关闭火花流工作而不会丢失数据或计算结果不会持续! (用于正常停止流上下文的服务器套接字正在驱动程序上运行,因此您将grep第3步的输出以获取驱动程序地址,并使用echo nc发送套接字kill命令)
#!/斌/庆典
driver =`cat output | grep ApplicationMaster | grep -Po'\ d +。\ d +。\ d +。\ d +'`
echo“kill”| nc $ driver 11212
driverid =`纱线应用程序 - 列表2>&1 | grep ad.Stat | grep -Po'application_ \ d + _ \ d +'`
纱线应用-kill $ driverid
最后一个难题是如何以优雅的方式停止在YARN上部署的Spark Streaming应用程序。停止(或者说杀死)YARN应用程序的标准方法是使用命令 yarn application -kill [applicationId]
。此命令会停止Spark Streaming应用程序,但这可能发生在批处理中。因此,如果作业从Kafka读取数据,将处理结果保存在HDFS上并最终提交Kafka偏移量,那么当作业在提交偏移之前停止时,您应该期待HDFS上的重复数据。
解决正常关闭问题的第一个尝试是在关闭钩子中调用Spark流上下文停止方法。
sys.addShutdownHook {
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}
令人失望的是,一个关闭挂钩调用太晚,无法完成启动批处理,Spark应用程序几乎立即被杀死。此外,无法保证JVM根本不会调用关闭挂钩。
在撰写此博客文章时,唯一确认的方法是在YARN上正常关闭Spark Streaming应用程序,以某种方式通知应用程序有关计划关闭,然后以编程方式停止流式传输(但不是从关闭挂钩)。命令 yarn application -kill
如果通知的应用程序在定义的超时后没有停止,则应仅用作最后的手段。
可以使用HDFS上的标记文件(最简单的方法)或使用驱动程序上公开的简单Socket / HTTP端点(复杂方式)通知应用程序计划关闭。
因为我喜欢KISS原理,下面你可以找到用于使用标记文件启动/停止Spark Streaming应用程序的shell脚本伪代码:
start() {
hdfs dfs -touchz /path/to/marker/my_job_unique_name
spark-submit ...
}
stop() {
hdfs dfs -rm /path/to/marker/my_job_unique_name
force_kill=true
application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
for i in `seq 1 10`; do
application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
if [ -n "$application_status" ]; then
sleep 60s
else
force_kill=false
break
fi
done
$force_kill && yarn application -kill ${application_id}
}
在Spark Streaming应用程序中,后台线程应该监视标记文件,当文件消失时停止上下文调用
streamingContext.stop(stopSparkContext = true, stopGracefully = true)
你也可以参考 http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html