使用Pandas_UDF快速改造Pandas代码

 

1. Pandas_UDF介绍

PySpark和Pandas之间改进性能和互操作性的其核心思想是将Apache Arrow作为序列化格式,以减少PySpark和Pandas之间的开销。

Pandas_UDF是在PySpark2.3中新引入的API,由Spark使用Arrow传输数据,使用Pandas处理数据。Pandas_UDF是使用关键字pandas_udf作为装饰器或包装函数来定义的,不需要额外的配置。目前,有两种类型的Pandas_UDF,分别是Scalar(标量映射)和Grouped Map(分组映射)。

1.1 Scalar

Scalar Pandas UDF用于向量化标量操作。常常与select和withColumn等函数一起使用。其中调用的Python函数需要使用pandas.Series作为输入并返回一个具有相同长度的pandas.Series。具体执行流程是,Spark将列分成批,并将每个批作为数据的子集进行函数的调用,进而执行panda UDF,最后将结果连接在一起。

下面的示例展示如何创建一个scalar panda UDF,计算两列的乘积:

复制代码
import pandas as pd  from pyspark.sql.functions import col, pandas_udf from pyspark.sql.types import LongType  # 声明函数并创建UDF
def multiply_func(a, b): return a * b multiply = pandas_udf(multiply_func, returnType=LongType()) x = pd.Series([1, 2, 3])df = spark.createDataFrame(pd.DataFrame(x, columns=["x"])) # Execute function as a Spark vectorized UDFdf.select(multiply(col("x"), col("x"))).show() # +-------------------+ # |multiply_func(x, x)| # +-------------------+ # | 1| # | 4| # | 9| # +-------------------+
复制代码

1.2 Grouped Map

Grouped map(分组映射)panda udf与groupBy().apply()一起使用,后者实现了“split-apply-combine”模式。“split-apply-combine”包括三个步骤:

  1. 使用DataFrame.groupBy将数据分成多个组。
  2. 对每个分组应用一个函数。函数的输入和输出都是pandas.DataFrame。输入数据包含每个组的所有行和列。
  3. 将结果合并到一个新的DataFrame中。

要使用groupBy().apply(),需要定义以下内容:

  • 定义每个分组的Python计算函数,这里可以使用pandas包或者Python自带方法。
  • 一个StructType对象或字符串,它定义输出DataFrame的格式,包括输出特征以及特征类型。

需要注意的是,StructType对象中的Dataframe特征顺序需要与分组中的Python计算函数返回特征顺序保持一致。

此外,在应用该函数之前,分组中的所有数据都会加载到内存,这可能导致内存不足抛出异常。

下面的例子展示了如何使用groupby().apply()从组中的每个值中减去平均:

复制代码
from pyspark.sql.functions import pandas_udf, PandasUDFType  df =
                        
关键字:
50000+
5万行代码练就真实本领
17年
创办于2008年老牌培训机构
1000+
合作企业
98%
就业率

联系我们

电话咨询

0532-85025005

扫码添加微信