Joseph Hwang

Reputation: 1421

How to transform CSV type string to dataframe in Spark SQL Java?

I am writing a Spark client in Java using the Structured Streaming API. This code reads CSV-formatted strings from Kafka:

SparkSession spark = SparkSession.builder().master("local[*]").appName("KafkaMongoStream").getOrCreate();
        
Dataset<Row> df = spark.read().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "topicForMongoDB")
            .option("startingOffsets", "earliest")
            .load()
            .selectExpr("CAST(value AS STRING)");
            
df.show();

This works, and the code prints the CSV strings:

+--------------------+
|               value|
+--------------------+
|realtime_start,re...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|
|2021-01-27,2021-0...|

Next, I try to turn these strings into a Spark dataframe with Spark SQL. First, here is the Java POJO class:

public class EntityMongoDB implements Serializable {

    private Date date;
    private float value;
    private String id;
    private String title;
    private String state;
    private String frequency_short;
    private String units_short;
    private String seasonal_adjustment_short;
    
    private static StructType structType = DataTypes.createStructType(new StructField[] {
              
              DataTypes.createStructField("date", DataTypes.DateType, false),
              DataTypes.createStructField("value", DataTypes.FloatType, false),
              DataTypes.createStructField("id", DataTypes.StringType, false),
              DataTypes.createStructField("title", DataTypes.StringType, false),
              DataTypes.createStructField("state", DataTypes.StringType, false),
              DataTypes.createStructField("frequency_short", DataTypes.StringType, false),
              DataTypes.createStructField("units_short", DataTypes.StringType, false),
              DataTypes.createStructField("seasonal_adjustment_short", DataTypes.StringType, false)
    });
    
    public static StructType getStructType() {
        return structType;
    }
}

Then I wrote this code to transform the CSV strings into a dataframe:

Dataset<Row> dfs = df.select(from_json(col("value"), EntityMongoDB.getStructType())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.date", "entityMongoDB.value", "entityMongoDB.id", 
                "entityMongoDB.title", "entityMongoDB.state", "entityMongoDB.frequency_short", 
                "entityMongoDB.units_short", "entityMongoDB.seasonal_adjustment_short").toDF();

dfs.show();
dfs.printSchema();

The printed schema is correct.

 |-- date: date (nullable = true)
 |-- value: float (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- state: string (nullable = true)
 |-- frequency_short: string (nullable = true)
 |-- units_short: string (nullable = true)
 |-- seasonal_adjustment_short: string (nullable = true)

But every generated column is filled with null values:

+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|date|value|  id|title|state|frequency_short|units_short|seasonal_adjustment_short|
+----+-----+----+-----+-----+---------------+-----------+-------------------------+
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|
|null| null|null| null| null|           null|       null|                     null|

I think the dataframe schema is generated correctly, but something goes wrong when extracting the data.

Upvotes: 0

Views: 428

Answers (1)

blackbishop

Reputation: 32660

The strings you have in the value column aren't valid JSON, so from_json won't work here.

For Spark 3+, you can use from_csv, as pointed out in the comments by @mck:

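// The Java-friendly overload of from_csv takes the schema as a Column
// (here a DDL string literal) and the options as a java.util.Map.
Dataset<Row> dfs = df.select(from_csv(col("value"),
                lit(EntityMongoDB.getStructType().toDDL()),
                new HashMap<String, String>())
        .as("entityMongoDB"))
        .selectExpr("entityMongoDB.*").toDF();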

For Spark versions prior to 3, you can split the values by comma and then transform the resulting array into multiple columns:

Dataset<Row> dfs = df.select(split(col("value"), ",").as("values"))
        // mapToObj turns the int indices into Column objects; 8 indices match the 8 column names below
        .select(IntStream.range(0, 8).mapToObj(i -> col("values").getItem(i)).toArray(Column[]::new))
        .toDF("date", "value", "id", "title", "state", "frequency_short", "units_short", "seasonal_adjustment_short");

Also, the values appear to include a header line with the column names; you can filter that line out.
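
To make the split-and-skip-header idea concrete, here is a minimal plain-Java sketch of the per-row logic, with no Spark involved: drop the header line, split each remaining line on commas, and pair the pieces with column names. The class name CsvLineSketch, the method parse, the three-column layout, and the sample rows are all hypothetical and only serve the illustration. It assumes fields never contain quoted commas, which is the same limitation a plain split on "," has.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CsvLineSketch {

    // Hypothetical helper: turns raw CSV lines into column-name -> value maps.
    // Assumes no quoted fields containing commas.
    public static List<Map<String, String>> parse(List<String> lines,
                                                  String[] columns,
                                                  String headerPrefix) {
        List<Map<String, String>> rows = new ArrayList<>();
        for (String line : lines) {
            // The header arrives as an ordinary record, so skip it explicitly.
            if (line.startsWith(headerPrefix)) {
                continue;
            }
            // A limit of -1 keeps trailing empty fields instead of dropping them.
            String[] parts = line.split(",", -1);
            Map<String, String> row = new LinkedHashMap<>();
            for (int i = 0; i < columns.length; i++) {
                // Columns with no matching field are stored as null.
                row.put(columns[i], i < parts.length ? parts[i] : null);
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Column names and rows below are made up for this sketch.
        String[] columns = {"date", "value", "id"};
        List<String> lines = Arrays.asList(
                "date,value,id",
                "2021-01-27,1.5,ABC",
                "2021-01-28,2.5,");
        for (Map<String, String> row : parse(lines, columns, "date")) {
            System.out.println(row);
        }
    }
}
```

Every value comes out as a string here, just as it does after a comma split, so typed columns such as a date or a float still need a separate conversion step.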

Upvotes: 2
