问题 如何在PySpark mllib中滚动自定义估算器


我正在尝试构建一个简单的自定义 Estimator 在PySpark MLlib中。我有 这里 可以写一个自定义的Transformer,但我不知道如何在一个 Estimator。我也不明白什么 @keyword_only 是的,为什么我需要这么多的二传手和吸气剂。 Scikit-learn似乎有适合自定义模型的文档(看这里 但PySpark没有。

示例模型的伪代码:

class NormalDeviation():
    def __init__(self, threshold = 3):
    def fit(x, y=None):
       self.model = {'mean': x.mean(), 'std': x.std()]
    def predict(x):
       return ((x-self.model['mean']) > self.threshold * self.model['std'])
    def decision_function(x): # does ml-lib support this?

10833
2018-05-17 08:04


起源



答案:


一般来说没有文档,因为对于Spark 1.6 / 2.0,大多数相关的API并不是公开的。它应该在Spark 2.1.0中改变(见 SPARK-7146)。

API相对复杂,因为它必须遵循特定的约定才能给出 Transformer 要么 Estimator 兼容 Pipeline API。读取和写入或网格搜索等功能可能需要其中一些方法。其他,比如 keyword_only 只是一个简单的帮助者,并不是严格要求的。

假设您已为平均参数定义了以下混合:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

标准差参数:

class HasStandardDeviation(Params):

    stddev = Param(Params._dummy(), "stddev", "stddev", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(stddev=value)

    def getStddev(self):
        return self.getOrDefault(self.stddev)

和门槛:

class HasCenteredThreshold(Params):

    centered_threshold = Param(Params._dummy(),
            "centered_threshold", "centered_threshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centered_threshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centered_threshold)

你可以创建基本的 Estimator 如下:

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold):

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return (NormalDeviationModel()
            .setInputCol(c)
            .setMean(mu)
            .setStddev(sigma)
            .setCenteredThreshold(self.getCenteredThreshold())
            .setPredictionCol(self.getPredictionCol()))

class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold):

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)

最后它可以使用如下:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+

11
2018-05-17 14:51



谢谢!所以Estimator的状态也被认为是一个参数? - Hanan Shteingart
你的意思是将估算器的调整参数作为模型的参数吗?如果是这样,以这种方式设计是方便的,但对于基本实现来说并不是一项艰难的要求。 - zero323
好的,有什么希望得到关于如何坚持这样的自定义步骤的一些建议? - Evan Zamir
这是一个非常有用的例子。但是,如果您的变压器/模型具有特定于它的参数而不是估算器,该怎么办?一旦它成为管道中的一个阶段,如何将这些参数传递给模型?当他们与估算器无关时,我不想先将这些参数传递给估算器。我问过这个问题 这里... - snark
谢谢@ zero323 - 有没有更新?我讨厌这种语法,你需要从每个Param继承(Estimator不是Param所以它不应该继承它...) - Hanan Shteingart


答案:


一般来说没有文档,因为对于Spark 1.6 / 2.0,大多数相关的API并不是公开的。它应该在Spark 2.1.0中改变(见 SPARK-7146)。

API相对复杂,因为它必须遵循特定的约定才能给出 Transformer 要么 Estimator 兼容 Pipeline API。读取和写入或网格搜索等功能可能需要其中一些方法。其他,比如 keyword_only 只是一个简单的帮助者,并不是严格要求的。

假设您已为平均参数定义了以下混合:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

标准差参数:

class HasStandardDeviation(Params):

    stddev = Param(Params._dummy(), "stddev", "stddev", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(stddev=value)

    def getStddev(self):
        return self.getOrDefault(self.stddev)

和门槛:

class HasCenteredThreshold(Params):

    centered_threshold = Param(Params._dummy(),
            "centered_threshold", "centered_threshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centered_threshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centered_threshold)

你可以创建基本的 Estimator 如下:

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold):

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return (NormalDeviationModel()
            .setInputCol(c)
            .setMean(mu)
            .setStddev(sigma)
            .setCenteredThreshold(self.getCenteredThreshold())
            .setPredictionCol(self.getPredictionCol()))

class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold):

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)

最后它可以使用如下:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+

11
2018-05-17 14:51



谢谢!所以Estimator的状态也被认为是一个参数? - Hanan Shteingart
你的意思是将估算器的调整参数作为模型的参数吗?如果是这样,以这种方式设计是方便的,但对于基本实现来说并不是一项艰难的要求。 - zero323
好的,有什么希望得到关于如何坚持这样的自定义步骤的一些建议? - Evan Zamir
这是一个非常有用的例子。但是,如果您的变压器/模型具有特定于它的参数而不是估算器,该怎么办?一旦它成为管道中的一个阶段,如何将这些参数传递给模型?当他们与估算器无关时,我不想先将这些参数传递给估算器。我问过这个问题 这里... - snark
谢谢@ zero323 - 有没有更新?我讨厌这种语法,你需要从每个Param继承(Estimator不是Param所以它不应该继承它...) - Hanan Shteingart