在Spark中,DataFrame是一种基于分布式数据集的分布式数据容器。DataFrame可以被看做是一种具有schema的RDD,而且可以很方便地提供关系型数据库的操作方式。在进行数据操作时,会经常用到排序和分组,下面我将详细讲解如何进行全局排序id和分组后保留最大值行的操作。
全局排序id
全局排序id是指对整个数据集进行排序,并为每一行赋一个唯一的编号。Spark提供了一个很方便的函数monotonically_increasing_id()来实现全局排序id功能。下面是示例代码:
import org.apache.spark.sql.functions.monotonically_increasing_id
val df = Seq((20,"jack",3000),(20,"rose",4000),(21,"mike",5000),(22,"lucy",2000),(22,"amy",4500))
.toDF("age","name","salary")
val resultDF = df.withColumn("rowId", monotonically_increasing_id())
上述代码中,通过toDF()函数将一个元素为元组的序列转换为DataFrame,并在其中添加了一列名为rowId的列,该列使用函数monotonically_increasing_id()赋值。该函数的作用是为每一行添加递增的唯一的长整型值,该值从0开始。
分组后保留最大值行
对于数据集的分组操作,可以使用groupBy()函数,该函数将会返回一个GroupedData对象。对于GroupedData对象,可以进行各种聚合操作,例如sum、avg、max等等。如果我们要保留分组后的最大值行,可以进行以下操作:
import org.apache.spark.sql.functions.{desc, row_number}
import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy("age").orderBy(desc("salary"))
val resultDF = df.withColumn("maxSalaryRow", row_number().over(windowSpec)).filter("maxSalaryRow = 1")
上述代码中,首先定义了一个窗口windowSpec,该窗口根据年龄进行分组,然后按工资由高到低排序。接着使用row_number()函数为每一行赋值一个row_number,并将这一列名为maxSalaryRow。最后,通过filter函数来筛选出maxSalaryRow值为1的行,也就是每个分组中的最大值行。
示例说明
下面我们通过两个实例来进一步说明如何进行全局排序id和分组后保留最大值行的操作:
示例一
假设我们有一份销售数据集,其中包含商品名称、销售日期、销售数量信息。我们需要对该数据集进行全局排序id,并以销售数量由高到低排序。下面是示例代码:
import org.apache.spark.sql.functions.monotonically_increasing_id
val df = Seq(("A","2020-01-01",10),("A","2020-01-02",20),("B","2020-01-01",15),("B","2020-01-02",25))
.toDF("name","date","salesNum")
val resultDF = df.withColumn("rowId", monotonically_increasing_id())
.orderBy(desc("salesNum"))
.drop("rowId")
resultDF.show()
上述代码中,首先使用toDF()函数将元素为元组的序列转换为DataFrame。接着,使用monotonically_increasing_id()函数为每一行添加唯一递增的数值,并将其保存在名为rowId的列中。然后使用orderBy()函数将数据根据销售数量由高到低排序,最后使用drop()函数将rowId列删除。
结果如下所示:
+----+----------+--------+
|name| date|salesNum|
+----+----------+--------+
| B|2020-01-02| 25|
| A|2020-01-02| 20|
| B|2020-01-01| 15|
| A|2020-01-01| 10|
+----+----------+--------+
示例二
假设我们有一份员工信息数据集,其中包含员工姓名、年龄、工资等信息。我们需要对该数据集进行分组并保留每组中工资最高的员工信息。下面是示例代码:
import org.apache.spark.sql.functions.{desc, row_number}
import org.apache.spark.sql.expressions.Window
val df = Seq(("jack",20,3000),("rose",20,4000),("mike",21,5000),("lucy",22,2000),("amy",22,4500))
.toDF("name","age","salary")
val windowSpec = Window.partitionBy("age").orderBy(desc("salary"))
val resultDF = df.withColumn("maxSalaryRow", row_number().over(windowSpec)).filter("maxSalaryRow = 1").drop("maxSalaryRow")
resultDF.show()
上述代码中,首先使用toDF()函数将元素为元组的序列转换为DataFrame。然后定义一个窗口windowSpec来分组并按照工资从高到低排序。接着使用row_number()函数为每一行添加递增的唯一的行号,并将其保存在名为maxSalaryRow的列中。最后使用filter()函数将每个分组中maxSalaryRow值为1的行筛选出来,并删除掉maxSalaryRow列。
结果如下所示:
+----+---+------+
|name|age|salary|
+----+---+------+
|rose| 20| 4000|
|mike| 21| 5000|
| amy| 22| 4500|
+----+---+------+
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spark dataframe全局排序id与分组后保留最大值行 - Python技术站