teserecter

Reputation: 470

scala: how to use a class like a variable

Is it possible to refer to different classes on each pass of an iteration?

I have a substantial number of Hadoop Hive tables that I will be processing with Spark. Each table has an auto-generated class, and I would like to loop through the tables instead of the tedious, non-reusable copy/paste/handCodeIndividualTableClassNames technique I resorted to first.

import scala.collection.mutable.ArrayBuffer

import myJavaProject.myTable0Class
import myJavaProject.myTable1Class

object rawMaxValueSniffer extends Logging {
    /* tedious sequential:  it works, and sometimes a programmer's gotta do... */
    /* conf (a Hadoop Job) and sc (a SparkContext) are assumed to be set up elsewhere */
    def tedious(args: Array[String]): Unit = {
        val tablePaths = List("path0_string_here","path1_string")
        val maxIds = ArrayBuffer[Long]()

        FileInputFormat.setInputPaths(conf, tablePaths(0))
        AvroReadSupport.setAvroReadSchema(conf.getConfiguration, myTable0Class.getClassSchema)
        ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[myTable0Class]])
        val records0 = sc.newAPIHadoopRDD(conf.getConfiguration,
            classOf[ParquetInputFormat[myTable0Class]],
            classOf[Void],
            classOf[myTable0Class]).map(x => x._2)
        maxIds += records0.map(_.getId).collect().max

        FileInputFormat.setInputPaths(conf, tablePaths(1))
        AvroReadSupport.setAvroReadSchema(conf.getConfiguration, myTable1Class.getClassSchema)
        ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[myTable1Class]])
        val records1 = sc.newAPIHadoopRDD(conf.getConfiguration,
            classOf[ParquetInputFormat[myTable1Class]],
            classOf[Void],
            classOf[myTable1Class]).map(x => x._2)
        maxIds += records1.map(_.getId).collect().max
    }

    /* class as variable, used in a loop.      I have seen the mountain... */
    def hopedFor(args: Array[String]): Unit = { 
        val tablePaths = List("path0_string_here","path1_string")
        val maxIds = ArrayBuffer[Long]()

        val tableClasses = List(classOf[myTable0Class],classOf[myTable1Class]) /* error free, but does not get me where I'm trying to go */
        var counter = 0
        tableClasses.foreach { tc =>
            FileInputFormat.setInputPaths(conf, tablePaths(counter))
            AvroReadSupport.setAvroReadSchema(conf.getConfiguration, tc.getClassSchema)  /* does not compile: getClassSchema is static, not a member of Class[_] */
            ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[tc]])   /* does not compile: tc is a value, not a type */
            val records = sc.newAPIHadoopRDD(conf.getConfiguration,
                classOf[ParquetInputFormat[tc]],                                         /* same problem here... */
                classOf[Void],
                classOf[tc]).map(x => x._2)                                              /* ...and here */
            maxIds += records.map(_.getId).collect().max    /* all the myTableXXX classes have getId() */
            counter += 1
        }
    }
}       

/* the classes being referenced... */
@org.apache.avro.specific.AvroGenerated
public class myTable0Class extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"rsivr_surveyquestiontypes\",\"namespace\":\"myJavaProject\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"description\",\"type\":\"string\"},{\"name\":\"scale_range\",\"type\":\"int\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public int id;

  yada.yada.yada0
}

@org.apache.avro.specific.AvroGenerated
public class myTable1Class extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"rsivr_surveyresultdetails\",\"namespace\":\"myJavaProject\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"survey_dts\",\"type\":\"string\"},{\"name\":\"survey_id\",\"type\":\"int\"},{\"name\":\"question\",\"type\":\"int\"},{\"name\":\"caller_id\",\"type\":\"string\"},{\"name\":\"rec_msg\",\"type\":\"string\"},{\"name\":\"note\",\"type\":\"string\"},{\"name\":\"lang\",\"type\":\"string\"},{\"name\":\"result\",\"type\":\"string\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public int id;

    yada.yada.yada1
}

Upvotes: 0

Views: 572

Answers (1)

Dima

Reputation: 40510

Something like this, perhaps:

def doStuff[T <: SpecificRecordBase](index: Int, schema: () => Schema, clazz: Class[T]): Unit = {
  // conf, sc, tablePaths and maxIds come from the enclosing scope, as in the question.
  // Rebuild a ClassTag from the Class value so that RDD.map has the implicit it needs.
  implicit val ct: ClassTag[T] = ClassTag(clazz)
  FileInputFormat.setInputPaths(conf, tablePaths(index))
  AvroReadSupport.setAvroReadSchema(conf.getConfiguration, schema())
  ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[T]])
  val records = sc.newAPIHadoopRDD(conf.getConfiguration,
      classOf[ParquetInputFormat[T]],
      classOf[Void],
      clazz).map(x => x._2)
  // getId is not defined on SpecificRecordBase itself; this relies on every
  // generated class providing it (a structural bound or shared interface would enforce that)
  maxIds += records.map(_.getId).collect().max
}

Seq[(Class[_ <: SpecificRecordBase], () => Schema)](
  (classOf[myTable0Class], myTable0Class.getClassSchema _),
  (classOf[myTable1Class], myTable1Class.getClassSchema _)
).zipWithIndex
 .foreach { case ((clazz, schema), index) => doStuff(index, schema, clazz) }

You could use reflection to invoke getClassSchema instead (clazz.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]); then you would not need to pass it in as a parameter, just clazz would be enough, but that's kinda cheating ... I like this approach a bit better.
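
For reference, a minimal sketch of that reflection variant, reusing the doStuff helper above (doStuffReflective is just an illustrative name, and the enclosing conf, sc, tablePaths, and maxIds are assumed as before):

def doStuffReflective[T <: SpecificRecordBase](index: Int, clazz: Class[T]): Unit = {
  // getClassSchema is a static method on every Avro-generated class,
  // so it can be invoked through the Class value with a null receiver
  val schema = clazz.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]
  doStuff(index, () => schema, clazz)
}

Seq[Class[_ <: SpecificRecordBase]](classOf[myTable0Class], classOf[myTable1Class])
  .zipWithIndex
  .foreach { case (clazz, index) => doStuffReflective(index, clazz) }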

Upvotes: 2
