Ivan
Ivan

Reputation: 20101

Read CSV file in PySpark Streaming with different schema on the same file

I have a csv file that has different lenghts per row, similar to:

left, 10, xdfe, 8992, 0.231
left, 10, xdfk, 8993, 2.231
right, 20, adfk, 8993, 2.231, DDT, 10, 10
right, 30, dfk, 923, 2.231, ADD, 10, 20
center, 923, 2.231, 10, 20
right, 34, efk, 326, 6.21, DDD, 20, 40

where rows that start with a keyword, left, right and center have the same length (the left rows always have the same length as other left rows, for example).

I want to read these files using spark.readStream.csv, do some transformations that can depend on the kind of row, and write the results to parquet. Is there a way to use different schemas based on the value of the first column of each row?

Upvotes: 2

Views: 3107

Answers (1)

Shaido
Shaido

Reputation: 28352

No, you can't use multiple schemas for the same file. The best you can do is to use the schema for the longest row and set the mode to PERMISSIVE, this will give null values in the missing columns for the shorter rows.

Unfortunately, this means that the type and column names will be different if the missing columns are not at the end of the row. E.g. the third column is a string for the right rows can a float for the center rows (looks like it should be the fifth column). One way would be to read everything as strings and then do the conversion, but depending on the data some columns can be read as e.g. float.

schema = StructType().add("a", "string").add("b", "string") \
    .add("c", "string").add("d", "string").add("e", "string") \
    .add("f", "string").add("g", "string").add("h", "string")

df = spark \
    .readStream \
    .option("mode", "PERMISSIVE") \
    .schema(schema) \
    .csv("/path/to/directory")

After this is done, it's possible to make some transformations of the data to get a correct looking dataframe. Below code is in Scala but should be easy to transform to python and adjusted to the actual needs:

val df2 = df.select($"a", 
    when($"a" === "center", null).otherwise($"b").cast(FloatType).as("b"),
    when($"a" === "center", null).otherwise($"c").as("c"),
    when($"a" === "center", $"b").otherwise($"d").cast(FloatType).as("d"),
    when($"a" === "center", $"c").otherwise($"e").cast(FloatType).as("e"),
    $"f", $"g", $"h")

Final result:

+------+----+-----+------+-----+----+----+----+
|     a|   b|    c|     d|    e|   f|   g|   h|
+------+----+-----+------+-----+----+----+----+
|  left|10.0| xdfe|8992.0|0.231|null|null|null|
|  left|10.0| xdfk|8993.0|2.231|null|null|null|
| right|20.0| adfk|8993.0|2.231| DDT|  10|  10|
| right|30.0|  dfk| 923.0|2.231| ADD|  10|  20|
|center|null| null| 923.0|2.231|null|null|null|
| right|34.0|  efk| 326.0| 6.21| DDD|  20|  40|
+------+----+-----+------+-----+----+----+----+

Upvotes: 2

Related Questions