
Write parquet from another parquet with a new schema using pyspark

Posted on 2020-12-03 18:53:03

I am using PySpark DataFrames. I want to read a Parquet file and write it out with a schema that is different from the original file's.

The original schema is (it has 9,000 variables; I am only showing the first 5 for the example):

[('id', 'string'),
 ('date', 'string'),
 ('option', 'string'),
 ('cel1', 'string'),
 ('cel2', 'string')]

And I want to write:

[('id', 'integer'),
 ('date', 'integer'),
 ('option', 'integer'),
 ('cel1', 'integer'),
 ('cel2', 'integer')]

My code is:

df = sqlContext.read.parquet("PATH")

### SOME OPERATIONS ###

from pyspark.sql.types import StructType, StructField, IntegerType

write_schema = StructType([StructField('id',     IntegerType(), True),
                           StructField('date',   IntegerType(), True),
                           StructField('option', IntegerType(), True),
                           StructField('cel1',   IntegerType(), True),
                           StructField('cel2',   IntegerType(), True)])


df.write.option("schema", write_schema).parquet("PATH")

After I run it I still get the same schema as the original data: everything is string; the schema did not change.


I also tried using

df = sqlContext.read.option("schema",write_schema).parquet(PATH)

This option does not change the schema when I read it; it still shows the original one. So I used instead (as suggested here):

df = sqlContext.read.schema(write_schema).parquet(PATH)

This one works for the reading part; if I check the types I get:

df.dtypes

#>>[('id', 'int'),
#   ('date', 'int'),
#   ('option', 'int'),
#   ('cel1', 'int'),
#   ('cel2', 'int')]

But when I try to write the Parquet file I get an error:

Parquet column cannot be converted. Column: [id], Expected: IntegerType, Found: BINARY

Regards

Answered by mck on 2020-12-04 03:49:54:

Cast your columns to int and then write to another Parquet file; no schema specification is needed. Supplying a schema at read time does not convert the stored data: Parquet records each column's physical type, so a string column comes back as BINARY and cannot be reinterpreted as IntegerType, which is exactly the error above.

df = spark.read.parquet("filepath")

# cast() keeps the original column name, so this converts every column to int
df2 = df.select([df[c].cast('int') for c in df.columns])

df2.write.parquet("another_filepath")
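
For completeness, a minimal sketch of the same idea when only some of the ~9,000 columns should change type, using a per-column mapping. The target_types dictionary here is invented for illustration (the column names are the ones from the question); adjust it to your data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("filepath")

# Hypothetical mapping of column name -> target type.
target_types = {"id": "int", "date": "int", "option": "int",
                "cel1": "int", "cel2": "int"}

# Cast the mapped columns and pass the rest through unchanged.
df2 = df.select([col(c).cast(target_types[c]) if c in target_types else col(c)
                 for c in df.columns])

df2.write.parquet("another_filepath")

Checking df2.dtypes before writing confirms the casts took effect.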