
Rewrite UDF to pandas UDF with ArrayType column

Published on 2020-11-22 23:14:38

I am trying to rewrite a UDF as a pandas UDF.

However, when it comes to a column with ArrayType inside, I am struggling to find the right solution.

I have a dataframe as below:

+-----------+--------------------+
|      genre|                 ids|
+-----------+--------------------+
|      Crime|[6, 22, 42, 47, 5...|
|    Romance|[3, 7, 11, 15, 17...|
|   Thriller|[6, 10, 16, 18, 2...|
|  Adventure|[2, 8, 10, 15, 29...|
|   Children|[1, 2, 8, 13, 34,...|
|      Drama|[4, 11, 14, 16, 1...|
|        War|[41, 110, 151, 15...|
|Documentary|[37, 77, 99, 108,...|
|    Fantasy|[2, 56, 60, 126, ...|
|    Mystery|[59, 113, 123, 16...|
+-----------+--------------------+

The following UDF works well:

import itertools
from pyspark.sql.functions import udf
pairs_udf = udf(lambda x: list(itertools.combinations(x, 2)), transformer.schema)
df = df.select("genre", pairs_udf("ids").alias("ids"))
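
For reference, transformer.schema here is just the declared return type of the UDF; given the integer ids, it is effectively a nested integer array, something like:

from pyspark.sql.types import ArrayType, IntegerType

# Roughly the shape of transformer.schema in this example (illustrative, not the exact code)
pair_schema = ArrayType(ArrayType(IntegerType()))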

The output is like:

+-----------+--------------------+
|      genre|                 ids|
+-----------+--------------------+
|      Crime|[[6, 22], [6, 42]...|
|    Romance|[[3, 7], [3, 11],...|
|   Thriller|[[6, 10], [6, 16]...|
|  Adventure|[[2, 8], [2, 10],...|
|   Children|[[1, 2], [1, 8], ...|
|      Drama|[[4, 11], [4, 14]...|
|        War|[[41, 110], [41, ...|
|Documentary|[[37, 77], [37, 9...|
|    Fantasy|[[2, 56], [2, 60]...|
|    Mystery|[[59, 113], [59, ...|
+-----------+--------------------+

However, what would be the equivalent when writing this function as a pandas UDF?

PS: I understand that, alternatively, I can use a cross join to achieve the same result.

But I am more curious about how a pandas UDF handles a column with ArrayType.

D.J 2020-11-28 11:22:28

I am going to share my findings here:

There are 3 aspects to getting a pandas UDF to work for your project:

1. A pandas UDF, or more precisely Apache Arrow, does not support complex (nested) types the way a common UDF does (as of PySpark 3.0.1 and pyarrow 2.0.0).

e.g.:
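
For illustration, a sketch of what the pandas UDF version would look like (the function name and typing style are mine; the nested ArrayType return type is the complex type in question here):

import itertools
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, IntegerType

# Scalar pandas UDF: receives the "ids" column as a pandas Series of lists and
# returns a Series of lists of pairs. The nested ArrayType(ArrayType(...)) return
# type is exactly the kind of complex type Arrow struggles with here.
@pandas_udf(ArrayType(ArrayType(IntegerType())))
def pairs_pudf(ids: pd.Series) -> pd.Series:
    return ids.apply(lambda x: [list(p) for p in itertools.combinations(x, 2)])

df = df.select("genre", pairs_pudf("ids").alias("ids"))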

2. If you are running Java 11, which is the default in (Py)Spark 3, you need to add the following to your Spark config:

spark.driver.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'
spark.executor.extraJavaOptions='-Dio.netty.tryReflectionSetAccessible=true'

This will resolve the java.lang.UnsupportedOperationException mentioned above.
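
For example, one way to apply these options is when building the SparkSession (a sketch; the app name is just illustrative):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("genre-pairs")  # illustrative name
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
    .getOrCreate()
)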

3. Make sure your virtual environment's Python interpreter path is set as PYSPARK_PYTHON:

i.e. os.environ['PYSPARK_PYTHON'] = './your/virtual/environment/path'
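
A minimal sketch of how this can look (the path is just a placeholder for your own virtual environment, and the variable is set before the SparkSession is created):

import os
from pyspark.sql import SparkSession

# Placeholder: point this at the Python interpreter of your virtual environment.
os.environ['PYSPARK_PYTHON'] = './your/virtual/environment/path'

spark = SparkSession.builder.getOrCreate()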