问题 如何使用Python类处理RDD?


我在Spark中将模型实现为python类,每当我尝试将类方法映射到RDD时,它都会失败。我的实际代码更复杂,但这个简化版本是问题的核心:

class model(object):
    def __init__(self):
        self.data = sc.textFile('path/to/data.csv')
        # other misc setup
    def run_model(self):
        self.data = self.data.map(self.transformation_function)
    def transformation_function(self,row):
        row = row.split(',')
        return row[0]+row[1]

现在,如果我像这样运行模型(例如):

test = model()
test.run_model()
test.data.take(10)

我收到以下错误:

例外:您似乎尝试从广播变量,操作或转换引用SparkContext。 SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用。有关更多信息,请参阅SPARK-5063。

我已经玩了一下这个,当我尝试将类方法映射到类中的RDD时,它似乎可靠地发生。我已经确认,如果我在类结构之外实现,映射函数可以正常工作,所以问题肯定与类有关。有办法解决这个问题吗?


6159
2017-09-10 15:02


起源



答案:


这里的问题比使用更微妙 嵌套RDD或在转换中执行Spark操作。 Spark不允许访问 SparkContext 内部行动或转型。

即使您没有显式访问它,它也会在闭包内被引用,并且必须被序列化并随身携带。这意味着你的 transformation 方法,参考 self,保持 SparkContext 因此,错误。

处理此问题的一种方法是使用静态方法:

class model(object):
    @staticmethod
    def transformation_function(row):
        row = row.split(',')
        return row[0]+row[1]

    def __init__(self):
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function)

编辑

如果您希望能够访问实例变量,可以尝试这样的方法:

class model(object):
    @staticmethod
    def transformation_function(a_model):
        delim = a_model.delim
        def _transformation_function(row):
            return row.split(delim)
        return _transformation_function

    def __init__(self):
        self.delim = ','
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function(self))

12
2017-09-11 01:30



完美 - 我没想到使用静态方法。唯一的问题是在完整的代码中,我的转换函数需要访问其中的其他变量 model class(不是RDD)。我假设唯一的方法是将它们作为参数传递给静态方法?例如 def transformation_function(row,somevar): return row+somevar - moustachio
换句话说 - 有没有办法访问类变量(self.whatever)从静态方法? - moustachio
(请注意,这些不能是静态变量 - 我肯定希望从静态方法中访问实例变量) - moustachio
如果我有可能你将面临完全相同的问题。你可以做的一件事是从静态方法返回函数。 - zero323
因为 transformation_function 不是“基本上是静态的”。它抓住了 self 在其范围内,并且过渡性地 RDD 对象和 SparkContext。 - zero323


答案:


这里的问题比使用更微妙 嵌套RDD或在转换中执行Spark操作。 Spark不允许访问 SparkContext 内部行动或转型。

即使您没有显式访问它,它也会在闭包内被引用,并且必须被序列化并随身携带。这意味着你的 transformation 方法,参考 self,保持 SparkContext 因此,错误。

处理此问题的一种方法是使用静态方法:

class model(object):
    @staticmethod
    def transformation_function(row):
        row = row.split(',')
        return row[0]+row[1]

    def __init__(self):
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function)

编辑

如果您希望能够访问实例变量,可以尝试这样的方法:

class model(object):
    @staticmethod
    def transformation_function(a_model):
        delim = a_model.delim
        def _transformation_function(row):
            return row.split(delim)
        return _transformation_function

    def __init__(self):
        self.delim = ','
        self.data = sc.textFile('some.csv')

    def run_model(self):
        self.data = self.data.map(model.transformation_function(self))

12
2017-09-11 01:30



完美 - 我没想到使用静态方法。唯一的问题是在完整的代码中,我的转换函数需要访问其中的其他变量 model class(不是RDD)。我假设唯一的方法是将它们作为参数传递给静态方法?例如 def transformation_function(row,somevar): return row+somevar - moustachio
换句话说 - 有没有办法访问类变量(self.whatever)从静态方法? - moustachio
(请注意,这些不能是静态变量 - 我肯定希望从静态方法中访问实例变量) - moustachio
如果我有可能你将面临完全相同的问题。你可以做的一件事是从静态方法返回函数。 - zero323
因为 transformation_function 不是“基本上是静态的”。它抓住了 self 在其范围内,并且过渡性地 RDD 对象和 SparkContext。 - zero323