Reputation: 49
I need to create a class schema that supports 29 fields. Because case classes in Scala 2.10 are limited to 22 fields, I tried extending my class "sdp_d" with the Product trait as follows:
class sdp_d(WID: Option[Int], BATCH_ID: Option[Int], SRC_ID: Option[String], ORG_ID: Option[Int],
            CLASS_WID: Option[Int], DESC_TEXT: Option[String], PREMISE_WID: Option[Int], FEED_LOC: Option[String],
            GPS_LAT: Option[Double], GPS_LONG: Option[Double], PULSE_OUTPUT_BLOCK: Option[String], UDC_ID: Option[String],
            UNIVERSAL_ID: Option[String], IS_VIRTUAL_FLG: Option[String], SEAL_INFO: Option[String], ACCESS_INFO: Option[String],
            ALT_ACCESS_INFO: Option[String], LOC_INFO: Option[String], ALT_LOC_INFO: Option[String], TYPE: Option[String],
            SUB_TYPE: Option[String], TIMEZONE_ID: Option[Int], GIS_ID: Option[String], BILLED_UPTO_TIME: Option[java.sql.Timestamp],
            POWER_STATUS: Option[String], LOAD_STATUS: Option[String], BILLING_HOLD_STATUS: Option[String],
            INSERT_TIME: Option[java.sql.Timestamp], LAST_UPD_TIME: Option[java.sql.Timestamp]) extends Product {

  @throws(classOf[IndexOutOfBoundsException])
  override def productElement(n: Int): Any = n match {
    case 0 => WID
    case 1 => BATCH_ID
    case 2 => SRC_ID
    case 3 => ORG_ID
    case 4 => CLASS_WID
    case 5 => DESC_TEXT
    case 6 => PREMISE_WID
    case 7 => FEED_LOC
    case 8 => GPS_LAT
    case 9 => GPS_LONG
    case 10 => PULSE_OUTPUT_BLOCK
    case 11 => UDC_ID
    case 12 => UNIVERSAL_ID
    case 13 => IS_VIRTUAL_FLG
    case 14 => SEAL_INFO
    case 15 => ACCESS_INFO
    case 16 => ALT_ACCESS_INFO
    case 17 => LOC_INFO
    case 18 => ALT_LOC_INFO
    case 19 => TYPE
    case 20 => SUB_TYPE
    case 21 => TIMEZONE_ID
    case 22 => GIS_ID
    case 23 => BILLED_UPTO_TIME
    case 24 => POWER_STATUS
    case 25 => LOAD_STATUS
    case 26 => BILLING_HOLD_STATUS
    case 27 => INSERT_TIME
    case 28 => LAST_UPD_TIME
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }

  override def productArity: Int = 29

  override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d]
}
This defines the class sdp_d. However, when I try to load CSV data with this pre-defined schema and register it as a table, I get an error:
scala> import java.text.SimpleDateFormat; val sdf = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss.S"); import java.util.Calendar; import java.util.Date; val calendar = Calendar.getInstance()
import java.text.SimpleDateFormat
sdf: java.text.SimpleDateFormat = java.text.SimpleDateFormat@cce61785
import java.util.Calendar
import java.util.Date
calendar: java.util.Calendar = java.util.GregorianCalendar[time=1424687963209,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Asia/Kolkata",offset=19800000,dstSavings=0,useDaylight=false,transitions=6,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2015,MONTH=1,WEEK_OF_YEAR=9,WEEK_OF_MONTH=4,DAY_OF_MONTH=23,DAY_OF_YEAR=54,DAY_OF_WEEK=2,DAY_OF_WEEK_IN_MONTH=4,AM_PM=1,HOUR=4,HOUR_OF_DAY=16,MINUTE=9,SECOND=23,MILLISECOND=209,ZONE_OFFSET=19800000,DST_OFFSET=0]
scala> sc.textFile("hdfs://CDH-Master-1.cdhcluster/user/spark/Sdp_d.csv").map(_.split(",")).map { r =>
| val upto_time = sdf.parse(r(23).trim);
| calendar.setTime(upto_time);
| val r23 = new java.sql.Timestamp(upto_time.getTime);
|
| val insert_time = sdf.parse(r(26).trim);
| calendar.setTime(insert_time);
| val r26 = new java.sql.Timestamp(insert_time.getTime);
|
| val last_upd_time = sdf.parse(r(27).trim);
| calendar.setTime(last_upd_time);
| val r27 = new java.sql.Timestamp(last_upd_time.getTime);
|
| sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
| }.registerAsTable("sdp")
<console>:36: error: not found: value sdp_d
sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
            ^
I am working in spark-shell, with Spark version 1.1.0 and Scala version 2.10.4.
I don't understand the error: not found: value sdp_d.
How am I supposed to registerAsTable when I create my own class extending the Product trait?
Please help in resolving the error.
Upvotes: 2
Views: 1444
Reputation: 764
Because sdp_d is a plain class rather than a case class, it has no auto-generated companion apply method, so you must instantiate it with new. Its constructor also takes Option parameters, so wrap each value:
new sdp_d(...)
new sdp_d(Try(r(0).trim.toInt).toOption, Try(r(1).trim.toInt).toOption, Option(r(2).trim), ...)
This works for me:
//AirTraffic.scala
class AirTraffic(Year:Option[Int], Month:Option[Int], DayOfMonth:Option[Int], DayOfWeek:Option[Int],
DepTime:Option[Int], CRSDepTime:Option[Int], ArrTime:Option[Int], CRSArrTime:Option[Int],
UniqueCarrier:String, FlightNum:Option[Int], TailNum:String, ActualElapsedTime:Option[Int],
CRSElapsedTime:Option[Int], AirTime:Option[Int], ArrDelay:Option[Int], DepDelay:Option[Int],
Origin:String, Dest:String, Distance:Option[Int], TaxiIn:Option[Int], TaxiOut:Option[Int],
Cancelled:Option[Boolean], CancellationCode:String, Diverted:Option[Boolean], CarrierDelay:Option[Int],
WeatherDelay:Option[Int], NASDelay:Option[Int], SecurityDelay:Option[Int], LateAircraftDelay:Option[Int]) extends Product {
// Fields are declared as Option[T] to make them nullable.
override def productElement(n: Int): Any =
n match {
case 0 => Year
case 1 => Month
case 2 => DayOfMonth
case 3 => DayOfWeek
case 4 => DepTime
case 5 => CRSDepTime
case 6 => ArrTime
case 7 => CRSArrTime
case 8 => UniqueCarrier
case 9 => FlightNum
case 10 => TailNum
case 11 => ActualElapsedTime
case 12 => CRSElapsedTime
case 13 => AirTime
case 14 => ArrDelay
case 15 => DepDelay
case 16 => Origin
case 17 => Dest
case 18 => Distance
case 19 => TaxiIn
case 20 => TaxiOut
case 21 => Cancelled
case 22 => CancellationCode
case 23 => Diverted
case 24 => CarrierDelay
case 25 => WeatherDelay
case 26 => NASDelay
case 27 => SecurityDelay
case 28 => LateAircraftDelay
case _ => throw new IndexOutOfBoundsException(n.toString)
}
override def productArity: Int = 29
override def canEqual(that: Any): Boolean = that.isInstanceOf[AirTraffic]
}
//main.scala
import scala.util.Try          // needed for Try(...).toOption below
import sqlContext.implicits._  // needed for .toDF()

val data = sparkContext.textFile("local-input/AIRLINE/2008.csv").map(_.split(","))
.map(l => new AirTraffic(Try(l(0).trim.toInt).toOption, Try(l(1).trim.toInt).toOption, Try(l(2).trim.toInt).toOption, Try(l(3).trim.toInt).toOption,
Try(l(4).trim.toInt).toOption, Try(l(5).trim.toInt).toOption, Try(l(6).trim.toInt).toOption, Try(l(7).trim.toInt).toOption,
l(8).trim, Try(l(9).trim.toInt).toOption, l(10).trim, Try(l(11).trim.toInt).toOption,
Try(l(12).trim.toInt).toOption, Try(l(13).trim.toInt).toOption, Try(l(14).trim.toInt).toOption, Try(l(15).trim.toInt).toOption,
l(16).trim, l(17).trim, Try(l(18).trim.toInt).toOption, Try(l(19).trim.toInt).toOption, Try(l(20).trim.toInt).toOption,
Try(l(21).trim.toBoolean).toOption, l(22).trim, Try(l(23).trim.toBoolean).toOption, Try(l(24).trim.toInt).toOption,
Try(l(25).trim.toInt).toOption, Try(l(26).trim.toInt).toOption, Try(l(27).trim.toInt).toOption, Try(l(28).trim.toInt).toOption)).toDF()
// register table with SQLContext
data.registerTempTable("AirTraffic")
val count = sqlContext.sql("SELECT COUNT(*) FROM AirTraffic").collect()
count.foreach(print)
If you think this is still ugly, you can go further with an implicit converter class:
import scala.util.Try

implicit class StringConverter(val s: String) extends AnyVal {
  def tryGetInt = Try(s.trim.toInt).toOption
  def tryGetString = {
    val res = s.trim
    if (res.isEmpty) None else Some(res)
  }
  def tryGetBoolean = Try(s.trim.toBoolean).toOption
}
Then:
val data = sparkContext.textFile("local-input/AIRLINE/2008.csv").map(_.split(","))
.map(l => new AirTraffic(l(0).tryGetInt, l(1).tryGetInt, l(2).tryGetInt, l(3).tryGetInt,
l(4).tryGetInt, l(5).tryGetInt, l(6).tryGetInt, l(7).tryGetInt,
l(8).trim, l(9).tryGetInt, l(10).trim, l(11).tryGetInt,
l(12).tryGetInt, l(13).tryGetInt, l(14).tryGetInt, l(15).tryGetInt,
l(16).trim, l(17).trim, l(18).tryGetInt, l(19).tryGetInt, l(20).tryGetInt,
l(21).tryGetBoolean, l(22).trim, l(23).tryGetBoolean, l(24).tryGetInt,
l(25).tryGetInt, l(26).tryGetInt, l(27).tryGetInt, l(28).tryGetInt)).toDF()
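Since StringConverter extends AnyVal it is a value class, so the implicit conversion adds no per-field wrapper allocation when parsing each row.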
Upvotes: 0
Reputation: 2857
You should just instantiate the class with new (and wrap the values in Some/Option, since the constructor takes Option parameters):
new sdp_d(Some(r(0).trim.toInt), Some(r(1).trim.toInt), ...
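The reason: the compiler generates a companion object with an apply method for a case class, but not for a plain class, so calling sdp_d(...) without new looks up a value named sdp_d and fails. A minimal sketch with a hypothetical two-field class Big to illustrate:
case class Small(a: Option[Int], b: Option[String])

class Big(a: Option[Int], b: Option[String]) extends Product {
  override def productArity: Int = 2
  override def productElement(n: Int): Any = n match {
    case 0 => a
    case 1 => b
    case _ => throw new IndexOutOfBoundsException(n.toString)
  }
  override def canEqual(that: Any): Boolean = that.isInstanceOf[Big]
}

Small(Some(1), Some("x"))      // compiles: the compiler generates Small.apply
// Big(Some(1), Some("x"))     // error: not found: value Big
new Big(Some(1), Some("x"))    // compiles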
Upvotes: 0
Reputation: 2594
Did you happen to have a look at https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema ?
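That section shows how to build the schema explicitly with StructType and Row instead of a Product class, which sidesteps the 22-field case class limit entirely. A minimal sketch of that approach for the first few sdp_d columns, assuming Spark 1.1, where these types are available via import org.apache.spark.sql._ and the method is applySchema:
import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)

// Declare the schema explicitly; each field is nullable.
val schema = StructType(Seq(
  StructField("WID", IntegerType, nullable = true),
  StructField("SRC_ID", StringType, nullable = true),
  StructField("GPS_LAT", DoubleType, nullable = true)))

// Build an RDD[Row] whose values line up with the schema.
val rowRDD = sc.textFile("hdfs://CDH-Master-1.cdhcluster/user/spark/Sdp_d.csv")
  .map(_.split(","))
  .map(r => Row(r(0).trim.toInt, r(2).trim, r(8).trim.toDouble))

// Apply the schema and register the table.
val sdpRDD = sqlContext.applySchema(rowRDD, schema)
sdpRDD.registerTempTable("sdp")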
Upvotes: 1