Reputation: 153
I have the following data in a file:
User: Test
Comment: Test
References: Test1 Test2 Test3
#####
User: Test1
Comment: Test1
References: Test2 Test3 Test4
...
Now I want to check which user referred to something the most. This means that a correlation between the "User" and "References" values needs to exist. When using the following code, each line becomes one element of the RDD:
elements = sc.textFile(path_to_file)
At this point it is no longer possible to correlate the "User" and "References" values with functions like map(), since the elements of the RDD are treated independently.
Is it possible to tell the textFile() function to use a custom delimiter other than newlines? (In the above case it would be the five # characters, i.e. #####.)
Are there any other solutions to this problem?
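From what I have found so far, the Hadoop TextInputFormat behind textFile() honors the textinputformat.record.delimiter setting, so something like the following untested sketch (using newAPIHadoopFile) might read each ##### block as a single record:
# sketch: read whole blocks separated by "#####" as single records
# (the exact delimiter string may need the surrounding newlines included)
conf = {"textinputformat.record.delimiter": "#####\n"}
blocks = sc.newAPIHadoopFile(
    path_to_file,
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=conf
).map(lambda kv: kv[1])  # keep only the text of each record
Each element of blocks would then contain the User, Comment and References lines of one entry, so map() could see them together. I am not sure whether this is the idiomatic way, though.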
Upvotes: 1
Views: 1022
Reputation: 2451
You can try this approach: read the file as an RDD, filter out blank lines and the ##### separator, then add an index for grouping, convert to a DataFrame, and group by that index so that each user ends up in one row together with its references.
import spark.implicits._
import org.apache.spark.sql.functions._

val r1 = spark.sparkContext.textFile("data/splithash.txt")

val rdd = r1.filter(!_.trim().equals(""))   // drop blank lines
  .filter(!_.equals("#####"))               // drop the separator lines
  .zipWithIndex()                           // attach a running line index
  .map(s => (s._1, (s._2 / 3).toInt))       // every 3 lines (User, Comment, References) get the same group id

val df = rdd.toDF()
df.show()
df.groupBy('_2).agg(collect_list('_1)).show(false)
+--------------------+---+
| _1| _2|
+--------------------+---+
| User: Test| 0|
| Comment: Test| 0|
|References: Test1...| 0|
| User: Test1| 1|
| Comment: Test1| 1|
|References: Test2...| 1|
+--------------------+---+
+---+------------------------------------------------------------+
|_2 |collect_list(_1) |
+---+------------------------------------------------------------+
|1 |[User: Test1, Comment: Test1, References: Test2 Test3 Test4]|
|0 |[User: Test, Comment: Test, References: Test1 Test2 Test3] |
+---+------------------------------------------------------------+
The same approach in PySpark:
import pyspark.sql.functions as f

r1 = spark.sparkContext.textFile("ok.txt")

rdd = (r1.filter(lambda x: x.strip() != '')   # drop blank lines
         .filter(lambda x: x != '#####')      # drop the separator lines
         .zipWithIndex()                      # attach a running line index
         .map(lambda x: (x[0], x[1] // 3)))   # every 3 lines get the same group id

rdd.foreach(lambda x: print(x))  # prints on the executors, so only useful in local mode

df = rdd.toDF()
df.show()
df.groupBy(f.col('_2')).agg(f.collect_list(f.col('_1'))).show(truncate=False)
+--------------------+---+
| _1| _2|
+--------------------+---+
| User: Test| 0|
| Comment: Test| 0|
|References: Test1...| 0|
| User: Test1| 1|
| Comment: Test1| 1|
|References: Test2...| 1|
+--------------------+---+
+---+------------------------------------------------------------+
|_2 |collect_list(_1) |
+---+------------------------------------------------------------+
|0 |[User: Test, Comment: Test, References: Test1 Test2 Test3] |
|1 |[User: Test1, Comment: Test1, References: Test2 Test3 Test4]|
+---+------------------------------------------------------------+
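If the end goal is to find the user with the most references, the same rdd of (line, group id) pairs can be taken one step further. A rough sketch (assuming every block contains exactly one User: and one References: line):
# pair each group id with its user name and with its reference count
users = rdd.filter(lambda x: x[0].startswith('User:')) \
           .map(lambda x: (x[1], x[0][len('User:'):].strip()))
ref_counts = rdd.filter(lambda x: x[0].startswith('References:')) \
                .map(lambda x: (x[1], len(x[0][len('References:'):].split())))

# join on the group id, keep (user, number of references) and pick the maximum
per_user = users.join(ref_counts).values()
top_user = per_user.max(key=lambda x: x[1])
top_user then holds the (user, reference count) pair with the largest number of references.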
Upvotes: 1