在我通过使用窗口并在一组列上划分来计算pyspark数据帧中每行5行内的数量平均值之后
from pyspark.sql import functions as F
prep_df = ...
window = Window.partitionBy([F.col(x) for x in group_list]).rowsBetween(Window.currentRow, Window.currentRow + 4)
consecutive_df = prep_df.withColumn('aveg', F.avg(prep_df['quantity']).over(window))
我正在尝试按同一组进行分组,然后选择平均值的最大值,如下所示:
grouped_consecutive_df = consecutive_df.groupBy(group_column_list).agg(F.max(consecutive_df['aveg']).alias('aveg'))
但是,当我调试时,我发现计算出的最大值是错误的。对于特定的实例,我看到检索到的最大数量甚至不在“ aveg”列中。
我想问一下我是在采取错误的方法还是错过了一些琐碎的事情。任何意见表示赞赏。
我可以通过类似的解决方法来解决此问题:在聚合之前,我将数量平均值的最大值映射到另一个新列,然后选择了组中的某行。