derek

Reputation: 10217

Spark: adding column name to csv file fails

I have "a.txt" which is in csv format and is separated by tabs:

16777216    16777471        -33.4940    143.2104
16777472    16778239    Fuzhou  26.0614 119.3061

Then I run:

sc.textFile("path/to/a.txt").map(line => line.split("\t")).toDF("startIP", "endIP", "City", "Longitude", "Latitude")

Then I get:

java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (1): value
New column names (5): startIP, endIP, City, Longitude, Latitude
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.sql.Dataset.toDF(Dataset.scala:376)
  at org.apache.spark.sql.DatasetHolder.toDF(DatasetHolder.scala:40)
  ... 47 elided

If I just run:

res.map(line => line.split("\t")).take(2)

I get:

rdd: Array[Array[String]] = Array(Array(16777216, 16777471, "", -33.4940, 143.2104), Array(16777472, 16778239, Fuzhou, 26.0614, 119.3061))

What is wrong here?

Upvotes: 1

Views: 3724

Answers (3)

Yoga

Reputation: 1

You can try this example:

val dataDF = sc.textFile("filepath").map(x => x.split("\t"))
  .map(a => (a(0), a(1), a(2), a(3), a(4))).toDF()  // a tuple, not an Array, so toDF yields columns _1 ... _5

val data = dataDF.selectExpr("_1 as startIP", "_2 as endIP", "_3 as City", "_4 as Longitude", "_5 as Latitude")

Upvotes: 0

Vidya

Reputation: 30310

As @user7881163 notes, the error occurs because your split produces a single column, which Spark names value, whose contents are the array of tokens produced by the split.
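To see that single column for yourself, here is a minimal sketch (assuming a spark-shell session, so spark.implicits._ is available):

import spark.implicits._

val tokens = sc.textFile("path/to/a.txt").map(_.split("\t")).toDF()
tokens.printSchema()  // one column named "value" of type array<string>,
                      // which is exactly the "Old column names (1): value" in the error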

However, per the comments from @zero323, if you are operating at scale, make sure you use the version of collect that @user7881163 uses (the one that takes a partial function): the other, far more commonly used collect is an action that moves all your data to the driver and can overwhelm that machine. And if you aren't operating at scale, why use Spark at all?
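To make the distinction concrete, a small sketch (the val names are just illustrative):

val rows = sc.textFile("path/to/a.txt").map(_.split("\t"))

// Transformation: collect with a partial function filters and maps, and stays distributed.
val parsed = rows.collect { case Array(ip1, ip2, city, lon, lat) => (ip1, ip2, city, lon, lat) }

// Action: collect() with no arguments pulls every element back to the driver as a local Array.
val local: Array[Array[String]] = rows.collect()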

This is a slightly different approach that also allows for missing city data:

sc.textFile("path/to/a.txt")
  .map(_.split("\t"))
  .map {
      case Array(startIP, endIP, city, longitude, latitude) => (startIP, endIP, Some(city), longitude, latitude)
      case Array(startIP, endIP, longitude, latitude) => (startIP, endIP, None, longitude, latitude)
  }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")
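Because the third element is an Option[String], Spark encodes the missing city as a null in a nullable City column, so downstream code can filter on City IS NULL rather than matching empty strings.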

Upvotes: 3

user7881163

Reputation: 11

Try:

sc
  .textFile("path/to/a.txt")
  .map(line => line.split("\t"))
  .collect { case Array(startIP, endIP, city, longitude, latitude) =>
    (startIP, endIP, city, longitude, latitude)
  }.toDF("startIP", "endIP", "City", "Longitude", "Latitude")

or just use csv source:

spark.read.option("delimiter", "\t").csv("path/to/a.txt")

Your current code creates a DataFrame with a single column of type array<string>. This is why it fails when you pass 5 names.
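If you take the csv route, one way (a sketch, assuming every row has exactly five tab-separated fields) to attach the column names is to rename the default _c0 ... _c4 columns right after the read:

spark.read
  .option("delimiter", "\t")
  .csv("path/to/a.txt")
  .toDF("startIP", "endIP", "City", "Longitude", "Latitude")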

Upvotes: 1
