Astha Sachdev

Reputation: 49

Error while extending a Scala class with the Product interface to overcome the 22-field limit in spark-shell

I need to create a class schema with 29 fields. Because case classes are limited to 22 fields, I tried extending my class "sdp_d" with the Product interface as follows:

class sdp_d( WID :Option[Int], BATCH_ID :Option[Int], SRC_ID :Option[String], ORG_ID :Option[Int], CLASS_WID :Option[Int],  DESC_TEXT :Option[String], PREMISE_WID :Option[Int], FEED_LOC :Option[String], GPS_LAT :Option[Double], GPS_LONG :Option[Double], PULSE_OUTPUT_BLOCK :Option[String], UDC_ID :Option[String], UNIVERSAL_ID :Option[String], IS_VIRTUAL_FLG :Option[String], SEAL_INFO :Option[String], ACCESS_INFO :Option[String], ALT_ACCESS_INFO :Option[String], LOC_INFO :Option[String], ALT_LOC_INFO :Option[String], TYPE :Option[String], SUB_TYPE :Option[String], TIMEZONE_ID :Option[Int], GIS_ID :Option[String], BILLED_UPTO_TIME :Option[java.sql.Timestamp], POWER_STATUS :Option[String], LOAD_STATUS :Option[String], BILLING_HOLD_STATUS :Option[String], INSERT_TIME :Option[java.sql.Timestamp], LAST_UPD_TIME :Option[java.sql.Timestamp]) extends Product{

@throws(classOf[IndexOutOfBoundsException])
override def productElement(n: Int): Any = n match {
    case 0 => WID
    case 1 => BATCH_ID
    case 2 => SRC_ID
    case 3 => ORG_ID
    case 4 => CLASS_WID
    case 5 => DESC_TEXT
    case 6 => PREMISE_WID
    case 7 => FEED_LOC
    case 8 => GPS_LAT
    case 9 => GPS_LONG
    case 10 => PULSE_OUTPUT_BLOCK
    case 11 => UDC_ID
    case 12 => UNIVERSAL_ID
    case 13 => IS_VIRTUAL_FLG
    case 14 => SEAL_INFO
    case 15 => ACCESS_INFO
    case 16 => ALT_ACCESS_INFO
    case 17 => LOC_INFO
    case 18 => ALT_LOC_INFO
    case 19 => TYPE
    case 20 => SUB_TYPE
    case 21 => TIMEZONE_ID
    case 22 => GIS_ID
    case 23 => BILLED_UPTO_TIME
    case 24 => POWER_STATUS
    case 25 => LOAD_STATUS
    case 26 => BILLING_HOLD_STATUS
    case 27 => INSERT_TIME
    case 28 => LAST_UPD_TIME
    case _ => throw new IndexOutOfBoundsException(n.toString())
}

override def productArity: Int = 29
override def canEqual(that: Any): Boolean = that.isInstanceOf[sdp_d]

}

This defined the class "sdp_d". However, when I try to load CSV data with this pre-defined schema and register it as a table, I get an error:

> scala> import java.text.SimpleDateFormat; val sdf = new SimpleDateFormat("yyyy-mm-dd hh:mm:ss.S"); import java.util.Calendar; import java.util.Date; val calendar = Calendar.getInstance()
import java.text.SimpleDateFormat
sdf: java.text.SimpleDateFormat = java.text.SimpleDateFormat@cce61785
import java.util.Calendar
import java.util.Date
calendar: java.util.Calendar = java.util.GregorianCalendar[time=1424687963209,areFieldsSet=true,areAllFieldsSet=true,lenient=true,zone=sun.util.calendar.ZoneInfo[id="Asia/Kolkata",offset=19800000,dstSavings=0,useDaylight=false,transitions=6,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=1,YEAR=2015,MONTH=1,WEEK_OF_YEAR=9,WEEK_OF_MONTH=4,DAY_OF_MONTH=23,DAY_OF_YEAR=54,DAY_OF_WEEK=2,DAY_OF_WEEK_IN_MONTH=4,AM_PM=1,HOUR=4,HOUR_OF_DAY=16,MINUTE=9,SECOND=23,MILLISECOND=209,ZONE_OFFSET=19800000,DST_OFFSET=0]

    > scala> sc.textFile("hdfs://CDH-Master-1.cdhcluster/user/spark/Sdp_d.csv").map(_.split(",")).map { r =>
         | val upto_time = sdf.parse(r(23).trim);
         | calendar.setTime(upto_time); 
         | val r23 = new java.sql.Timestamp(upto_time.getTime); 
         | 
         | val insert_time = sdf.parse(r(26).trim); 
         | calendar.setTime(insert_time); 
         | val r26 = new java.sql.Timestamp(insert_time.getTime); 
         | 
         | val last_upd_time = sdf.parse(r(27).trim);
         | calendar.setTime(last_upd_time); 
         | val r27 = new java.sql.Timestamp(last_upd_time.getTime); 
         | 
         | sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
         | }.registerAsTable("sdp")
    <console>:36: error: not found: value sdp_d
                  sdp_d(r(0).trim.toInt, r(1).trim.toInt, r(2).trim, r(3).trim.toInt, r(4).trim.toInt, r(5).trim, r(6).trim.toInt, r(7).trim, r(8).trim.toDouble, r(9).trim.toDouble, r(10).trim, r(11).trim, r(12).trim, r(13).trim, r(14).trim, r(15).trim, r(16).trim, r(17).trim, r(18).trim, r(19).trim, r(20).trim, r(21).trim.toInt, r(22).trim, r23, r(24).trim, r(25).trim, r26, r27, r(28).trim)
                  ^

I am working in spark-shell, with Spark version 1.1.0 and Scala version 2.10.4.

I don't understand why I get the error not found: value sdp_d.

How am I supposed to call registerAsTable when I create my own class extending the Product interface?

Please help in resolving the error.

Upvotes: 2

Views: 1444

Answers (3)

ngtrkhoa

Reputation: 764

You may:

  1. Instantiate the class with the new keyword: new sdp_d(...)
  2. Since you declared the fields as Option[T] (e.g. Option[Int]), pass Option[T] values (Some or None) as arguments: new sdp_d(Try(r(0).trim.toInt).toOption, Try(r(1).trim.toInt).toOption, Option(r(2).trim), ...); see the sketch just below this list.
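
Putting both points together, a rough sketch of the map step from the question could look like the following. It assumes the sdp_d class and the sdf date format defined in the question are already in scope, and that the CSV columns appear in the same order as the class fields (adjust the indices to your actual file):

    import scala.util.Try

    // parse a timestamp column; empty or malformed values become None
    def toTs(s: String): Option[java.sql.Timestamp] =
      Try(new java.sql.Timestamp(sdf.parse(s.trim).getTime)).toOption

    val sdpRdd = sc.textFile("hdfs://CDH-Master-1.cdhcluster/user/spark/Sdp_d.csv")
      .map(_.split(","))
      .map { r =>
        new sdp_d(
          Try(r(0).trim.toInt).toOption, Try(r(1).trim.toInt).toOption, Option(r(2).trim),
          Try(r(3).trim.toInt).toOption, Try(r(4).trim.toInt).toOption, Option(r(5).trim),
          Try(r(6).trim.toInt).toOption, Option(r(7).trim),
          Try(r(8).trim.toDouble).toOption, Try(r(9).trim.toDouble).toOption,
          Option(r(10).trim), Option(r(11).trim), Option(r(12).trim), Option(r(13).trim),
          Option(r(14).trim), Option(r(15).trim), Option(r(16).trim), Option(r(17).trim),
          Option(r(18).trim), Option(r(19).trim), Option(r(20).trim),
          Try(r(21).trim.toInt).toOption, Option(r(22).trim),
          toTs(r(23)), Option(r(24).trim), Option(r(25).trim), Option(r(26).trim),
          toTs(r(27)), toTs(r(28)))
      }

    // Spark 1.1: bring the RDD-of-Product to SchemaRDD conversion into scope before registering
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.createSchemaRDD
    sdpRdd.registerAsTable("sdp")

Wrapping every conversion in Try(...).toOption means a bad or empty cell becomes None instead of failing the whole job, which is consistent with declaring the fields as Option[T].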

This works for me:

//AirTraffic.scala
        class AirTraffic(Year:Option[Int], Month:Option[Int], DayOfMonth:Option[Int], DayOfWeek:Option[Int],
                     DepTime:Option[Int], CRSDepTime:Option[Int], ArrTime:Option[Int], CRSArrTime:Option[Int],
                     UniqueCarrier:String, FlightNum:Option[Int], TailNum:String, ActualElapsedTime:Option[Int],
                     CRSElapsedTime:Option[Int], AirTime:Option[Int], ArrDelay:Option[Int], DepDelay:Option[Int],
                     Origin:String, Dest:String, Distance:Option[Int], TaxiIn:Option[Int], TaxiOut:Option[Int],
                     Cancelled:Option[Boolean], CancellationCode:String, Diverted:Option[Boolean], CarrierDelay:Option[Int],
                     WeatherDelay:Option[Int], NASDelay:Option[Int], SecurityDelay:Option[Int], LateAircraftDelay:Option[Int]) extends Product {

      // We declare fields with the Option[T] type to make them nullable.

      override def productElement(n: Int): Any =
        n match {
          case 0 => Year
          case 1 => Month
          case 2 => DayOfMonth
          case 3 => DayOfWeek
          case 4 => DepTime
          case 5 => CRSDepTime
          case 6 => ArrTime
          case 7 => CRSArrTime
          case 8 => UniqueCarrier
          case 9 => FlightNum
          case 10 => TailNum
          case 11 => ActualElapsedTime
          case 12 => CRSElapsedTime
          case 13 => AirTime
          case 14 => ArrDelay
          case 15 => DepDelay
          case 16 => Origin
          case 17 => Dest
          case 18 => Distance
          case 19 => TaxiIn
          case 20 => TaxiOut
          case 21 => Cancelled
          case 22 => CancellationCode
          case 23 => Diverted
          case 24 => CarrierDelay
          case 25 => WeatherDelay
          case 26 => NASDelay
          case 27 => SecurityDelay
          case 28 => LateAircraftDelay
          case _ => throw new IndexOutOfBoundsException(n.toString)
        }

      override def productArity: Int = 29

      override def canEqual(that: Any): Boolean = that.isInstanceOf[AirTraffic]
    }

//main.scala
    // assumes a SparkContext (sparkContext) and an SQLContext (sqlContext) are already in scope
    import scala.util.Try          // for Try(...).toOption
    import sqlContext.implicits._  // brings .toDF() into scope (Spark 1.3+)

    val data = sparkContext.textFile("local-input/AIRLINE/2008.csv").map(_.split(","))
          .map(l => new AirTraffic(Try(l(0).trim.toInt).toOption, Try(l(1).trim.toInt).toOption, Try(l(2).trim.toInt).toOption, Try(l(3).trim.toInt).toOption,
          Try(l(4).trim.toInt).toOption, Try(l(5).trim.toInt).toOption, Try(l(6).trim.toInt).toOption, Try(l(7).trim.toInt).toOption,
          l(8).trim, Try(l(9).trim.toInt).toOption, l(10).trim, Try(l(11).trim.toInt).toOption,
          Try(l(12).trim.toInt).toOption, Try(l(13).trim.toInt).toOption, Try(l(14).trim.toInt).toOption, Try(l(15).trim.toInt).toOption,
          l(16).trim, l(17).trim, Try(l(18).trim.toInt).toOption, Try(l(19).trim.toInt).toOption, Try(l(20).trim.toInt).toOption,
          Try(l(21).trim.toBoolean).toOption, l(22).trim, Try(l(23).trim.toBoolean).toOption, Try(l(24).trim.toInt).toOption,
          Try(l(25).trim.toInt).toOption, Try(l(26).trim.toInt).toOption, Try(l(27).trim.toInt).toOption, Try(l(28).trim.toInt).toOption)).toDF()

    // register table with SQLContext
    data.registerTempTable("AirTraffic")

    val count = sqlContext.sql("SELECT COUNT(*) FROM AirTraffic").collect()
    count.foreach(print)

If you think that is still ugly, you can go further with an implicit conversion class:

import scala.util.Try

implicit class StringConverter(val s: String) extends AnyVal {
    def tryGetInt = Try(s.trim.toInt).toOption

    // empty strings become None, non-empty strings become Some(...)
    def tryGetString = {
      val res = s.trim
      if (res.isEmpty) None else Some(res)
    }

    def tryGetBoolean = Try(s.trim.toBoolean).toOption
}

then

val data = sparkContext.textFile("local-input/AIRLINE/2008.csv").map(_.split(","))
      .map(l => new AirTraffic(l(0).tryGetInt, l(1).tryGetInt, l(2).tryGetInt, l(3).tryGetInt,
      l(4).tryGetInt, l(5).tryGetInt, l(6).tryGetInt, l(7).tryGetInt,
      l(8).trim, l(9).tryGetInt, l(10).trim, l(11).tryGetInt,
      l(12).tryGetInt, l(13).tryGetInt, l(14).tryGetInt, l(15).tryGetInt,
      l(16).trim, l(17).trim, l(18).tryGetInt, l(19).tryGetInt, l(20).tryGetInt,
      l(21).tryGetBoolean, l(22).trim, l(23).tryGetBoolean, l(24).tryGetInt,
      l(25).tryGetInt, l(26).tryGetInt, l(27).tryGetInt, l(28).tryGetInt)).toDF()

Upvotes: 0

pzecevic

Reputation: 2857

You should just instantiate the class with new:

new sdp_d(r(0).trim.toInt, r(1).trim.toInt, ...
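
Note that because the constructor parameters of sdp_d are declared as Option[T], each argument also has to be an Option value rather than a bare Int or String. A minimal, hypothetical illustration of that wrapping (scala.util.Try turns malformed input into None):

    import scala.util.Try

    // "42" -> Some(42); "abc" or "" -> None
    val wid: Option[Int]      = Try("42".trim.toInt).toOption
    // plain string columns can simply be wrapped with Option(...)
    val srcId: Option[String] = Option("some-source-id")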

Upvotes: 0
