
Reputation: 153

How to use a custom line break in the Spark textFile function?

I have the following data in a file:

User: Test

Comment: Test

References: Test1 Test2 Test3

#####

User: Test1

Comment: Test1

References: Test2 Test3 Test4

...

Now I want to find out which user makes the most references. This means that a correlation between the "User" and "References" fields needs to be established. When using the following code, each line becomes one element in the RDD:

elements = sc.textFile(path_to_file)

At this point it is no longer possible to correlate the "User" and "References" values with functions like map(), since the elements of the RDD are treated independently.
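For illustration, this is roughly what the resulting RDD contains for the file shown above (a sketch; blank lines become empty-string elements as well):

# "User: Test" and "References: Test1 Test2 Test3" are separate, unrelated elements
elements.take(7)
# ['User: Test', '', 'Comment: Test', '', 'References: Test1 Test2 Test3', '', '#####']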

Is it possible to tell the textFile() function to use a custom delimiter other than a newline? (In the example above it would be the five # characters.)

Are there any other solutions to this problem?

Upvotes: 1

Views: 1022

Answers (1)

chlebek

Reputation: 2451

You can try this approach: read the file as an RDD, filter out blank lines and the ##### separator, then add an index for grouping, convert to a DataFrame and group by that index, and you have a user together with their references in one row. Scala first; the same approach in PySpark follows below.

  import spark.implicits._
  import org.apache.spark.sql.functions._

  val r1 = spark.sparkContext.textFile("data/splithash.txt")

  // drop blank lines and the ##### separator, then index every remaining line;
  // integer division by 3 puts each User/Comment/References triple into one group
  val rdd = r1.filter(!_.trim().equals(""))
    .filter(!_.equals("#####"))
    .zipWithIndex()
    .map(s => (s._1, (s._2 / 3).toInt))

  val df = rdd.toDF()
  df.show()

  // collect the three lines of each group into a single row
  df.groupBy('_2).agg(collect_list('_1)).show(false)
    +--------------------+---+
    |                  _1| _2|
    +--------------------+---+
    |          User: Test|  0|
    |       Comment: Test|  0|
    |References: Test1...|  0|
    |         User: Test1|  1|
    |      Comment: Test1|  1|
    |References: Test2...|  1|
    +--------------------+---+

    +---+------------------------------------------------------------+
    |_2 |collect_list(_1)                                            |
    +---+------------------------------------------------------------+
    |1  |[User: Test1, Comment: Test1, References: Test2 Test3 Test4]|
    |0  |[User: Test, Comment: Test, References: Test1 Test2 Test3]  |
    +---+------------------------------------------------------------+
The same in PySpark:

import pyspark.sql.functions as f
import math

r1 = spark.sparkContext.textFile("ok.txt")

# drop blank lines and the ##### separator, then index every remaining line;
# floor(index / 3) puts each User/Comment/References triple into one group
rdd = r1.filter(lambda x: x.strip() != '') \
    .filter(lambda x: x != '#####') \
    .zipWithIndex() \
    .map(lambda x: (x[0], math.floor(x[1] / 3)))

rdd.foreach(lambda x: print(x))  # debug output on the executors

df = rdd.toDF()
df.show()

# collect the three lines of each group into a single row
df.groupBy(f.col('_2')).agg(f.collect_list(f.col('_1'))).show(truncate=False)
+--------------------+---+
|                  _1| _2|
+--------------------+---+
|          User: Test|  0|
|       Comment: Test|  0|
|References: Test1...|  0|
|         User: Test1|  1|
|      Comment: Test1|  1|
|References: Test2...|  1|
+--------------------+---+

+---+------------------------------------------------------------+
|_2 |collect_list(_1)                                            |
+---+------------------------------------------------------------+
|0  |[User: Test, Comment: Test, References: Test1 Test2 Test3]  |
|1  |[User: Test1, Comment: Test1, References: Test2 Test3 Test4]|
+---+------------------------------------------------------------+
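To get back to the original question (which user makes the most references), one possible follow-up is sketched below. It reuses df and f from the snippet above; to_user_and_count is a hypothetical helper, and the parsing assumes the exact "Key: value" line layout shown in the question:

def to_user_and_count(lines):
    # lines is e.g. ['User: Test', 'Comment: Test', 'References: Test1 Test2 Test3']
    user, refs = None, 0
    for line in lines:
        if line.startswith('User:'):
            user = line[len('User:'):].strip()
        elif line.startswith('References:'):
            refs = len(line[len('References:'):].split())
    return user, refs

grouped = df.groupBy(f.col('_2')).agg(f.collect_list(f.col('_1')).alias('lines'))

# one (user, number_of_references) pair per block, sorted by the count
counts = grouped.rdd.map(lambda row: to_user_and_count(row['lines']))
counts.sortBy(lambda x: x[1], ascending=False).take(1)
# e.g. [('Test', 3)] -- both sample users have 3 references, so either may come first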

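Regarding the custom-delimiter part of the question: textFile() itself always splits on newlines, but a common workaround (a sketch, not tested against this exact file) is to set Hadoop's textinputformat.record.delimiter and read the file with newAPIHadoopFile, so that each #####-separated block arrives as a single record:

conf = {"textinputformat.record.delimiter": "#####"}

blocks = spark.sparkContext.newAPIHadoopFile(
    "ok.txt",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=conf
).map(lambda kv: kv[1])  # keep only the text of each record

# every element is now one whole User/Comment/References block;
# split it into non-empty lines and parse it in a single map()
records = blocks.map(lambda b: [l for l in b.splitlines() if l.strip()])
records.take(2)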
Upvotes: 1
