flowit

Reputation: 1442

Apache Spark SQL identifier expected exception

My question is quite similar to this one: Apache Spark SQL issue : java.lang.RuntimeException: [1.517] failure: identifier expected. But I just can't figure out where my problem lies. I am using SQLite as the database backend. Connecting and simple SELECT statements work fine.

The offending line:

val df = tableData.selectExpr(tablesMap(t).toSeq:_*).map(r => myMapFunc(r))

tablesMap contains the table name as the key and an array of expression strings as the value. Printed, the array looks like this:

WrappedArray([My Col A], [ColB] || [Col C] AS ColB)

The table name is also included in square brackets since it contains spaces. The exception I get:

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: identifier expected

I already made sure not to use any Spark SQL keywords. In my opinion there are two possible reasons why this code fails: 1) I somehow handle spaces in column names wrong, or 2) I handle the concatenation wrong.

I am using a CSV-like resource file which contains the expressions I want evaluated on my tables. Apart from this file, I want to allow the user to specify additional tables and their respective column expressions at runtime. The file looks like this:

TableName,`Col A`,`ColB`,CONCAT(`ColB`, ' ', `Col C`)
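For context, this is roughly how I turn each line of that file into a tablesMap entry. The file name is a placeholder here, and the split has to respect commas nested inside CONCAT(...):

import scala.io.Source
import scala.collection.mutable.ArrayBuffer

// Split a line on commas that are not nested inside parentheses,
// so that CONCAT(`ColB`, ' ', `Col C`) stays in one piece
def splitTopLevel(line: String): Array[String] = {
  val parts = ArrayBuffer(new StringBuilder)
  var depth = 0
  for (c <- line) c match {
    case ',' if depth == 0 => parts += new StringBuilder
    case _ =>
      if (c == '(') depth += 1
      if (c == ')') depth -= 1
      parts.last += c
  }
  parts.map(_.toString.trim).toArray
}

// expressions.csv is a placeholder name for the resource file shown above
val tablesMap: Map[String, Array[String]] =
  Source.fromFile("expressions.csv").getLines().map { line =>
    val fields = splitTopLevel(line)
    fields.head -> fields.tail
  }.toMap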

Apparently the expressions in this format do not work with selectExpr. Nevertheless I would like to reuse this file, modified of course. My idea was to map the columns with the expressions from an array of strings, like now, to a sequence of Spark columns. (This is the only solution I could think of, since I want to avoid pulling in all the Hive dependencies just for this one feature.) I would introduce a small syntax for my expressions to mark raw column names with a $ and some keywords for functions like concat and as. But how could I do this? I tried something like the following, but it is still far from compiling:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, concat}

def columnsMapFunc(expr: String): Column = {
  if (expr.startsWith("$"))
    col(expr.drop(1))  // raw column name marked with $
  else
    concat(extractedColumnNames: _*).as(newName)  // placeholders: still need to parse expr
}

Upvotes: 1

Views: 5982

Answers (1)

zero323

Reputation: 330063

Generally speaking, using names containing whitespace is asking for problems, but replacing the square brackets with backticks should solve the problem:

val df = sc.parallelize(Seq((1,"A"), (2, "B"))).toDF("f o o", "b a r")
df.registerTempTable("foo bar")

df.selectExpr("`f o o`").show

// +-----+
// |f o o|
// +-----+
// |    1|
// |    2|
// +-----+

sqlContext.sql("SELECT `b a r` FROM `foo bar`").show

// +-----+
// |b a r|
// +-----+
// |    A|
// |    B|
// +-----+

For concatenation you have to use the concat function:

df.selectExpr("""concat(`f o o`, " ", `b a r`)""").show

// +----------------------+
// |'concat(f o o, ,b a r)|
// +----------------------+
// |                   1 A|
// |                   2 B|
// +----------------------+

but it requires HiveContext in Spark 1.4.0.
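For reference, a HiveContext on 1.4.0 is created the same way as a plain SQLContext, assuming an existing SparkContext sc (note that this pulls in the spark-hive dependency):

import org.apache.spark.sql.hive.HiveContext

// Drop-in replacement for SQLContext that makes Hive functions
// like concat available in SQL and selectExpr strings
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._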

In practice I would simply rename the columns after loading the data:

df.toDF("foo", "bar")
// org.apache.spark.sql.DataFrame = [foo: int, bar: string]

and use functions instead of expression strings (the concat function is available only in Spark >= 1.5.0; for 1.4 and earlier you'll need a UDF):

import org.apache.spark.sql.functions.{concat, lit}

df.select(concat($"f o o", lit(" "), $"b a r")).show

// +----------------------+
// |'concat(f o o, ,b a r)|
// +----------------------+
// |                   1 A|
// |                   2 B|
// +----------------------+
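For Spark 1.4 and earlier a simple UDF can stand in for concat. This is just a sketch; the cast is needed because f o o is an integer column while the UDF expects strings:

import org.apache.spark.sql.functions.udf

// Hypothetical stand-in for concat on Spark < 1.5
val concatWithSpace = udf((a: String, b: String) => a + " " + b)

df.select(concatWithSpace($"f o o".cast("string"), $"b a r")).show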

There is also a concat_ws function which takes the separator as its first argument:

df.selectExpr("""concat_ws(" ", `f o o`, `b a r`)""")
df.select($"f o o", concat_ws(" ", $"f o o", $"b a r"))

Upvotes: 3
