问题 在纱线中启动/停止火花流工作的正确方法是什么?


我一直在试验和谷歌搜索几个小时,没有运气。

我有一个火花流媒体应用程序,在本地火花群中运行良好。现在我需要在cloudera 5.4.4上部署它。我需要能够启动它,让它在后台持续运行,并能够阻止它。

我试过这个:

$ spark-submit --master yarn-cluster --class MyMain my.jar myArgs

但它只是无休止地印刷这些线条。

15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)

问题1:因为它是一个流媒体应用程序,它需要连续运行。那么如何在“后台”模式下运行呢?我可以找到在纱线上提交火花作业的所有例子似乎都假设应用程序会做一些工作并终止,因此您希望在前台运行它。但流媒体并非如此。

接下来......此时应用程序似乎无法正常运行。我认为这可能是我的错误或配置错误,所以我试着查看日志以查看发生了什么:

$ yarn logs -applicationId application_1438092860895_012

但它告诉我:

/tmp/logs/hdfs/logs/application_1438092860895_0012does not have any log files.

所以 问题2:如果应用程序正在运行,为什么它没有日志文件?

所以最终我不得不杀了它:

$ yarn application -kill application_1438092860895_012

这会带来 问题3:假设我最终可以在后台启动并运行应用程序,“yarn application -kill”是阻止它的首选方式吗?


4210
2017-07-28 18:25


起源



答案:


  1. 你可以关闭 spark-submit 安慰。当写出时,作业已在后台运行 RUNNING 州。
  2. 日志就在后面可见 申请完成。在运行期间,所有日志都可以在本地工作节点直接访问(您可以在YARN资源管理器Web UI上看到)并聚合到HDFS 工作完成后
  3. yarn application -kill 可能是阻止Spark流应用程序的最佳方法,但它并不完美。做一些会更好 优雅的关机 停止所有流接收器并停止流式传输上下文,但我个人不知道该怎么做。

8
2017-07-29 08:38



我和Keven有同样的问题,但你的答案1对我来说似乎不起作用。我有一个python流应用程序。当我将它提交给我的独立火花时,它打印出信息日志并打印'app-20160403171906-0003 / 0现在正在运行'但是我无法退出提交。 - vutran


答案:


  1. 你可以关闭 spark-submit 安慰。当写出时,作业已在后台运行 RUNNING 州。
  2. 日志就在后面可见 申请完成。在运行期间,所有日志都可以在本地工作节点直接访问(您可以在YARN资源管理器Web UI上看到)并聚合到HDFS 工作完成后
  3. yarn application -kill 可能是阻止Spark流应用程序的最佳方法,但它并不完美。做一些会更好 优雅的关机 停止所有流接收器并停止流式传输上下文,但我个人不知道该怎么做。

8
2017-07-29 08:38



我和Keven有同样的问题,但你的答案1对我来说似乎不起作用。我有一个python流应用程序。当我将它提交给我的独立火花时,它打印出信息日志并打印'app-20160403171906-0003 / 0现在正在运行'但是我无法退出提交。 - vutran


我终于找到了安全关闭火花流工作的方法。

  1. 编写套接字服务器线程等待停止流式上下文
    包xxx.xxx.xxx

    import java.io. {BufferedReader,InputStreamReader}
    import java.net。{ServerSocket,Socket}

    import org.apache.spark.streaming.StreamingContext

    对象KillServer {

      class NetworkService(port:Int,ssc:StreamingContext)扩展Runnable {
        val serverSocket = new ServerSocket(port)

        def run(){
          Thread.currentThread()。setName(“Zhuangdy |在端口等待正常停止”+端口)
          while(true){
            val socket = serverSocket.accept()
            (new Handler(socket,ssc))。run()
          }
        }
      }

      class Handler(socket:Socket,ssc:StreamingContext)extends Runnable {
        def run(){
          val reader = new InputStreamReader(socket.getInputStream)
          val br = new BufferedReader(reader)
          if(br.readLine()==“kill”){
            ssc.stop(true,true)
          }
          br.close();
        }
      }

      def run(port:Int,ssc:StreamingContext):Unit = {
        (新的NetworkService(port,ssc))。运行
      }
    }
  1. 在你的 main 启动流上下文的方法,添加以下代码

    ssc.start()
    KillServer.run(11212,ssc)
    ssc.awaitTermination()
  2. 写spark-submit将作业提交给yarn,并将输出直接输出到稍后将使用的文件

    spark-submit --class“com.Mainclass”\
            --conf“spark.streaming.stopGracefullyOnShutdown = true”\
            --master yarn-cluster --queue“root”\
             - 部署模式集群
            --executor-cores 4 --num-executors 8 --executor-memory 3G \
            hdfs:///xxx.jar>输出2>&1&

  1. 最后,安全关闭火花流工作而不会丢失数据或计算结果不会持续! (用于正常停止流上下文的服务器套接字正在驱动程序上运行,因此您将grep第3步的输出以获取驱动程序地址,并使用echo nc发送套接字kill命令)

    #!/斌/庆典
    driver =`cat output | grep ApplicationMaster | grep -Po'\ d +。\ d +。\ d +。\ d +'`
    echo“kill”| nc $ driver 11212
    driverid =`纱线应用程序 - 列表2>&1 | grep ad.Stat | grep -Po'application_ \ d + _ \ d +'`
    纱线应用-kill $ driverid


2
2017-07-27 09:58



虽然这可能有用,但我后来才知道“纱线应用程序 - 杀戮”会向你的应用程序发送一个sigint,你可以优雅地处理和关闭它。例如在scala中:sys.ShutdownHookThread {LOGGER.info(“Stopping spark context ...”)ssc.stop(stopSparkContext = true,stopGracefully = true)LOGGER.info(“Stopped”)} - Kevin Pauli
它看起来像 sys.ShutdownHookThread 方法停止工作Spark 1.5。我可以验证它在Spark 1.6.1中不起作用。 - zznq


  1. 你的数据来源是什么?如果它是可靠的,像卡夫卡直接接收器,纱线杀死关闭应该没问题。当您的应用程序重新启动时,它将从上一个完整的批处理偏移中读取如果数据源不可靠,或者您想自己处理正常关闭,则必须在流上下文中实现某种外部挂钩。我遇到了同样的问题,我最终实现了一个小的黑客,在webui中添加一个新的选项卡作为停止按钮。

1
2017-07-29 15:40





最后一个难题是如何以优雅的方式停止在YARN上部署的Spark Streaming应用程序。停止(或者说杀死)YARN应用程序的标准方法是使用命令 yarn application -kill [applicationId]。此命令会停止Spark Streaming应用程序,但这可能发生在批处理中。因此,如果作业从Kafka读取数据,将处理结果保存在HDFS上并最终提交Kafka偏移量,那么当作业在提交偏移之前停止时,您应该期待HDFS上的重复数据。

解决正常关闭问题的第一个尝试是在关闭钩子中调用Spark流上下文停止方法。

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

令人失望的是,一个关闭挂钩调用太晚,无法完成启动批处理,Spark应用程序几乎立即被杀死。此外,无法保证JVM根本不会调用关闭挂钩。

在撰写此博客文章时,唯一确认的方法是在YARN上正常关闭Spark Streaming应用程序,以某种方式通知应用程序有关计划关闭,然后以编程方式停止流式传输(但不是从关闭挂钩)。命令 yarn application -kill 如果通知的应用程序在定义的超时后没有停止,则应仅用作最后的手段。

可以使用HDFS上的标记文件(最简单的方法)或使用驱动程序上公开的简单Socket / HTTP端点(复杂方式)通知应用程序计划关闭。

因为我喜欢KISS原理,下面你可以找到用于使用标记文件启动/停止Spark Streaming应用程序的shell脚本伪代码:

start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

在Spark Streaming应用程序中,后台线程应该监视标记文件,当文件消失时停止上下文调用

streamingContext.stop(stopSparkContext = true, stopGracefully = true)

你也可以参考 http://blog.parseconsulting.com/2017/02/how-to-shutdown-spark-streaming-job.html


0
2017-11-14 06:25