[torqueusers] Job pipeline software for use with Torque?

Kevin Murphy murphy at genome.chop.edu
Fri Feb 6 12:14:29 MST 2009


I'm looking for a layer, preferably in Java, that would sit on top of 
Torque and facilitate the construction and execution of large pipelines 
of jobs.  I've written my own system in Perl, but it lacks some features 
I'd like, and I'd really prefer to be using Java.  What I'm imagining 
would come with an abstract job class to enforce useful and required 
methods; a job manager to instantiate the job objects, automatically 
build the corresponding scripts, and execute them; some way of 
providing configuration to the jobs; a way to define dependencies 
between jobs; and a way to report on pipeline progress.

To give you an idea of what I'm looking for: my existing pipeline 
machinery is object-oriented Perl with an abstract Job class that wraps 
an invocation of qsub.  I handle job dependencies via Torque's native 
mechanism, and I have a job manager class that makes it easy to define 
dependencies between job objects as they are created.  When the 
pipeline is built, a root job is submitted first in a held state; once 
the job manager class has finished creating all job objects and 
qsubbing the corresponding Torque jobs, it releases the hold on the 
root job.
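
A minimal sketch of the hold-and-release idea, assuming the job manager 
shells out to qsub and qrls.  The helper names (qsub_args, qrls_args), 
the script names, and the job ids are made up for illustration and are 
not taken from the actual Fable code.  The helpers only build the 
command lines; nothing is submitted.

```perl
use strict;
use warnings;

# Build the argument list for submitting one job script with qsub.
# Option names (name, script, hold, after) are hypothetical.
sub qsub_args {
    my (%opt) = @_;
    my @args = ('qsub', '-N', $opt{name});
    push @args, '-h' if $opt{hold};            # submit in a held state
    my @after = @{ $opt{after} || [] };
    # Torque's dependency attribute: run only after these jobs exit OK.
    push @args, '-W', 'depend=afterok:' . join(':', @after) if @after;
    push @args, $opt{script};
    return @args;
}

# Releasing the hold on the root job is a single qrls call.
sub qrls_args {
    my ($job_id) = @_;
    return ('qrls', $job_id);
}

# A held root job, two jobs chained behind it, then the release.
# The ids are placeholders; a real run would use whatever qsub prints.
my @root   = qsub_args(name => 'root',   script => 'root.sh', hold => 1);
my @first  = qsub_args(name => 'first',  script => 'first.sh',
                       after => ['101.server']);
my @second = qsub_args(name => 'second', script => 'second.sh',
                       after => ['101.server', '102.server']);
print join(' ', @$_), "\n"
    for \@root, \@first, \@second, [ qrls_args('101.server') ];
```

Because every other job depends, directly or indirectly, on the held 
root job, nothing can start until the whole graph has been submitted.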

I wanted to make it as easy as possible, and in some cases mandatory, 
for computational tasks to "do the right thing".  The abstract methods 
of the job class are configure, job_label, 
verifyRequirementsAtSubmitTime, runtimeOutputsNeedRebuilding, 
runtimeInputsExist, and execute.  This allows the job manager to ensure 
that all task prerequisites exist (on all nodes, by default), and it 
allows a generic, automatically generated job script to always check 
inputs and outputs, to terminate early if the outputs are more recent 
than the inputs, and, of course, to execute.  The generic job script 
also handles pipeline progress logging, so individual job classes 
don't need to worry about that.
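
Roughly, the generic job script does something like the following.  
This is a simplified sketch, not the real generated script: the 
Sketch::* packages, run_job, and the toy file-creating job are invented 
for illustration, and only the method names come from my Job class.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

package Sketch::Job;                 # hypothetical stand-in base class
sub new { my ($class, %args) = @_; return bless {%args}, $class }
# "Abstract" methods: the base versions just fail loudly.
for my $method (qw(job_label runtimeInputsExist
                   runtimeOutputsNeedRebuilding execute)) {
    no strict 'refs';
    *{"Sketch::Job::$method"} =
        sub { die ref($_[0]) . " must implement $method\n" };
}

package Sketch::TouchFileJob;        # toy job: creates one output file
our @ISA = ('Sketch::Job');
sub job_label          { return 'TouchFile' }
sub runtimeInputsExist { return 1 }  # no inputs, hence trivial
sub runtimeOutputsNeedRebuilding {
    my $self = shift;
    return !(-e $self->{output});
}
sub execute {
    my $self = shift;
    open my $fh, '>', $self->{output} or return 0;
    close $fh;
    return 1;
}

package main;

# Generic wrapper: checks inputs and outputs, runs the job if needed,
# and does the progress logging so the job class doesn't have to.
sub run_job {
    my ($job) = @_;
    my $label = $job->job_label;
    unless ($job->runtimeInputsExist) {
        print "[$label] inputs missing\n";
        return 'failed';
    }
    unless ($job->runtimeOutputsNeedRebuilding) {
        print "[$label] outputs up to date, skipping\n";
        return 'skipped';
    }
    my $ok = $job->execute;
    print "[$label] ", ($ok ? 'finished' : 'failed'), "\n";
    return $ok ? 'ok' : 'failed';
}

my $dir = tempdir(CLEANUP => 1);
my $job = Sketch::TouchFileJob->new(output => "$dir/done.flag");
print run_job($job), "\n";           # first run executes the job
print run_job($job), "\n";           # second run finds the output
```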

One of the nice things about this pipeline machinery is that it makes it 
easy to restart a pipeline after one of the job objects has failed in 
the middle.  You just fix the problem and blindly rerun the pipeline, 
assuming job classes conscientiously implement the abstract methods.  
The same set of jobs will be created as in the initial run, but the jobs 
that have already executed will detect this fact and won't bother 
invoking their 'execute' methods.  (Jobs are free to force themselves to 
always execute, of course.)
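
For file-based jobs, the "already executed" test can be as simple as 
comparing modification times.  The helper below is only a sketch of one 
way a runtimeOutputsNeedRebuilding method might decide; the name 
outputs_are_current and the choice to treat equal timestamps as 
current are assumptions, not what my job classes necessarily do.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# True when every output exists and is at least as new as the newest
# input.  A missing input or output always means "not current".
sub outputs_are_current {
    my ($inputs, $outputs) = @_;
    my $newest_input = 0;
    for my $file (@$inputs) {
        my $mtime = (stat($file))[9];
        return 0 unless defined $mtime;
        $newest_input = $mtime if $mtime > $newest_input;
    }
    for my $file (@$outputs) {
        my $mtime = (stat($file))[9];
        return 0 unless defined $mtime && $mtime >= $newest_input;
    }
    return 1;
}

# Demonstration with two scratch files and hand-set timestamps.
my $dir = tempdir(CLEANUP => 1);
my ($in, $out) = ("$dir/input.txt", "$dir/output.txt");
for my $file ($in, $out) {
    open my $fh, '>', $file or die "$file: $!";
    close $fh;
}
utime 1000, 1000, $in;     # pretend the input is old
utime 2000, 2000, $out;    # and the output was built later
print outputs_are_current([$in], [$out]) ? "skip\n" : "rebuild\n";
utime 3000, 3000, $in;     # the input changed after the output was built
print outputs_are_current([$in], [$out]) ? "skip\n" : "rebuild\n";
```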

Here's one thing I'm missing: ideally the job manager and the job 
classes should have persistent memories, probably in a SQL database.  
Not only would this help reporting on pipeline progress and historical 
pipeline runs, but it would allow the jobs to learn from experience and 
report realistic resource requirements to Torque, which could help 
Torque to make better scheduling decisions (maybe).
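
As a rough illustration of the "learn from experience" part, the sketch 
below turns past run times into a walltime request of the kind passed 
to qsub -l.  Everything in it is hypothetical: the in-memory %history 
hash stands in for the database table, the numbers are made up, and 
padding the slowest past run by a fixed margin is just one possible 
policy.

```perl
use strict;
use warnings;
use List::Util qw(max);
use POSIX qw(ceil);

# Stand-in for a table of past runs: job label => wall-clock seconds.
# (Made-up numbers; a real version would query the database.)
my %history = (
    FabCreateDB => [42, 55, 48],
);

# Pad the slowest past run by $margin; fall back to a default when the
# job has never run before.  Returns a string suitable for qsub -l.
sub walltime_request {
    my ($label, $default_seconds, $margin) = @_;
    my $runs    = $history{$label} || [];
    my $seconds = @$runs ? ceil(max(@$runs) * $margin) : $default_seconds;
    return sprintf('walltime=%02d:%02d:%02d',
        int($seconds / 3600), int(($seconds % 3600) / 60), $seconds % 60);
}

print walltime_request('FabCreateDB',    3600, 1.5), "\n";
print walltime_request('NeverRunBefore', 3600, 1.5), "\n";
```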

I'm sure there are other features I'd like, but it's been a while since 
I messed with this.

-Kevin Murphy

P.S.: some code snippets to illustrate the above.

Excerpt from the master pipeline script that wires job objects together:

my $pipeline = Fable::Pipeline::JobManager->new(
    config   => $fableConfig,
    std_args => \%std_args,
    %extraJobManagerOptions,
) or die "Error creating JobManager object";
my $createDatabaseJobName
  = $pipeline->add(
                  );  # aborts if error

my $sectionWeightsTableJobName
  = $pipeline->add(
                  );  # aborts if error
$pipeline->run;   # aborts if error
# Merely submits and returns; pipeline progress is checked with external 

@@@@ Sample job class: @@@@

package Fable::Pipeline::CreateDatabaseJob;
use Fable::Pipeline::Job;
use strict;
use warnings;
our @ISA = qw(Fable::Pipeline::Job);
our $VERSION = '0.01';

sub job_label {
  return 'FabCreateDB';
}

sub configure {
  return 1;  # This task has no config of its own
}

sub verifyRequirementsAtSubmitTime {
  return 1;  # Already checked by the Config 
}

sub runtimeInputsExist {
  return 1;  # No inputs, hence trivial 
}

sub runtimeOutputsNeedRebuilding {
  my $self = shift;
  my $dbChecker = Fable::Pipeline::PostgresqlChecker->new(config => 
  return !$dbChecker->createdb_database_exists;
}

sub execute {
  my $self = shift;
  my $dbChecker = Fable::Pipeline::PostgresqlChecker->new(config => 
  return 1;
}
