问题 PySpark投掷错误方法__getnewargs __([])不存在


我有一组文件。文件的路径保存在文件中,例如“all_files.txt”。使用apache spark,我需要对所有文件进行操作并对结果进行管理。

我想要做的步骤是:

  • 通过阅读“all_files.txt”创建RDD
  • 对于“all_files.txt”中的每一行(每行是某个文件的路径), 将每个文件的内容读入单个RDD
  • 然后做一个操作所有内容

这是我为此写的代码:

def return_contents_from_file (file_name):
    return spark.read.text(file_name).rdd.map(lambda  r: r[0])

def run_spark():
    file_name = 'path_to_file'

    spark = SparkSession \
        .builder \
        .appName("PythonWordCount") \
        .getOrCreate()

    counts = spark.read.text(file_name).rdd.map(lambda r: r[0]) \ # this line is supposed to return the paths to each file
        .flatMap(return_contents_from_file) \ # here i am expecting to club all the contents of all files
        .flatMap(do_operation_on_each_line_of_all_files) # here i am expecting do an operation on each line of all files

这是抛出错误:

第323行,在get_return_value中py4j.protocol.Py4JError:错误   在拨打o25时发生了。getnewargs。跟踪:py4j.Py4JException:   方法 getnewargs([])不存在于   py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)     在   py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)     在py4j.Gateway.invoke(Gateway.java:272)at   py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)     在py4j.commands.CallCommand.execute(CallCommand.java:79)at   py4j.GatewayConnection.run(GatewayConnection.java:214)at   java.lang.Thread.run(Thread.java:745)

有人可以告诉我我做错了什么以及我应该如何进一步。提前致谢。


12850
2017-11-07 16:57


起源



答案:


运用 spark 内 flatMap 或者不允许在遗嘱执行人身上发生的任何转变(spark 会话仅适用于驱动程序)。也无法创建RDD的RDD(参见: 是否可以在Apache Spark中创建嵌套的RDD?

但是你可以用另一种方式实现这种转变 - 阅读所有内容 all_files.txt 进入数据帧,使用 本地  map 使他们成为数据帧和 本地  reduce 结合所有,见例子:

>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)

14
2017-11-07 17:33



谢谢您的回复。但是我如何并行化整个过程呢?不会映射(lambda r:spark.read.text(r [0]),文件名)序列化整个过程? - UnderWood
读取文件的过程并行运行,唯一的序列化部分是构建执行计划。试试看! - Mariusz


答案:


运用 spark 内 flatMap 或者不允许在遗嘱执行人身上发生的任何转变(spark 会话仅适用于驱动程序)。也无法创建RDD的RDD(参见: 是否可以在Apache Spark中创建嵌套的RDD?

但是你可以用另一种方式实现这种转变 - 阅读所有内容 all_files.txt 进入数据帧,使用 本地  map 使他们成为数据帧和 本地  reduce 结合所有,见例子:

>>> filenames = spark.read.text('all_files.txt').collect()
>>> dataframes = map(lambda r: spark.read.text(r[0]), filenames)
>>> all_lines_df = reduce(lambda df1, df2: df1.unionAll(df2), dataframes)

14
2017-11-07 17:33



谢谢您的回复。但是我如何并行化整个过程呢?不会映射(lambda r:spark.read.text(r [0]),文件名)序列化整个过程? - UnderWood
读取文件的过程并行运行,唯一的序列化部分是构建执行计划。试试看! - Mariusz