问题 通过pyspark.ml CrossValidator调整隐式pyspark.ml ALS矩阵分解模型的参数


我正在尝试调整使用隐式数据的ALS矩阵分解模型的参数。为此,我正在尝试使用pyspark.ml.tuning.CrossValidator来运行参数网格并选择最佳模型。我相信我的问题在于评估者,但我无法弄明白。

我可以使用回归RMSE评估器为显式数据模型工作,如下所示:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import rand


conf = SparkConf() \
  .setAppName("MovieLensALS") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])
dfRatingsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsExplicit = ALS()
defaultModel = alsExplicit.fit(dfRatings)

paramMapExplicit = ParamGridBuilder() \
                    .addGrid(alsExplicit.rank, [8, 12]) \
                    .addGrid(alsExplicit.maxIter, [10, 15]) \
                    .addGrid(alsExplicit.regParam, [1.0, 10.0]) \
                    .build()

evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cvExplicit = CrossValidator(estimator=alsExplicit, estimatorParamMaps=paramMapExplicit, evaluator=evaluatorR)
cvModelExplicit = cvExplicit.fit(dfRatings)

predsExplicit = cvModelExplicit.bestModel.transform(dfRatingsTest)
predsExplicit.show()

当我尝试为隐式数据执行此操作时(假设视图计数而不是评级),我得到一个我无法弄清楚的错误。这是代码(与上面非常相似):

dfCounts = sqlContext.createDataFrame([(0,0,0), (0,1,12), (0,2,3), (1,0,5), (1,1,9), (1,2,0), (2,0,0), (2,1,11), (2,2,25)],
                                 ["user", "item", "rating"])
dfCountsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCounts)

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

evaluatorB = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="rating")
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR)
cvModel = cv.fit(dfCounts)

predsImplicit = cvModel.bestModel.transform(dfCountsTest)
predsImplicit.show()

我尝试使用RMSE评估程序执行此操作,但是出现错误。据我所知,我还应该能够将AUC度量用于二​​元分类评估器,因为隐式矩阵分解的预测是用于预测二进制矩阵p_ui的置信矩阵c_ui。 根据这篇论文,这是pyspark ALS引用的文档。

使用评估器给我一个错误,我找不到任何关于在线交叉验证隐式ALS模型的富有成效的讨论。我正在查看CrossValidator源代码,试图弄清楚出了什么问题,但遇到了麻烦。我的一个想法是,在该过程将隐式数据矩阵r_ui转换为二进制矩阵p_ui和置信矩阵c_ui之后,我不确定它在评估阶段比较预测的c_ui矩阵是什么。

这是错误:

Traceback (most recent call last):

  File "<ipython-input-16-6c43b997005e>", line 1, in <module>
    cvModel = cv.fit(dfCounts)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 69, in fit
    return self._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\tuning.py", line 239, in _fit
    model = est.fit(train, epm[j])

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 67, in fit
    return self.copy(params)._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)

etc.......

UPDATE

我尝试缩放输入,使其在0到1的范围内并使用RMSE评估器。它似乎工作得很好,直到我尝试将其插入CrossValidator。

以下代码有效。我得到了预测,我从评估员那里得到了一个RMSE值。

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator


conf = SparkConf() \
  .setAppName("ALSPractice") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

# Users 0, 1, 2, 3 - Items 0, 1, 2, 3, 4, 5 - Ratings 0.0-5.0
dfCounts2 = sqlContext.createDataFrame([(0,0,5.0), (0,1,5.0),            (0,3,0.0), (0,4,0.0), 
                                        (1,0,5.0),            (1,2,4.0), (1,3,0.0), (1,4,0.0),
                                        (2,0,0.0),            (2,2,0.0), (2,3,5.0), (2,4,5.0),
                                        (3,0,0.0), (3,1,0.0),            (3,3,4.0)            ],
                                       ["user", "item", "rating"])

dfCountsTest2 = sqlContext.createDataFrame([(0,0), (0,1), (0,2), (0,3), (0,4),
                                            (1,0), (1,1), (1,2), (1,3), (1,4),
                                            (2,0), (2,1), (2,2), (2,3), (2,4),
                                            (3,0), (3,1), (3,2), (3,3), (3,4)], ["user", "item"])

# Normalize rating data to [0,1] range based on max rating
colmax = dfCounts2.select(F.max('rating')).collect()[0].asDict().values()[0]
normalize = udf(lambda x: x/colmax, FloatType())
dfCountsNorm = dfCounts2.withColumn('ratingNorm', normalize(col('rating')))

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCountsNorm)
preds = defaultModelImplicit.transform(dfCountsTest2)

evaluatorR2 = RegressionEvaluator(metricName="rmse", labelCol="ratingNorm")
evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm))

preds = defaultModelImplicit.transform(dfCountsTest2)

我不明白为什么以下不起作用。我使用相同的估算器,相同的评估器并拟合相同的数据。为什么这些工作在上面但不在CrossValidator中:

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR2)
cvModel = cv.fit(dfCountsNorm)

3258
2018-05-16 18:36


起源

感谢您发布此问题。在编辑中,为什么要使用以下方法计算RMSE: evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm)) 代替 evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsTest2)) - Archimeow


答案:


忽略技术问题,严格来说,鉴于ALS使用隐式反馈生成的输入,两种方法都不正确。

  • 你不能用 RegressionEvaluator 因为,正如您所知,预测可以解释为置信度值,并表示为范围[0,1]中的浮点数,而标签列只是一个未绑定的整数。这些值显然无法比较。
  • 你不能用 BinaryClassificationEvaluator 因为即使预测可以解释为概率标签也不代表二元决策。此外,预测列具有无效类型,不能直接使用 BinaryClassificationEvaluator

您可以尝试转换其中一列,使输入符合要求,但从理论角度来看,这不是一种合理的方法,并引入了难以调整的其他参数。

  • 将标签列映射到[0,1]范围并使用RMSE。

  • 将标签列转换为具有固定阈值的二进制指示符并扩展 ALS/ ALSModel 返回预期的列类型。假设阈值为1,则可能是这样的

    from pyspark.ml.recommendation import *
    from pyspark.sql.functions import udf, col
    from pyspark.mllib.linalg import DenseVector, VectorUDT
    
    class BinaryALS(ALS):
        def fit(self, df):
            assert self.getImplicitPrefs()
            model = super(BinaryALS, self).fit(df)
            return ALSBinaryModel(model._java_obj)
    
    class ALSBinaryModel(ALSModel):
        def transform(self, df):
            transformed = super(ALSBinaryModel, self).transform(df)
            as_vector = udf(lambda x: DenseVector([1 - x, x]), VectorUDT())
            return transformed.withColumn(
                "rawPrediction", as_vector(col("prediction")))
    
    # Add binary label column
    with_binary = dfCounts.withColumn(
        "label_binary", (col("rating") > 0).cast("double"))
    
    als_binary_model = BinaryALS(implicitPrefs=True).fit(with_binary)
    
    evaluatorB = BinaryClassificationEvaluator(
        metricName="areaUnderROC", labelCol="label_binary")
    
    evaluatorB.evaluate(als_binary_model.transform(with_binary))
    ## 1.0
    

一般来说,关于评估具有隐式反馈的推荐系统的材料在教科书中有点缺失,我建议你阅读一下 eliasah回答 关于评估这些推荐人。


9
2018-05-16 20:11



非常感谢您的回答。我尝试了你的两个建议,并决定采用缩放和RMSE评估方法。它似乎运行良好,除了我插入CrossValidator函数时。我认为CrossValidator以相同的方式使用估计器和求值器,同时自动进行k折交叉验证并迭代网格中的参数组合。我用我的新代码(有效的东西和没有的东西)更新了上面的帖子。你有没有了解CrossValidator正在做什么导致它在fit语句中出错? - ilyab
它与评估者策略无关。简单地(或不那么简单)意味着不可能为给定数据和一组参数求解系统。 - zero323
我尝试将参数网格设置为仅包含默认的估算器参数值。我仍然得到错误,所以我认为它不是参数组合。您是否知道可能存在问题的地方?我想我可以设置自己的嵌套循环来通过参数组合和交叉验证来执行迭代,但我更希望使用Spark的内置函数来实现这一点。 - ilyab
@xenocyon输入可以解释为项目相关的概率。所以它只是创建一对(P(项目不相关),P(项目相关)),这几乎是BinaryClassifcationEvaluator所期望的(P(class(x)= 0),P(class(x)= 1)) - zero323
@ zero323我看到你的AUC为1.0。甚至当我对其进行转换然后运行评估程序时,我在验证数据集上获得的AUC为1.0。从技术上讲,人们不会期望AUC为1.0且略低于优秀模型。这里有什么问题,一个人得到1.0? - Baktaawar


通过隐式反馈,我们没有用户对我们建议的反应。因此,我们不能使用基于精度的指标。

在已经 引文,使用预期的百分位数排名度量。

您可以尝试基于Spark ML lib中的类似度量标准实现Evaluator,并在Cross Validation管道中使用它。


0
2017-12-15 11:09