Reputation: 13
I've run into an issue while trying to create a DataFrame in a Scala app I'm writing.
The issue is that compilation fails with an error saying that toDF is not a member of RDD. I've seen answers that suggest moving the case class definition out of main and importing the implicits after the sqlContext declaration, but even that didn't work for me.
This is what I currently have:
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
/**
 * Spark job that reads server log files from HDFS, keeps the lines containing "ERROR",
 * turns them into [[ErrorParser.Error]] records and writes the result as JSON back to HDFS.
 *
 * The bodies of `splitError` and `filterErrors` are elided (`...`) in the original post.
 */
object ErrorParser {
/** One error record; declared at object level, outside `main`, as the post describes. */
case class Error(time: String, status: String, statusType: String, host: String, message: String)
/**
 * Splits one log line into an array of fields.
 *
 * @param line a raw log line
 * @return the array allocated below with 5 slots (how it is filled is elided in the post)
 */
def splitError(line: String) : Array[String] = {
var array:Array[String] = new Array[String](5)
...
return array
}
/**
 * Filters the already-split error rows on the driver.
 *
 * @param errors the split rows, as a local (collected) array
 * @return the rows accumulated in `filteredErrors` (the selection logic is elided in the post)
 */
def filterErrors(errors: Array[Array[String]]) : Array[Array[String]] = {
var filteredErrors = ArrayBuffer[Array[String]]()
...
return filteredErrors.toArray
}
/**
 * Entry point: builds the Spark and SQL contexts, runs the pipeline and writes the JSON output.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("ErrorParserAPI")
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Imported from the sqlContext instance, so it has to come after the declaration above;
// this is the import that is expected to make toDF() available on the RDD.
import sqlContext.implicits._
var logs = sc.textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
// Keep only the lines that contain the substring "ERROR".
var errors = logs.filter(line => line.contains("ERROR"))
val errors1 = errors.map(line => splitError(line))
// collect brings every split row to the driver as a local array before filtering.
val filteredErrors = filterErrors(errors1.collect)
// The time field is rebuilt from the first two ':'-separated pieces of p(0)
// (presumably trimming a timestamp to hours:minutes -- confirm against the log format).
val dfErrors = filteredErrors.map(p => Error(p(0).split(":")(0) + ":" + p(0).split(":")(1), p(1), p(2), p(3), p(4)))
// Turn the local array back into an RDD so it can be converted to a DataFrame.
val filteredRDD = sc.parallelize(dfErrors)
var errorDF = filteredRDD.toDF()
errorDF.write.json("hdfs://hadoop-master:9000/results/errorParserResult")
}
}
I'm stumped, since the same code works when I run it in spark-shell.
I've also seen some answers that suggest converting the RDD to an RDD[Row] and then using
sqlContext.createDataFrame(rdd, schema)
but I can't wrap my head around how I would go about doing that.
Any help would be much appreciated!
This is my .sbt file:
name := "ErrorParserAPI"
version := "1.0"
// The Scala binary version of the build must match the suffix of the Spark artifacts.
// The original combined scalaVersion 2.11.7 with `_2.10` artifacts; that mismatch is a
// likely cause of the "toDF is not a member of RDD" compile error. Spark 1.6.1 uses
// Scala 2.10 by default, so the build is aligned on 2.10.
scalaVersion := "2.10.6"
// `%%` appends the Scala binary version (here `_2.10`) to the artifact name, so the
// dependencies can no longer drift away from scalaVersion.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1",
  "org.apache.spark" %% "spark-sql" % "1.6.1"
)
EDIT: a typo
Upvotes: 1
Views: 2606
Reputation: 4613
I just copied your code and pasted it into my Eclipse, and it works fine without any compilation errors. If you are using Eclipse, you may want to try cleaning and refreshing your project.
import scala.Array.canBuildFrom
import scala.collection.mutable.ArrayBuffer
import scala.reflect.runtime.universe
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
 * Compile-check version of the error parser: the helpers are stubs, and `main` only
 * builds the DataFrame without writing it anywhere.
 */
object ErrorParser {

  /** Record type the filtered rows are mapped to before `toDF()` is called. */
  case class Error(time: String, status: String, statusType: String, host: String, message: String)

  /**
   * Stub: ignores `line` and returns a freshly allocated 5-slot array.
   *
   * @param line a raw log line (unused)
   * @return a new `Array[String]` of length 5
   */
  def splitError(line: String): Array[String] = new Array[String](5)

  /**
   * Stub: ignores `errors` and returns an empty array.
   *
   * @param errors the split rows (unused)
   * @return an empty array of rows
   */
  def filterErrors(errors: Array[Array[String]]): Array[Array[String]] =
    ArrayBuffer.empty[Array[String]].toArray

  /**
   * Builds the Spark and SQL contexts and runs the pipeline up to the DataFrame conversion.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ErrorParserAPI")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // Imported from the sqlContext instance, so it has to follow the declaration above.
    import sqlContext.implicits._

    val errorLines = sc
      .textFile("hdfs://hadoop-master:9000/logs/data/logs/server.*")
      .filter(_.contains("ERROR"))
    val splitRows = errorLines.map(splitError)

    // The rows are collected to the driver, filtered locally, then mapped to records.
    val keptRows = filterErrors(splitRows.collect)
    val records = keptRows.map { row =>
      // Time is rebuilt from the first two ':'-separated pieces of the first field.
      val timeParts = row(0).split(":")
      Error(timeParts(0) + ":" + timeParts(1), row(1), row(2), row(3), row(4))
    }

    val recordRDD = sc.parallelize(records)
    val errorDF = recordRDD.toDF()
  }
}
Upvotes: 1