Reputation: 53
I'm trying to implement a UDF to process various source / input files. The input files differ in the number of columns, and my intention is to have one generic UDF. Each run of the Pig script processes one type of input file (all records have the same number of columns, delimited by '|').
The UDF should read every input record, with fields separated by the delimiter (|), and produce one bag with two tuples based on some conditions, e.g. input (1,2,3,4,5,6) gives output a) {(1,3), (2,4,5,6)} or b) {(2,3,4), (1,5,6)}
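To make the intended split concrete, here is a minimal plain-Java sketch of the example above. It does not use the Pig API; the class name `FieldSplitter`, the method `split`, and the idea of describing the "conditions" as a set of zero-based field positions are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Pig: models the two-tuple split on plain lists.
final class FieldSplitter {

    // Fields whose zero-based position is in firstIndexes go to the first group,
    // all remaining fields go to the second group. Input order is preserved.
    static <T> List<List<T>> split(List<T> fields, Set<Integer> firstIndexes) {
        List<T> first = new ArrayList<>();
        List<T> second = new ArrayList<>();
        for (int i = 0; i < fields.size(); i++) {
            if (firstIndexes.contains(i)) {
                first.add(fields.get(i));
            } else {
                second.add(fields.get(i));
            }
        }
        List<List<T>> result = new ArrayList<>();
        result.add(first);
        result.add(second);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> record = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Case a): positions 0 and 2 form the first tuple.
        System.out.println(split(record, new HashSet<>(Arrays.asList(0, 2))));
        // Case b): positions 1, 2 and 3 form the first tuple.
        System.out.println(split(record, new HashSet<>(Arrays.asList(1, 2, 3))));
    }
}
```

Running `main` prints `[[1, 3], [2, 4, 5, 6]]` for case a) and `[[2, 3, 4], [1, 5, 6]]` for case b). Inside a real UDF the two groups would become Pig tuples in a bag rather than Java lists.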
I am unable to extend the outputSchema method to handle creating tuples of different sizes. There is no way to pass an extra argument to the outputSchema method. It's also not possible to use a temporary variable defined as part of the EvalFunc class definition, because its value is reset to null on each run.
Any hint? Thank you
UPDATE:
I execute the command below using Grunt; the inputSchema is provided, as you can see after "AS":
sourceData = foreach sourceData generate com.pig.Data('test.json', *) as (t:(s:(VIN: chararray,Birthdate: chararray), n:(name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray)));
The UDF code is here ...
public Schema outputSchema(Schema input) {
(line 233) System.out.println("----------------------" + input.getFields().size());
Error:
Pig Stack Trace
---------------
ERROR 1200: java.lang.NullPointerException
Failed to parse: java.lang.NullPointerException
at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:201)
at org.apache.pig.PigServer$Graph.validateQuery(PigServer.java:1707)
at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1680)
at org.apache.pig.PigServer.registerQuery(PigServer.java:623)
at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:1082)
at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:505)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:230)
at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:205)
at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:66)
at org.apache.pig.Main.run(Main.java:565)
at org.apache.pig.Main.main(Main.java:177)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.RuntimeException: java.lang.NullPointerException
at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:306)
at org.apache.pig.newplan.logical.expression.UserFuncExpression.getFieldSchema(UserFuncExpression.java:244)
at org.apache.pig.newplan.logical.optimizer.FieldSchemaResetter.execute(SchemaResetter.java:264)
at org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor.visit(AllSameExpressionVisitor.java:143)
at org.apache.pig.newplan.logical.expression.UserFuncExpression.accept(UserFuncExpression.java:113)
at org.apache.pig.newplan.ReverseDependencyOrderWalker.walk(ReverseDependencyOrderWalker.java:70)
at org.apache.pig.newplan.PlanVisitor.visit(PlanVisitor.java:52)
at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visitAll(SchemaResetter.java:67)
at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:122)
at org.apache.pig.newplan.logical.relational.LOGenerate.accept(LOGenerate.java:245)
at org.apache.pig.newplan.DependencyOrderWalker.walk(DependencyOrderWalker.java:75)
at org.apache.pig.newplan.logical.optimizer.SchemaResetter.visit(SchemaResetter.java:114)
at org.apache.pig.parser.LogicalPlanBuilder.buildForeachOp(LogicalPlanBuilder.java:1055)
at org.apache.pig.parser.LogicalPlanGenerator.foreach_clause(LogicalPlanGenerator.java:15896)
at org.apache.pig.parser.LogicalPlanGenerator.op_clause(LogicalPlanGenerator.java:1933)
at org.apache.pig.parser.LogicalPlanGenerator.statement(LogicalPlanGenerator.java:560)
at org.apache.pig.parser.LogicalPlanGenerator.query(LogicalPlanGenerator.java:421)
at org.apache.pig.parser.QueryParserDriver.parse(QueryParserDriver.java:191)
... 16 more
Caused by: java.lang.NullPointerException
at com.mortardata.pig.DataSpliter.outputSchema(DataSpliter.java:233)
... 34 more
================================================================================
UPDATE2:
OK, the input schema is propagated from the previous Pig command:
sourceData = load 'test.csv' using PigStorage(',') as (VIN: chararray,Birthdate: chararray, name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray);
sourceData = foreach sourceData generate com.pig.Data('test_data_desc.json', *) as (t:(s:(VIN: chararray,Birthdate: chararray), n:(name: chararray,customerId: chararray,Mileage: chararray,Fuel_Consumption: chararray)));
That is not useful, though, because it's not possible to propagate any additional attributes or to build any more complex logic inside the outputSchema method.
Upvotes: 1
Views: 603
Reputation: 2485
In the outputSchema function you have access to the input schema, and you can use that information to generate your output schema dynamically (provided the input somehow reflects the expected output). Example:
public Schema outputSchema(Schema input) {
    Schema mySchema = new Schema();
    if (input.getFields().size() == 3) {
        mySchema.add(new Schema.FieldSchema("data1", DataType.DOUBLE));
        mySchema.add(new Schema.FieldSchema("data2", DataType.DOUBLE));
        mySchema.add(new Schema.FieldSchema("data3", DataType.DOUBLE));
    } else {
        mySchema.add(new Schema.FieldSchema("data", DataType.CHARARRAY));
    }
    return mySchema;
}
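One caveat: the stack trace in the question shows a NullPointerException on the line that calls input.getFields(), which suggests the input schema can be null when the preceding relation has no declared schema. The sketch below models the same idea in plain Java, without the Pig API, so it can be run on its own: the output layout is derived from the input field names, and a null input is passed through instead of dereferenced. The class `OutputLayout`, its constructor argument, and the `t`/`s`/`n` layout string are hypothetical and only mirror the schema from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model, not Pig code: derives a two-tuple layout from input field names.
final class OutputLayout {

    // Zero-based positions of the fields that belong to the first tuple ("s").
    // Assumption: the split rule is known when the object is constructed.
    private final Set<Integer> firstIndexes;

    OutputLayout(Set<Integer> firstIndexes) {
        this.firstIndexes = firstIndexes;
    }

    // Returns a layout such as "t:(s:(a,b),n:(c))", or null when the
    // input field names are unknown (the null-schema case).
    String describe(List<String> inputNames) {
        if (inputNames == null) {
            return null; // nothing to derive from; do not dereference
        }
        List<String> s = new ArrayList<>();
        List<String> n = new ArrayList<>();
        for (int i = 0; i < inputNames.size(); i++) {
            if (firstIndexes.contains(i)) {
                s.add(inputNames.get(i));
            } else {
                n.add(inputNames.get(i));
            }
        }
        return "t:(s:(" + String.join(",", s) + "),n:(" + String.join(",", n) + "))";
    }

    public static void main(String[] args) {
        OutputLayout layout = new OutputLayout(new HashSet<>(Arrays.asList(0, 1)));
        List<String> names = Arrays.asList(
                "VIN", "Birthdate", "name", "customerId", "Mileage", "Fuel_Consumption");
        System.out.println(layout.describe(names));
        System.out.println(layout.describe(null));
    }
}
```

With the field names from the question, the first call prints `t:(s:(VIN,Birthdate),n:(name,customerId,Mileage,Fuel_Consumption))` and the second prints `null`. In an actual UDF the equivalent guard would be an `input == null` check at the top of outputSchema before touching `input.getFields()`.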
I hope this helps.
Upvotes: 0