Reputation: 6954
I'm using Hadoop 1.2.1 and Spring Hadoop 1.0.2
I wanted to check the Spring autowiring in a Hadoop Mapper. I wrote this configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:hdp="http://www.springframework.org/schema/hadoop"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">
<context:property-placeholder location="configuration.properties"/>
<context:component-scan base-package="it.test"/>
<hdp:configuration id="hadoopConfiguration">
fs.default.name=${hd.fs}
</hdp:configuration>
<hdp:job id="my-job"
mapper="hadoop.mapper.MyMapper"
reducer="hadoop.mapper.MyReducer"
output-path="/root/Scrivania/outputSpring/out"
input-path="/root/Scrivania/outputSpring/in" jar="" />
<hdp:job-runner id="my-job-runner" job-ref="my-job" run-at-startup="true"/>
<hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="${hbase.host}" zk-port="${hbase.port}"/>
<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
<property name="configuration" ref="hbaseConfiguration"/>
</bean>
</beans>
Then I created this Mapper
public class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
private static final Log logger = ....
@Autowired
private IHistoricalDataService hbaseService;
private List<HistoricalDataModel> data;
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected void cleanup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
super.cleanup(context);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
super.setup(context);
try {
data = hbaseService.findAllHistoricalData();
logger.warn("Data "+data);
} catch (Exception e) {
String message = "Errore nel setup del contesto; messaggio errore: "+e.getMessage();
logger.fatal(message, e);
throw new InterruptedException(message);
}
}
@Override
protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
super.map(key, value, context);
}
}
As you can see MyMapper does nothing; the only thing I want to print is the data variable; nothing exception
When I launch it in my IDE (Eclipse Luna) by a JUnit Test I can see only this prints:
16:19:11,902 INFO [XmlBeanDefinitionReader] Loading XML bean definitions from class path resource [application-context.xml]
16:19:12,540 INFO [GenericApplicationContext] Refreshing org.springframework.context.support.GenericApplicationContext@150e804: startup date [Mon Dec 02 16:19:12 CET 2013]; root of context hierarchy
16:19:12,693 INFO [PropertySourcesPlaceholderConfigurer] Loading properties file from class path resource [configuration.properties]
16:19:12,722 INFO [DefaultListableBeanFactory] Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@109f81a: defining beans [org.springframework.context.support.PropertySourcesPlaceholderConfigurer#0,pinfClusteringHistoricalDataDao,historicalDataServiceImpl,clusterAnalysisSvcImpl,org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalRequiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,hadoopConfiguration,clusterAnalysisJob,clusterAnalysisJobRunner,hbaseConfiguration,hbaseTemplate,org.springframework.context.annotation.ConfigurationClassPostProcessor.importAwareProcessor]; root of factory hierarchy
16:19:13,516 INFO [JobRunner] Starting job [clusterAnalysisJob]
16:19:13,568 WARN [NativeCodeLoader] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16:19:13,584 WARN [JobClient] No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
16:19:13,619 INFO [FileInputFormat] Total input paths to process : 0
16:19:13,998 INFO [JobClient] Running job: job_local265750426_0001
16:19:14,065 INFO [LocalJobRunner] Waiting for map tasks
16:19:14,065 INFO [LocalJobRunner] Map task executor complete.
16:19:14,127 INFO [ProcessTree] setsid exited with exit code 0
16:19:14,134 INFO [Task] Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@b1258d
16:19:14,144 INFO [LocalJobRunner]
16:19:14,148 INFO [Merger] Merging 0 sorted segments
16:19:14,149 INFO [Merger] Down to the last merge-pass, with 0 segments left of total size: 0 bytes
16:19:14,149 INFO [LocalJobRunner]
16:19:14,219 INFO [Task] Task:attempt_local265750426_0001_r_000000_0 is done. And is in the process of commiting
16:19:14,226 INFO [LocalJobRunner]
16:19:14,226 INFO [Task] Task attempt_local265750426_0001_r_000000_0 is allowed to commit now
16:19:14,251 INFO [FileOutputCommitter] Saved output of task 'attempt_local265750426_0001_r_000000_0' to /root/Scrivania/outputSpring/out
16:19:14,254 INFO [LocalJobRunner] reduce > reduce
16:19:14,255 INFO [Task] Task 'attempt_local265750426_0001_r_000000_0' done.
16:19:15,001 INFO [JobClient] map 0% reduce 100%
16:19:15,005 INFO [JobClient] Job complete: job_local265750426_0001
16:19:15,007 INFO [JobClient] Counters: 13
16:19:15,007 INFO [JobClient] File Output Format Counters
16:19:15,007 INFO [JobClient] Bytes Written=0
16:19:15,007 INFO [JobClient] FileSystemCounters
16:19:15,007 INFO [JobClient] FILE_BYTES_READ=22
16:19:15,007 INFO [JobClient] FILE_BYTES_WRITTEN=67630
16:19:15,007 INFO [JobClient] Map-Reduce Framework
16:19:15,008 INFO [JobClient] Reduce input groups=0
16:19:15,008 INFO [JobClient] Combine output records=0
16:19:15,008 INFO [JobClient] Reduce shuffle bytes=0
16:19:15,008 INFO [JobClient] Physical memory (bytes) snapshot=0
16:19:15,008 INFO [JobClient] Reduce output records=0
16:19:15,008 INFO [JobClient] Spilled Records=0
16:19:15,008 INFO [JobClient] CPU time spent (ms)=0
16:19:15,009 INFO [JobClient] Total committed heap usage (bytes)=111935488
16:19:15,009 INFO [JobClient] Virtual memory (bytes) snapshot=0
16:19:15,009 INFO [JobClient] Reduce input records=0
16:19:15,009 INFO [JobRunner] Completed job [clusterAnalysisJob]
16:19:15,028 WARN [SpringHadoopTest] Scrivo............ OOOOOOO
It seems that the JOb starts but my Mapper is never executed; can anybody suggest to me where I'm wrong?
Upvotes: 2
Views: 1313
Reputation:
Is it possible that your input file exists, but is empty? With no input splits, no mapper tasks would ever get created. Just a guess...
Upvotes: 0
Reputation: 976
There is no autowiring of mappers or reducers. These classes are loaded by Hadoop so there is no application context associated with them at runtime. The application context is only available as part of the workflow orchestration of the jobs.
I don't know why your setup method isn't logging any messages, are you sure you specified the right class and package for the mapper?
-Thomas
Upvotes: 2