问题 Spark集群在更大的输入上失败,适用于小型


我正在玩Spark。它是来自网站的默认预构建分发版(0.7.0),具有默认配置,群集模式,一个工作者(我的本地主机)。我阅读了有关安装的文档,一切似乎都很好。

我有一个CSV文件(各种大小,1000到100万行)。如果我用小输入文件(例如1000行)运行我的应用程序,一切都很好,程序在几秒钟内完成并产生预期的输出。 但是当我提供更大的文件(100.000行,或100万行)时,执行失败。我试图挖掘日志,但没有多大帮助(它重复整个过程大约9-10次,然后在失败后退出。此外,还有一些与从某些空源获取失败相关的错误)。

第一个JavaRDD返回的结果Iterable对我来说是可疑的。如果我返回一个硬编码的单例列表(如res.add(“something”);返回res;),一切都很好,即使有一百万行。但是如果我添加我想要的所有键(28个字符串,长度为6-20个字符),则该过程失败 只要 有了很大的投入。 问题是,我需要所有这些密钥,这是实际的业务逻辑。

我正在使用Linux amd64,四核,8GB内存。最新的Oracle Java7 JDK。 Spark配置:

SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar

我必须提一下,当我启动该程序时,它说:

13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address

这是我的计划。它基于JavaWordCount示例,经过最低限度的修改。

public final class JavaWordCount
{
    public static void main(final String[] args) throws Exception
    {
        final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), new String[] {"....jar" });

        final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {

            @Override
            public Iterable<String> call(final String s)
            {
                // parsing "s" as the line, computation, building res (it's a List<String>)
                return res;
            }
        });

        final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(final String s)
            {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(final Integer i1, final Integer i2)
            {
                return i1 + i2;
            }
        });

        counts.collect();

        for (Tuple2<?, ?> tuple : counts.collect()) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
    }
}

11220
2018-05-30 09:26


起源

在更改Spark系统属性之前,您的作业失败了什么异常/错误? - Josh Rosen
在spark-users组中,我得到了答案:.collect()将触发每个(临时)RDD的集合。那是真正的问题。解决方案的线程: stackoverflow.com/questions/16832429/... - gyorgyabraham
我用谷歌搜索了多年,试图找到问题的解决方案,这个问题的答案解决了我的问题,所以请编辑你的问题,在你的问题中加入“org.apache.spark.SparkException:与MapOutputTracker沟通时出错”谷歌将来更容易谷歌搜索。 - samthebest


答案:


我设法通过设置属性来修复它 spark.mesos.coarse 为真。更多信息 这里

更新:我和Spark一起玩了几个小时。这些设置对我有所帮助,但似乎在一台机器上处理大约1000万行文本几乎是不可能的。

System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load

注意:增加帧大小似乎特别有助于防止: org.apache.spark.SparkException: Error communicating with MapOutputTracker


13
2018-05-30 10:56



该 spark.akka.frameSize 也解决了我的问题 org.apache.spark.SparkException: Error communicating with MapOutputTracker 问题。 - samthebest
System.setProperty()也可以在spark-shell中工作吗?我无法获得frameSize设置 - Brian Dolan


在较新的火花版本中,应该使用:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

根据 http://spark.apache.org/docs/latest/tuning.html#data-serialization


3
2017-08-15 02:40