mvpasarel

Reputation: 785

Process big data using Hadoop Parquet to CSV output

I have 3 datasets that I want to join and group in order to get a CSV containing aggregated data.

The data is stored in Hadoop as Parquet files, and I am using Zeppelin to run Apache Spark + Scala for the data processing.

My datasets look like this:

user_actions.show(10)

+--------------------+--------------------+
|                  id|             keyword|
+--------------------+--------------------+
|00000000000000000001|               aaaa1|
|00000000000000000002|               aaaa1|
|00000000000000000003|               aaaa2|
|00000000000000000004|               aaaa2|
|00000000000000000005|               aaaa0|
|00000000000000000006|               aaaa4|
|00000000000000000007|               aaaa1|
|00000000000000000008|               aaaa2|
|00000000000000000009|               aaaa1|
|00000000000000000010|               aaaa1|
+--------------------+--------------------+

user_clicks.show(10)

+--------------------+-------------------+
|           search_id|   selected_user_id|
+--------------------+-------------------+
|00000000000000000001|               1234|
|00000000000000000002|               1234|
|00000000000000000003|               1234|
|00000000000000000004|               1234|
+--------------------+-------------------+

user_options.show(10)

+--------------------+----------+----------+
|           search_id|   user_id|  position|
+--------------------+----------+----------+
|00000000000000000001|      1230|         1|
|00000000000000000001|      1234|         3|
|00000000000000000001|      1232|         2|
|00000000000000000002|      1231|         1|
|00000000000000000002|      1232|         2|
|00000000000000000002|      1233|         3|
|00000000000000000002|      1234|         4|
|00000000000000000003|      1234|         1|
|00000000000000000004|      1230|         1|
|00000000000000000004|      1234|         2|
+--------------------+----------+----------+

What I am trying to achieve is to get, for each user_id, a JSON string with the keywords, because I need to import the result into MySQL with user_id as the primary key.

user_id,keywords
1234,"{\"aaaa1\":3.5,\"aaaa2\":0.5}"

If JSON is not available out of the box, I can work with tuples or any plain string:

user_id,keywords
1234,"(aaaa1,0.58333),(aaaa2,1.5)"

What I have done so far is:

// Join each search (user_actions) with the options that were shown for it
val user_actions_data = user_actions
    .join(user_options, user_options("search_id") === user_actions("id"))

// Left-join the clicks so that options which were never clicked are kept
val user_actions_full_data = user_actions_data
    .join(
        user_clicks,
        user_clicks("search_id") === user_actions_data("search_id") && user_clicks("selected_user_id") === user_actions_data("user_id"),
        "left_outer"
    )

// Aggregate per (user_id, keyword): number of searches, number of clicks, average position
val user_actions_data_grouped = user_actions_full_data
    .groupBy("user_id", "keyword")
    .agg("id" -> "count", "selected_user_id" -> "count", "position" -> "avg")

// Score for a keyword: average position divided by the number of searches
def udfScoreForUser = ((position: Double, searches: Long) => (position / searches))

// Build (user_id, (keyword, score)) pairs and group them by user_id
val search_log_keywords = user_actions_data_grouped.rdd
    .map({ row => row(0) -> (row(1) -> udfScoreForUser(row.getDouble(4), row.getLong(2))) })
    .groupByKey()

// Collect everything to the driver and build the keywords string per user
val search_log_keywords_array = search_log_keywords.collect.map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))

// Parallelize the array back into a DataFrame and write it out as a single CSV
sc.parallelize(search_log_keywords_array).toDF("user_id", "keywords")
    .coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("hdfs:///Search_log_testing_keywords/")

This works as expected with a small dataset, and my output CSV file looks like this:

user_id,keywords
1234,"(aaaa1,0.58333), (aaaa2,0.5)"

However, I run into problems when running it against 200+ GB of data.

I am fairly new to Spark and Scala, but I think I am missing something: I shouldn't have to convert the DataFrame to an RDD, collect it, map over the resulting array, and then parallelize it back into a DataFrame just to export it to CSV.

To summarize: I want to apply a score to all keywords, group them by user_id, and save the result to a CSV. What I have so far works with a small dataset, but when I run it against 200+ GB of data, Apache Spark fails.
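For illustration, here is a rough sketch of the kind of DataFrame-only pipeline I think I should be aiming for instead. The concat_ws / collect_list / format_string combination and the drop of the duplicate search_id column are my own guesses and are not tested at scale:

import org.apache.spark.sql.functions._

// Same joins as above; drop the duplicate search_id coming from user_clicks
// so that later references to "search_id" are unambiguous
val joined = user_actions
  .join(user_options, user_options("search_id") === user_actions("id"))
  .join(
    user_clicks,
    user_clicks("search_id") === user_options("search_id") &&
      user_clicks("selected_user_id") === user_options("user_id"),
    "left_outer"
  )
  .drop(user_clicks("search_id"))

// Score per (user_id, keyword): average position divided by the number of
// searches, the same formula as udfScoreForUser above
val scored = joined
  .groupBy("user_id", "keyword")
  .agg(count("search_id").as("searches"), avg("position").as("avg_position"))
  .withColumn("score", col("avg_position") / col("searches"))

// One row per user_id with all (keyword,score) pairs packed into one string,
// built entirely with DataFrame aggregations; no collect(), no parallelize()
scored
  .groupBy("user_id")
  .agg(concat_ws(", ",
    collect_list(format_string("(%s,%.5f)", col("keyword"), col("score")))).as("keywords"))
  .write.format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("hdfs:///Search_log_testing_keywords/")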

Upvotes: 0

Views: 599

Answers (2)

Pramod Sripada

Reputation: 261

HDFS's primary goal is to split files into chunks and store them redundantly. It is better to keep the data partitioned in HDFS unless it is absolutely necessary for you to have a single large file.
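For example, simply dropping the coalesce(1) from the write step in the question lets Spark write one part file per partition. Here result_df is just a placeholder for the final DataFrame:

// result_df is a placeholder for the final DataFrame from the question.
// Without coalesce(1), Spark writes one part-*.csv file per partition
// instead of forcing all the data through a single task.
result_df
  .write.format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("hdfs:///Search_log_testing_keywords/")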

Upvotes: 0

Glennie Helles Sindholt

Reputation: 13154

Yes, anything that relies on collect in Spark is generally wrong, unless you are debugging something. When you call collect, all the data is gathered at the driver into an array, so for most big data sets this isn't even an option: your driver will throw an OOM and die.

What I don't understand is why you are collecting in the first place. Why not simply do your map on the distributed dataset?

search_log_keywords
  .map(r => (r._1.asInstanceOf[Long], r._2.mkString(", ")))
  .toDF("user_id","keywords")
  .coalesce(1)
  .write.format("csv")
  .option("header", "true")
  .mode("overwrite")
  .save("hdfs:///Search_log_testing_keywords/")

That way, everything is carried out in parallel.
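One small detail, in case it trips you up (this is an assumption on my part, since I don't know how your Zeppelin interpreter is set up): calling toDF on an RDD requires the SparkSession implicits to be in scope:

// Needed for rdd.toDF(...); `spark` is the SparkSession, which Zeppelin and
// spark-shell normally provide (and they often import the implicits for you already).
import spark.implicits._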

With regards to switching between DataFrames and RDDs, I wouldn't worry too much about that right now. I know the community mostly advocates using DataFrames, but depending on the version of Spark and your use case, RDDs may be a better choice.

Upvotes: 1
