DataQuest5

Reputation: 59

Reading a .txt file with a colon (:) in Spark 2.4

I am trying to read a .txt file in Spark 2.4 and load it into a DataFrame. The file looks like this (a single manager has many employees):

Manager_21: Employee_575,Employee_2703,
Manager_11: Employee_454,Employee_158,
Manager_4: Employee_1545,Employee_1312

Code I have written in Scala (Spark 2.4):

val df = spark.read
  .format("csv")
  .option("header", "true") // first line in file has headers
  .option("mode", "DROPMALFORMED")
  .load("D:/path/myfile.txt")

df.printSchema()

Unfortunately, the printed schema shows that the first data line is taken as the header, so every employee ends up as its own column:

 root
 |-- Manager_21: Employee_575: string (nullable = true)
 |-- Employee_454: string (nullable = true)
 |-- Employee_1312: string (nullable = true)

 ... etc.

I am not sure if this is possible in Spark with Scala.

Expected output:

All employees of a manager should be in the same column. For example, Manager_21 has two employees, and both appear in its column. In other words, how can I see which employees are under a particular manager?

Manager_21   |Manager_11   |Manager_4
Employee_575 |Employee_454 |Employee_1545
Employee_2703|Employee_158 |Employee_1312

Is it possible to do this some other way? Please suggest.

Thanks

Upvotes: 0

Views: 1027

Answers (1)

notNull

Reputation: 31470

Try using spark.read.text, then groupBy and pivot to get the desired result.


Example:

val df = spark.read.text("<path>")
df.show(10, false)
//+--------------------------------------+
//|value                                 |
//+--------------------------------------+
//|Manager_21: Employee_575,Employee_2703|
//|Manager_11: Employee_454,Employee_158 |
//|Manager_4: Employee_1545,Employee_1312|
//+--------------------------------------+

import org.apache.spark.sql.functions._

df.withColumn("mid", monotonically_increasing_id).
  withColumn("col1", split(col("value"), ":")(0)).             // manager name
  withColumn("col2", split(split(col("value"), ":")(1), ",")). // employee array
  groupBy("mid").
  pivot(col("col1")).
  agg(min(col("col2"))).
  select(max("Manager_11").alias("Manager_11"), max("Manager_21").alias("Manager_21"), max("Manager_4").alias("Manager_4")).
  selectExpr("explode(arrays_zip(Manager_11,Manager_21,Manager_4))").
  select("col.*").
  show()

//+-------------+-------------+--------------+
//|   Manager_11|   Manager_21|     Manager_4|
//+-------------+-------------+--------------+
//| Employee_454| Employee_575| Employee_1545|
//| Employee_158|Employee_2703| Employee_1312|
//+-------------+-------------+--------------+
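To make the pivot + arrays_zip + explode chain above more concrete, here is a plain-Scala sketch (no Spark) of the same transformation. The object and method names are hypothetical, and the parsing assumes the line format from the question ("Manager: emp1,emp2,"):

```scala
// Transpose "Manager: e1,e2" lines into a header row plus position-wise rows,
// mimicking what pivot + arrays_zip + explode produces in the Spark version.
object TransposeManagers {
  def transpose(lines: Seq[String]): (Seq[String], Seq[Seq[String]]) = {
    // Parse "Manager_21: Employee_575,Employee_2703," -> ("Manager_21", Seq(...))
    val parsed = lines.map { line =>
      val Array(manager, employees) = line.split(":", 2)
      manager.trim -> employees.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    }
    val headers = parsed.map(_._1)
    // Like arrays_zip + explode: row i holds the i-th employee of every manager.
    val height = parsed.map(_._2.length).max
    val rows = (0 until height).map { i =>
      parsed.map { case (_, emps) => emps.lift(i).getOrElse("") }
    }
    (headers, rows)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      "Manager_21: Employee_575,Employee_2703,",
      "Manager_11: Employee_454,Employee_158,",
      "Manager_4: Employee_1545,Employee_1312"
    )
    val (headers, rows) = transpose(lines)
    println(headers.mkString(" | "))
    rows.foreach(r => println(r.mkString(" | ")))
  }
}
```

Note that, unlike the hard-coded select of the three manager columns in the Spark code, this sketch handles any number of managers; the Spark version would need the pivot column list adjusted (or left to be inferred) when new managers appear.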

UPDATE:

val df = spark.read.text("<path>")
val df1 = df.withColumn("mid", monotonically_increasing_id).
  withColumn("col1", split(col("value"), ":")(0)).             // manager name
  withColumn("col2", split(split(col("value"), ":")(1), ",")). // employee array
  groupBy("mid").
  pivot(col("col1")).
  agg(min(col("col2"))).
  select(max("Manager_11").alias("Manager_11"), max("Manager_21").alias("Manager_21"), max("Manager_4").alias("Manager_4")).
  selectExpr("explode(arrays_zip(Manager_11,Manager_21,Manager_4))")

//create temp table

df1.createOrReplaceTempView("tmp_table")
sql("select col.* from tmp_table").show(10, false)

//+-------------+-------------+--------------+
//|Manager_11   |Manager_21   |Manager_4     |
//+-------------+-------------+--------------+
//|Employee_454 |Employee_575 |Employee_1545 |
//|Employee_158 |Employee_2703|Employee_1312 |
//+-------------+-------------+--------------+

Upvotes: 1
