rabejens

Reputation: 8122

How to programmatically get all running jobs in a Hadoop cluster using the new API?

I have a software component which submits MR jobs to Hadoop. I now want to check if there are other jobs running before submitting it. I found out that there is a Cluster object in the new API which can be used to query the cluster for running jobs, get their configurations and extract the relevant information from them. However I am having problems using this.

Just doing new Cluster(conf) where conf is a valid Configuration which can be used to access this cluster (e.g., to submit jobs to it) leaves the object unconfigured, and the getAllJobStatuses() method of Cluster returns null.
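
This is roughly what I am trying, as a minimal sketch (using org.apache.hadoop.mapreduce.Cluster from the new API; conf is the same Configuration my component already uses for submitting jobs):

Cluster cluster = new Cluster(conf);
// returns null instead of an array of job statuses
JobStatus[] statuses = cluster.getAllJobStatuses();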

Extracting mapreduce.jobtracker.address from the configuration, constructing an InetSocketAddress from it, and using the other constructor of Cluster throws "Cannot initialize Cluster. Please check your configuration for mapreduce.framework.name and the correspond server addresses."

Using the old API, doing something like new JobClient(conf).getAllJobs() throws an NPE.
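
For completeness, sketches of those other two attempts (NetUtils.createSocketAddr is just one way to parse the address; any equivalent parsing gives the same result):

// New API, other constructor: throws "Cannot initialize Cluster. ..."
InetSocketAddress addr = NetUtils.createSocketAddr(conf.get("mapreduce.jobtracker.address"));
Cluster cluster = new Cluster(addr, conf);

// Old API: throws a NullPointerException
new JobClient(conf).getAllJobs();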

What am I missing here? How can I programmatically get the running jobs?

Upvotes: 2

Views: 1947

Answers (2)

Aravind M

Reputation: 13

I tried it like this and it worked for me, but only after the job has been submitted:

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.JobStatus.State;

JobClient jc = new JobClient(job.getConfiguration());

for (JobStatus js : jc.getAllJobs()) {
    if (js.getState().getValue() == State.RUNNING.getValue()) {
        // this job is currently running
    }
}

jc.close();

Alternatively, we can get the Cluster from the job API; it has methods which return all the jobs and their statuses:

cluster.getAllJobStatuses();
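
For example, something like this (a sketch with the new API's org.apache.hadoop.mapreduce.Cluster, where conf points at the cluster):

Cluster cluster = new Cluster(conf);
for (JobStatus status : cluster.getAllJobStatuses()) {
    if (status.getState() == JobStatus.State.RUNNING) {
        // this job is still running
    }
}
cluster.close();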

Upvotes: 0

rabejens

Reputation: 8122

I investigated further and solved it. Thomas Jungblut was right: it was because of the mini cluster. I had set up the mini cluster following this blog post, which turned out to work for MR jobs but used a deprecated setup with an incomplete configuration. The Hadoop Wiki has a page on how to develop unit tests which also explains how to correctly set up a mini cluster.

Essentially, I do the mini cluster setup the following way:

// Create a YarnConfiguration for bootstrapping the minicluster
final YarnConfiguration bootConf = new YarnConfiguration();
// Base directory to store HDFS data in
final File hdfsBase = Files.createTempDirectory("temp-hdfs-").toFile();
bootConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, hdfsBase.getAbsolutePath());
// Start Mini DFS cluster
final MiniDFSCluster hdfsCluster = new MiniDFSCluster.Builder(bootConf).build();
// Configure and start Mini MR YARN cluster
bootConf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 64);
bootConf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class, ResourceScheduler.class);
final MiniMRYarnCluster yarnCluster = new MiniMRYarnCluster("test-cluster", 1);
yarnCluster.init(bootConf);
yarnCluster.start();
// Get the "real" Configuration to use from now on
final Configuration conf = yarnCluster.getConfig();
// Get the filesystem
final FileSystem fs = new Path("hdfs://localhost:" + hdfsCluster.getNameNodePort() + "/").getFileSystem(conf);

Now I have conf and fs which I can use to submit jobs and access HDFS, and new Cluster(conf) with cluster.getAllJobStatuses() works as expected.
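
With this in place, the check from the question behaves as intended, e.g. (sketch):

Cluster cluster = new Cluster(conf);
// non-null now; one JobStatus per job known to the mini cluster
JobStatus[] statuses = cluster.getAllJobStatuses();
cluster.close();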

When everything is done, to shut down and clean up, I call:

yarnCluster.stop();
hdfsCluster.shutdown();
FileUtils.deleteDirectory(hdfsBase); // from Apache Commons IO

Note: JAVA_HOME must be set for this to work. When building on Jenkins, make sure JAVA_HOME is set for the default JDK. Alternatively, you can explicitly specify a JDK to use; Jenkins will then set up JAVA_HOME automatically.

Upvotes: 2
