我在Spark中将模型实现为python类,每当我尝试将类方法映射到RDD时,它都会失败。我的实际代码更复杂,但这个简化版本是问题的核心:
class model(object):
def __init__(self):
self.data = sc.textFile('path/to/data.csv')
# other misc setup
def run_model(self):
self.data = self.data.map(self.transformation_function)
def transformation_function(self,row):
row = row.split(',')
return row[0]+row[1]
现在,如果我像这样运行模型(例如):
test = model()
test.run_model()
test.data.take(10)
我收到以下错误:
例外:您似乎尝试从广播变量,操作或转换引用SparkContext。 SparkContext只能在驱动程序上使用,而不能在工作程序上运行的代码中使用。有关更多信息,请参阅SPARK-5063。
我已经玩了一下这个,当我尝试将类方法映射到类中的RDD时,它似乎可靠地发生。我已经确认,如果我在类结构之外实现,映射函数可以正常工作,所以问题肯定与类有关。有办法解决这个问题吗?
这里的问题比使用更微妙 嵌套RDD或在转换中执行Spark操作。 Spark不允许访问 SparkContext
内部行动或转型。
即使您没有显式访问它,它也会在闭包内被引用,并且必须被序列化并随身携带。这意味着你的 transformation
方法,参考 self
,保持 SparkContext
因此,错误。
处理此问题的一种方法是使用静态方法:
class model(object):
@staticmethod
def transformation_function(row):
row = row.split(',')
return row[0]+row[1]
def __init__(self):
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function)
编辑:
如果您希望能够访问实例变量,可以尝试这样的方法:
class model(object):
@staticmethod
def transformation_function(a_model):
delim = a_model.delim
def _transformation_function(row):
return row.split(delim)
return _transformation_function
def __init__(self):
self.delim = ','
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function(self))
这里的问题比使用更微妙 嵌套RDD或在转换中执行Spark操作。 Spark不允许访问 SparkContext
内部行动或转型。
即使您没有显式访问它,它也会在闭包内被引用,并且必须被序列化并随身携带。这意味着你的 transformation
方法,参考 self
,保持 SparkContext
因此,错误。
处理此问题的一种方法是使用静态方法:
class model(object):
@staticmethod
def transformation_function(row):
row = row.split(',')
return row[0]+row[1]
def __init__(self):
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function)
编辑:
如果您希望能够访问实例变量,可以尝试这样的方法:
class model(object):
@staticmethod
def transformation_function(a_model):
delim = a_model.delim
def _transformation_function(row):
return row.split(delim)
return _transformation_function
def __init__(self):
self.delim = ','
self.data = sc.textFile('some.csv')
def run_model(self):
self.data = self.data.map(model.transformation_function(self))