

Spark Scala CSV Input to Nested Json

This is what my input data looks like:


I want to convert it to a key-value RDD, where the key is an Integer and the value is a JSON object, so that I can write it to Elasticsearch:

2024270, {
  "metrics": {
    "date" : 20170201,
    "style_id" : 1234709,
    "revenue" : 1000,
    "list_count" : 1000,
    "pdp_count" : 1000,
    "add_to_cart_count" : 1000
  }
}

In Python, I can do this with the following code:

def format_metrics(line):
    tokens = line.split('^')
    return (tokens[1], {
        'metrics': {
            'date': tokens[0],
            'mrp': float(tokens[2]),
            'revenue': float(tokens[3]),
            'quantity': int(tokens[4]),
            'product_discount': float(tokens[5]),
            'coupon_discount': float(tokens[6]),
            'total_discount': float(tokens[7]),
            'list_count': int(tokens[8]),
            'add_to_cart_count': int(tokens[9]),
            'pdp_count': int(tokens[10])
        }
    }) if len(tokens) > 1 else ('', dict())

# format_metrics must be defined before it is passed to map()
metrics_rdd = sc.textFile('s3://myntra/scm-inbound/fifa/poc/size_curve_date_range_old/*').map(format_metrics)

However, I can't figure out how to do the same in Scala (I'm new to Scala). I managed to produce the output below, but I can't wrap the JSON in a "metrics" block. Any pointers would be really helpful.

ordersDF.withColumn("key", $"style_id")
        .withColumn("json", to_json(struct($"date", $"style_id", $"mrp")))
        .select("key", "json")

// Exiting paste mode, now interpreting.

|key    |json                                             |


Answers (1)



I tried what @philantrovert suggested, and it worked:

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv")
ordersDF: org.apache.spark.sql.DataFrame = [date: int, style_id: int ... 9 more fields]

scala> :paste
// Entering paste mode (ctrl-D to finish)

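ordersDF.withColumn("key", $"style_id")
        .withColumn("metrics", struct($"date", $"style_id", $"mrp"))  // keep "metrics" a struct (not a JSON string) so it stays nested
        .select("key", "metrics")
        .toJSON  // one JSON string per row, in a column named "value"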

// Exiting paste mode, now interpreting.

|value                                                                              |
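If the key should stay in its own column and only the `metrics` object should be serialized (closer to the `(key, json)` pair the question asks for), one option is to nest one `struct` inside another before calling `to_json`: the inner struct is aliased `metrics`, and the outer struct turns that alias into a JSON field name. This is a minimal sketch, assuming Spark 2.1+ (where `to_json` accepts a struct column); the object and method names and the one-row sample DataFrame with only `date`, `style_id` and `mrp` are hypothetical stand-ins for the CSV data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical helper: wraps the chosen columns in an inner struct aliased "metrics",
// then wraps that in an outer struct so to_json emits {"metrics": {...}} instead of a flat object.
object NestedMetricsJson {
  def withMetricsJson(df: DataFrame): DataFrame =
    df.select(
      col("style_id").as("key"),
      to_json(struct(struct(col("date"), col("style_id"), col("mrp")).as("metrics"))).as("json")
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nested-metrics-json").getOrCreate()
    import spark.implicits._

    // Hypothetical sample row standing in for the CSV read with revenue_schema
    val ordersDF = Seq((20170201, 1234709, 1000.0)).toDF("date", "style_id", "mrp")
    withMetricsJson(ordersDF).show(false)

    spark.stop()
  }
}
```

For the sample row, the `json` column should come out as `{"metrics":{"date":20170201,"style_id":1234709,"mrp":1000.0}}`, with the key kept separately in `key`.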

I also tried another approach using the Json4s library, and that worked as well:

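import org.apache.spark.sql.Row
import org.json4s.JObject
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Assumes revenue_schema lists the columns in the same order as the Python tokens:
// date, style_id, mrp, revenue, quantity, product_discount, coupon_discount,
// total_discount, list_count, add_to_cart_count, pdp_count
def convertRowToJSON(row: Row): (Int, String) = {
  val json: JObject =
    ("metrics" ->
      (("date" -> row(0).toString) ~
       ("style_id" -> row.getInt(1)) ~
       ("mrp" -> row.getFloat(2)) ~
       ("revenue" -> row.getFloat(3)) ~
       ("quantity" -> row.getInt(4)) ~
       ("product_discount" -> row.getFloat(5)) ~
       ("coupon_discount" -> row.getFloat(6)) ~
       ("total_discount" -> row.getFloat(7)) ~
       ("list_count" -> row.getInt(8)) ~
       ("add_to_cart_count" -> row.getInt(9)) ~
       ("pdp_count" -> row.getInt(10))))

  // Key is style_id; value is the nested JSON rendered as a compact string
  (row.getInt(1), compact(render(json)))
}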

scala> val ordersDF = spark.read.schema(revenue_schema).format("csv").load("s3://myntra/scm-inbound/fifa/pocs/smallMetrics.csv").map(convertRowToJSON)
ordersDF: org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]

scala> ordersDF.show(false)
|_1     |_2                                                                                                                                                                                                                                                |
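The Python `format_metrics` function from the question can also be translated almost line for line into plain Scala and applied to the raw text lines, without going through a DataFrame. This is a minimal sketch using only the standard library; it assumes the same '^'-delimited, 11-token layout that the Python code implies, and the object name `MetricsFormat` is hypothetical. The JSON is built by hand, which is only safe here because every value is a number or a plain date token (nothing is escaped).

```scala
// Hypothetical plain-Scala port of the Python format_metrics function.
object MetricsFormat {
  def formatMetrics(line: String): (String, String) = {
    val t = line.split('^') // Char overload: splits on a literal '^', not a regex
    if (t.length <= 1) ("", "{}") // same fallback as the Python ('', dict())
    else {
      // (field name, rendered JSON value) in the same order as the Python dict
      val fields = Seq(
        "date"              -> ("\"" + t(0) + "\""), // kept as a string, like tokens[0] in Python
        "mrp"               -> t(2).toDouble.toString,
        "revenue"           -> t(3).toDouble.toString,
        "quantity"          -> t(4).toInt.toString,
        "product_discount"  -> t(5).toDouble.toString,
        "coupon_discount"   -> t(6).toDouble.toString,
        "total_discount"    -> t(7).toDouble.toString,
        "list_count"        -> t(8).toInt.toString,
        "add_to_cart_count" -> t(9).toInt.toString,
        "pdp_count"         -> t(10).toInt.toString
      )
      val inner = fields.map { case (k, v) => "\"" + k + "\":" + v }.mkString("{", ",", "}")
      // Key is tokens[1] (style_id); value wraps the fields in a "metrics" object
      (t(1), "{\"metrics\":" + inner + "}")
    }
  }
}
```

In Spark this could be used as `sc.textFile(path).map(MetricsFormat.formatMetrics)`, giving an `RDD[(String, String)]` of key and JSON string, mirroring the Python version.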
