Reputation: 641
I'm trying to create a Row (org.apache.spark.sql.catalyst.expressions.Row) based on user input, but I'm not able to construct a Row dynamically. Is there any functionality to create a Row from a List or an Array?
For example, if I have a .csv file with the following format,
"91xxxxxxxxxx,21.31,15,0,0"
and the user inputs [1, 2], then I need to take only the 2nd and 3rd columns along with the customer_id, which is the first column.
I tried to parse it with this code:
val l3 = sc.textFile("/SparkTest/abc.csv").map(_.split(" ")).map(r => foo(input, r(0)))
where foo is defined as
def foo(input: List[Int], s: String): Row = {
  val n = input.length
  val out = new Array[Any](n + 1)
  val r = s.split(",")
  out(0) = r(0)
  for (i <- 1 to n)
    out(i) = r(input(i - 1)).toDouble
  Row(out)
}
and input is a List, say:
val input = List(1,2)
Executing this code I get l3 as:
Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@234d2916])
But what I want is:
Array[org.apache.spark.sql.catalyst.expressions.Row] = Array([91xxxxxxxxxx,21.31,15])
This has to be passed to create a schema in Spark SQL
Upvotes: 11
Views: 59587
Reputation: 514
You can also try:
Row.fromSeq(Seq(line(0).toString, line(1).toDouble, line(2).toDouble) ++ line.slice(2, line.size).map(value => value.toString))
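Here line is presumably the Array[String] you get from splitting a CSV record; a rough sketch of how it could be wired up (the file path and the choice of columns are just for illustration):
import org.apache.spark.sql._
// hypothetical: split each CSV record and build a Row with mixed types
val rows = sc.textFile("/SparkTest/abc.csv").map(_.split(",")).map { line =>
  Row.fromSeq(Seq(line(0), line(1).toDouble, line(2).toDouble))
}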
Upvotes: 1
Reputation: 5018
You are missing the creation of the StructField and StructType. Refer to the official guide http://spark.apache.org/docs/latest/sql-programming-guide.html, section "Programmatically Specifying the Schema".
I'm not a Scala specialist, but in Python it would look like this:
from pyspark.sql import *
sqlContext = SQLContext(sc)
input = [1,2]
def parse(line):
    global input
    l = line.split(',')
    res = [l[0]]
    for ind in input:
        res.append(l[ind])
    return res
csv = sc.textFile("file:///tmp/inputfile.csv")
rows = csv.map(lambda x: parse(x))
fieldnum = len(input) + 1
fields = [StructField("col"+str(i), StringType(), True) for i in range(fieldnum)]
schema = StructType(fields)
csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()
In short, you should not convert the records to Row objects yourself; just leave them as an RDD and apply the schema to it with applySchema.
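For the Scala side, a rough equivalent of the same approach might look like this (an untested sketch, assuming a Spark 1.x shell where sqlContext.applySchema is still available, and reusing the hypothetical file:///tmp/inputfile.csv path from above):
import org.apache.spark.sql._
// on Spark 1.3+ the type classes live in org.apache.spark.sql.types; on older 1.x releases they are in org.apache.spark.sql itself
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val input = List(1, 2)

// keep the first column plus the user-selected columns, all as strings
def parse(line: String): Row = {
  val l = line.split(",")
  Row.fromSeq(l(0) +: input.map(l(_)))
}

val rows = sc.textFile("file:///tmp/inputfile.csv").map(parse)

val fields = (0 to input.length).map(i => StructField("col" + i, StringType, nullable = true))
val schema = StructType(fields)

val csvWithSchema = sqlContext.applySchema(rows, schema)
csvWithSchema.registerTempTable("test")
sqlContext.sql("SELECT * FROM test").collect()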
Upvotes: 2
Reputation: 436
Something like the following should work:
import org.apache.spark.sql._
def f(n: List[Int], s: String): Row =
  Row.fromSeq(s.split(",").zipWithIndex.collect { case (a, b) if n.contains(b) => a }.toSeq)
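A quick usage sketch (hypothetical values; note that with n = List(1, 2) this keeps only the 2nd and 3rd columns, so prepend index 0 if you also want the customer_id):
val input = List(1, 2)
// select column 0 plus the user-chosen columns from each record
val l3 = sc.textFile("/SparkTest/abc.csv").map(line => f(0 :: input, line))
// f(0 :: List(1, 2), "91xxxxxxxxxx,21.31,15,0,0") => [91xxxxxxxxxx,21.31,15] (all fields kept as strings)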
Upvotes: 18