davdic

Reputation: 249

Oozie - Setting strategy on DistCp through action configuration

I have a workflow with a distCp action, and it's running fairly well. However, now I'm trying to change the copy strategy and am unable to do that through the action arguments. The documentation is fairly slim on this topic and looking at the source code for the distCp action executor did not help.

When running DistCp from the command line, I can use the argument -strategy {uniformsize|dynamic} to set the copy strategy.

Following that logic, I tried to do the same in the Oozie action.

<action name="distcp-run" retry-max="3" retry-interval="1">
    <distcp xmlns="uri:oozie:distcp-action:0.2">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapreduce.job.queuename</name>
                <value>${poolName}</value>
            </property>
        </configuration>
        <arg>-Dmapreduce.job.queuename=${poolName}</arg>
        <arg>-Dmapreduce.job.name=distcp-s3-${wf:id()}</arg>
        <arg>-update</arg>
        <arg>-strategy dynamic</arg>
        <arg>${region}/d=${day2HoursAgo}/h=${hour2HoursAgo}</arg>
        <arg>${region2}/d=${day2HoursAgo}/h=${hour2HoursAgo}</arg>
        <arg>${region3}/d=${day2HoursAgo}/h=${hour2HoursAgo}</arg>
        <arg>${nameNode}${rawPath}/${partitionDate}</arg>
    </distcp>
    <ok to="join-distcp-steps"/>
    <error to="error-report"/>
</action>

However, the action fails when I execute it.

From stdout:

...>>> Invoking Main class now >>>

Fetching child yarn jobs
tag id : oozie-1d1fa70383587ae625b6495e30a315f7
Child yarn jobs are found - 
Main class        : org.apache.hadoop.tools.DistCp
Arguments         :
                    -Dmapreduce.job.queuename=merged
                    -Dmapreduce.job.name=distcp-s3-0000019-160622133128476-oozie-oozi-W
                    -update
                    -strategy dynamic
                    s3a://myfirstregion/d=21/h=17,s3a://mysecondregion/d=21/h=17,s3a://ttv-logs-eu/tsv/clickstream-clean/y=2016/m=06/d=21/h=17,s3a://mythirdregion/d=21/h=17
                    hdfs://myurl:8020/data/raw/2016062117
found Distcp v2 Constructor
                    public org.apache.hadoop.tools.DistCp(org.apache.hadoop.conf.Configuration,org.apache.hadoop.tools.DistCpOptions) throws java.lang.Exception

<<< Invocation of Main class completed <<<

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.DistcpMain], main() threw exception, Returned value from distcp is non-zero (-1)
java.lang.RuntimeException: Returned value from distcp is non-zero (-1)
    at org.apache.oozie.action.hadoop.DistcpMain.run(DistcpMain.java:66)...

Looking at the syslog, it seems that it took "-strategy dynamic" as a single string and put it in the array of source paths:

2016-06-22 14:11:18,617 INFO [uber-SubtaskRunner] org.apache.hadoop.tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=true, deleteMissing=false, ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', sourceFileListing=null, sourcePaths=[-strategy dynamic, s3a://myfirstregion/d=21/h=17,s3a:/mysecondregion/d=21/h=17,s3a:/ttv-logs-eu/tsv/clickstream-clean/y=2016/m=06/d=21/h=17,s3a:/mythirdregion/d=21/h=17], targetPath=hdfs://myurl:8020/data/raw/2016062117, targetPathExists=true, preserveRawXattrs=false, filtersFile='null'}
2016-06-22 14:11:18,624 INFO [uber-SubtaskRunner] org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at sandbox/10.191.5.128:8032
2016-06-22 14:11:18,655 ERROR [uber-SubtaskRunner] org.apache.hadoop.tools.DistCp: Invalid input: 
org.apache.hadoop.tools.CopyListing$InvalidInputException: -strategy dynamic doesn't exist

So DistCpOptions does have a copyStrategy field, but it is left at its default value of uniformsize. I've also tried moving the -strategy argument to the first position, but then both -Dmapreduce arguments end up in the source paths (although -update does not).

How can I, through Oozie workflow configuration, set the copy strategy to dynamic?

Thanks.

Upvotes: 1

Views: 1485

Answers (1)

Binary Nerd

Reputation: 13927

Looking at the code, it doesn't seem possible to set the strategy via configuration. Instead of using the distcp action, you could use a map-reduce action; that way you can configure the job however you want.

The Oozie MapReduce Cookbook has examples.

In the DistCp code, the relevant part is in createJob(), around line 237.

Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
job.getConfiguration().set(JobContext.NUM_MAPS, String.valueOf(inputOptions.getMaxMaps()));

The code above isn't everything you will need; you'll have to look at the DistCp source to work out the rest.

So you would need to configure all of the properties yourself in a map-reduce action. This way you can set the InputFormatClass, which is where the strategy setting is used.
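As a rough, untested sketch of what that could look like, the fragment below maps some of the setters from createJob() onto map-reduce action properties. The property names are the standard Hadoop 2 job keys as I understand them, and the action name is made up for illustration. The class names are assumed from the DistCp package layout. It deliberately leaves out everything DistCp prepares at runtime (the copy listing, source and target paths, and so on), so it will not run as-is.

```xml
<!-- Sketch only: mirrors part of DistCp.createJob() in a map-reduce action. -->
<!-- "distcp-as-mapreduce" is a hypothetical action name. -->
<action name="distcp-as-mapreduce">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <!-- Ask Oozie to use the new org.apache.hadoop.mapreduce API. -->
            <property>
                <name>mapred.mapper.new-api</name>
                <value>true</value>
            </property>
            <property>
                <name>mapred.reducer.new-api</name>
                <value>true</value>
            </property>
            <!-- job.setInputFormatClass(...): this is where the strategy is chosen. -->
            <property>
                <name>mapreduce.job.inputformat.class</name>
                <value>org.apache.hadoop.tools.mapred.lib.DynamicInputFormat</value>
            </property>
            <!-- job.setMapperClass(CopyMapper.class) -->
            <property>
                <name>mapreduce.job.map.class</name>
                <value>org.apache.hadoop.tools.mapred.CopyMapper</value>
            </property>
            <!-- job.setOutputFormatClass(CopyOutputFormat.class) -->
            <property>
                <name>mapreduce.job.outputformat.class</name>
                <value>org.apache.hadoop.tools.mapred.CopyOutputFormat</value>
            </property>
            <!-- job.setNumReduceTasks(0) -->
            <property>
                <name>mapreduce.job.reduces</name>
                <value>0</value>
            </property>
            <!-- JobContext.MAP_SPECULATIVE = "false" -->
            <property>
                <name>mapreduce.map.speculative</name>
                <value>false</value>
            </property>
            <!-- Remaining DistCp properties (copy listing, paths, ...) are omitted. -->
        </configuration>
    </map-reduce>
    <ok to="join-distcp-steps"/>
    <error to="error-report"/>
</action>
```

The ok and error transitions reuse the node names from the question. Treat every property here as something to verify against the DistCp source for your Hadoop version.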

You can see the available properties for the InputFormatClass in the distcp properties file here.

The one you need is org.apache.hadoop.tools.mapred.lib.DynamicInputFormat.
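Before going down the map-reduce route, one thing that may be worth trying: the syslog in the question shows "-strategy dynamic" arriving as one entry in sourcePaths, which suggests each arg element is handed to DistCp as a single token, spaces included. Assuming that is the case (I have not tested it), passing the flag and its value as two separate arg elements might let the option parser recognize it, the same way -update was recognized:

```xml
<!-- Assumption: each <arg> becomes exactly one command-line token. -->
<arg>-update</arg>
<arg>-strategy</arg>
<arg>dynamic</arg>
```

If copyStrategy in the "Input Options" log line then reads dynamic instead of uniformsize, the option was picked up.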

Upvotes: 1
