问题 使用pyspark用以前已知的良好值填充null


有没有办法替换 null pyspark数据帧中的值是否为最后一个有效值?还有额外的东西 timestamp 和 session 列如果您认为需要它们用于Windows分区和排序。更具体地说,我想实现以下转换:

+---------+-----------+-----------+      +---------+-----------+-----------+
| session | timestamp |         id|      | session | timestamp |         id|
+---------+-----------+-----------+      +---------+-----------+-----------+
|        1|          1|       null|      |        1|          1|       null|
|        1|          2|        109|      |        1|          2|        109|
|        1|          3|       null|      |        1|          3|        109|
|        1|          4|       null|      |        1|          4|        109|
|        1|          5|        109| =>   |        1|          5|        109|
|        1|          6|       null|      |        1|          6|        109|
|        1|          7|        110|      |        1|          7|        110|
|        1|          8|       null|      |        1|          8|        110|
|        1|          9|       null|      |        1|          9|        110|
|        1|         10|       null|      |        1|         10|        110|
+---------+-----------+-----------+      +---------+-----------+-----------+

9237
2018-03-31 20:48


起源

你不能。 DataFrames行之间没有顺序。 - eliasah
如果我有订单怎么办? timestamp? - Oleksiy
仍然不可能。这是一个分布式数据结构。它不是为此目的而设计的。 - eliasah
你不能用某种寡妇划分吗?在这种情况下你做什么,一个接一个地手动处理条目并保持状态? - Oleksiy


答案:


这似乎是在使用诀窍 窗口功能

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

def fill_nulls(df):
    df_na = df.na.fill(-1)
    lag = df_na.withColumn('id_lag', func.lag('id', default=-1)\
                           .over(Window.partitionBy('session')\
                                 .orderBy('timestamp')))

    switch = lag.withColumn('id_change',
                            ((lag['id'] != lag['id_lag']) &
                             (lag['id'] != -1)).cast('integer'))


    switch_sess = switch.withColumn(
        'sub_session',
        func.sum("id_change")
        .over(
            Window.partitionBy("session")
            .orderBy("timestamp")
            .rowsBetween(-sys.maxsize, 0))
    )

    fid = switch_sess.withColumn('nn_id',
                           func.first('id')\
                           .over(Window.partitionBy('session', 'sub_session')\
                                 .orderBy('timestamp')))

    fid_na = fid.replace(-1, 'null')

    ff = fid_na.drop('id').drop('id_lag')\
                          .drop('id_change')\
                          .drop('sub_session').\
                          withColumnRenamed('nn_id', 'id')

    return ff

这是完整的 null_test.py


7
2018-04-04 15:37



@eliasah:你能回答一下吗? - Oleksiy
我现在正在看它。 - eliasah
添加测试,如果它有帮助,使生活更轻松 - Oleksiy
我正在写我的测试!谢谢。答案对我来说似乎很干净。有会话是必不可少的,这使得可以分区,从而使用窗口功能! - eliasah
感谢您的代码审查! - Oleksiy


我相信我有一个比接受的更简单的解决方案。它也使用函数,但使用名为'LAST'的函数并忽略空值。

让我们重新创建类似于原始数据的东西:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

d = [{'session': 1, 'ts': 1}, {'session': 1, 'ts': 2, 'id': 109}, {'session': 1, 'ts': 3}, {'session': 1, 'ts': 4, 'id': 110}, {'session': 1, 'ts': 5},  {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)

这打印:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3|null|
|      1|  4| 110|
|      1|  5|null|
|      1|  6|null|
+-------+---+----+

现在,如果我们使用窗口函数LAST:

df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()

我们得到:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3| 109|
|      1|  4| 110|
|      1|  5| 110|
|      1|  6| 110|
+-------+---+----+

希望能帮助到你!


3
2017-07-28 13:36



需要注意的是:此答案将收集每个会话的所有行到某个执行程序节点。如果某个会话中的行数大于执行程序节点的内存,则会导致作业失败。 - Jordan P


@Oleksiy答案很好,但并没有完全符合我的要求。在一个会话中,如果是多个 nulls被观察到,所有都填充了第一个非null 为会议。我需要的 持续 非null 价值向前传播。

以下调整适用于我的用例:

def fill_forward(df, id_column, key_column, fill_column):

    # Fill null's with last *non null* value in the window
    ff = df.withColumn(
        'fill_fwd',
        func.last(fill_column, True) # True: fill with last non-null
        .over(
            Window.partitionBy(id_column)
            .orderBy(key_column)
            .rowsBetween(-sys.maxsize, 0))
        )

    # Drop the old column and rename the new column
    ff_out = ff.drop(fill_column).withColumnRenamed('fill_fwd', fill_column)

    return ff_out

0
2017-07-06 15:35