Reputation: 470
Is it possible to refer to different classes on each pass of an iteration?
I have a substantial number of Hadoop Hive tables that I will be processing with Spark. Each table has an auto-generated class, and I would like to loop through the tables instead of resorting to the tedious, zero-code-reuse copy/paste/hand-edit-each-table-class-name technique I fell back on first.
import myJavaProject.myTable0Class
import myJavaProject.myTable1Class
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.avro.AvroReadSupport      /* org.apache.parquet.avro in newer parquet releases */
import parquet.hadoop.ParquetInputFormat /* org.apache.parquet.hadoop in newer parquet releases */
import org.apache.spark.Logging          /* Spark 1.x */

/* conf (a mapreduce Job) and sc (the SparkContext) are initialized elsewhere in the object */
object rawMaxValueSniffer extends Logging {
  /* tedious sequential: it works, and sometimes a programmer's gotta do... */
  def tedious(args: Array[String]): Unit = {
    val tablePaths = List("path0_string_here", "path1_string")
    val maxIds = ArrayBuffer[Long]()

    FileInputFormat.setInputPaths(conf, tablePaths(0))
    AvroReadSupport.setAvroReadSchema(conf.getConfiguration, myTable0Class.getClassSchema)
    ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[myTable0Class]])
    val records0 = sc.newAPIHadoopRDD(conf.getConfiguration,
      classOf[ParquetInputFormat[myTable0Class]],
      classOf[Void],
      classOf[myTable0Class]).map(x => x._2)
    maxIds += records0.map(_.getId.toLong).collect().max

    FileInputFormat.setInputPaths(conf, tablePaths(1))
    AvroReadSupport.setAvroReadSchema(conf.getConfiguration, myTable1Class.getClassSchema)
    ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[myTable1Class]])
    val records1 = sc.newAPIHadoopRDD(conf.getConfiguration,
      classOf[ParquetInputFormat[myTable1Class]],
      classOf[Void],
      classOf[myTable1Class]).map(x => x._2)
    maxIds += records1.map(_.getId.toLong).collect().max
  }
  /* class as variable, used in a loop. I have seen the mountain... */
  def hopedFor(args: Array[String]): Unit = {
    val tablePaths = List("path0_string_here", "path1_string")
    val maxIds = ArrayBuffer[Long]()
    val tableClasses = List(classOf[myTable0Class], classOf[myTable1Class]) /* error free, but does not get me where I'm trying to go */
    var counter = 0
    tableClasses.foreach { tc =>
      FileInputFormat.setInputPaths(conf, tablePaths(counter))
      AvroReadSupport.setAvroReadSchema(conf.getConfiguration, tc.getClassSchema)
      ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[tc]])
      val records = sc.newAPIHadoopRDD(conf.getConfiguration,
        classOf[ParquetInputFormat[tc]],
        classOf[Void],
        classOf[tc]).map(x => x._2)
      maxIds += records.map(_.getId).collect().max /* all the myTableXXX classes have getId() */
      counter += 1
    }
  }
}
/* the classes being referenced... */
@org.apache.avro.specific.AvroGenerated
public class myTable0Class extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"rsivr_surveyquestiontypes\",\"namespace\":\"myJavaProject\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"description\",\"type\":\"string\"},{\"name\":\"scale_range\",\"type\":\"int\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public int id;
  /* yada.yada.yada0 */
}
@org.apache.avro.specific.AvroGenerated
public class myTable1Class extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"rsivr_surveyresultdetails\",\"namespace\":\"myJavaProject\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"survey_dts\",\"type\":\"string\"},{\"name\":\"survey_id\",\"type\":\"int\"},{\"name\":\"question\",\"type\":\"int\"},{\"name\":\"caller_id\",\"type\":\"string\"},{\"name\":\"rec_msg\",\"type\":\"string\"},{\"name\":\"note\",\"type\":\"string\"},{\"name\":\"lang\",\"type\":\"string\"},{\"name\":\"result\",\"type\":\"string\"}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
  @Deprecated public int id;
  /* yada.yada.yada1 */
}
Upvotes: 0
Views: 572
Reputation: 40510
Something like this, perhaps:
import scala.language.reflectiveCalls
import scala.reflect.ClassTag

// A structural upper bound capturing "any generated record with a getId()".
// Avro's generated getters return boxed java.lang.Integer; if your codegen
// produces a primitive int getter, change the result type accordingly.
type HasId = SpecificRecordBase { def getId(): java.lang.Integer }

def doStuff[T <: HasId](index: Int, schema: => Schema, clazz: Class[T]): Unit = {
  implicit val ct: ClassTag[T] = ClassTag(clazz) // RDD.map needs a ClassTag[T]
  FileInputFormat.setInputPaths(conf, tablePaths(index))
  AvroReadSupport.setAvroReadSchema(conf.getConfiguration, schema)
  ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[T]])
  val records = sc.newAPIHadoopRDD(conf.getConfiguration,
    classOf[ParquetInputFormat[T]],
    classOf[Void],
    clazz).map(x => x._2)
  maxIds += records.map(_.getId.toLong).collect().max
}

Seq[(Class[_ <: HasId], () => Schema)](
  (classOf[myTable0Class], myTable0Class.getClassSchema _),
  (classOf[myTable1Class], myTable1Class.getClassSchema _)
).zipWithIndex
 .foreach { case ((clazz, schema), index) => doStuff(index, schema(), clazz) }
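The trick is that the type parameter T is pinned to a concrete class at each call site: every tuple in the Seq pairs a Class object with a thunk that produces its schema, and doStuff gets instantiated once per table. That is what classOf[tc] inside a loop can never express, because tc is a runtime value, not a type.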
You could use reflection to invoke getClassSchema instead (clazz.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]); then you would not need to pass the schema in as a parameter, just clazz would be enough. But that's kinda cheating... I like this approach a bit better.
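For reference, here is a minimal sketch of that reflection variant (doStuffViaReflection is a hypothetical name; it reuses the HasId alias from above and assumes the same conf, sc, tablePaths, and maxIds are in scope as in the question). The schema parameter disappears and the Class object becomes the only per-table input:
def doStuffViaReflection[T <: HasId](index: Int, clazz: Class[T]): Unit = {
  implicit val ct: ClassTag[T] = ClassTag(clazz) // RDD.map needs a ClassTag[T]
  // every Avro-generated class carries a static getClassSchema(), so the
  // schema can be recovered from the Class object alone
  val schema = clazz.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema]
  FileInputFormat.setInputPaths(conf, tablePaths(index))
  AvroReadSupport.setAvroReadSchema(conf.getConfiguration, schema)
  ParquetInputFormat.setReadSupportClass(conf, classOf[AvroReadSupport[T]])
  val records = sc.newAPIHadoopRDD(conf.getConfiguration,
    classOf[ParquetInputFormat[T]],
    classOf[Void],
    clazz).map(x => x._2)
  maxIds += records.map(_.getId.toLong).collect().max
}

Seq[Class[_ <: HasId]](classOf[myTable0Class], classOf[myTable1Class])
  .zipWithIndex
  .foreach { case (clazz, index) => doStuffViaReflection(index, clazz) }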
Upvotes: 2