[torqueusers] Job pipeline software for use with Torque?
Kevin Murphy
murphy at genome.chop.edu
Fri Feb 6 12:14:29 MST 2009
Hi,
I'm looking for a layer, preferably in Java, that would sit on top of
Torque and facilitate the construction and execution of large pipelines
of jobs. I've written my own system in Perl, but it lacks some features
I'd like, and I'd really prefer to be using Java, as I mentioned. What
I'm imagining would come with an abstract job class to enforce useful
and required methods; a job manager to instantiate the job objects,
automatically build corresponding scripts and execute them; some way of
providing configuration to the jobs; a way to define dependencies
between jobs; and a way to report on pipeline progress.
To give you an idea of what I'm looking for, I have written my own
pipeline machinery in object-oriented Perl with an abstract Job class
that wraps an invocation of qsub. I handle job dependencies via
Torque's native mechanism, and I have a job manager class that makes it
easy to define dependencies between job objects as they are created.
When the pipeline is built, a root job is submitted first in a held
state, and when the job manager class has finished creating all job
objects and qsubbing the corresponding Torque jobs, it releases the hold
on the root job.
I wanted to make it as easy as possible, and in some cases mandatory,
for computational tasks to "do the right thing". The abstract methods
of the job class are configure, job_label,
verifyRequirementsAtSubmitTime, runtimeOutputsNeedRebuilding,
runtimeInputsExist, and execute. This allows the job manager to ensure
that all task prerequisites exist (on all nodes, by default), and it
allows a generic, automatically generated job script to always check
inputs and outputs, terminate if the outputs are more recent than the
inputs, and of course, to execute. The generic job script also handles
pipeline progress logging without individual job classes needing to
worry about that.
One of the nice things about this pipeline machinery is that it makes it
easy to restart a pipeline after one of the job objects has failed in
the middle. You just fix the problem and blindly rerun the pipeline,
assuming job classes conscientiously implement the abstract methods.
The same set of jobs will be created as in the initial run, but the jobs
that have already executed will detect this fact and won't bother
invoking their 'execute' methods. (Jobs are free to force themselves to
always execute, of course).
Here's one thing I'm missing: ideally the job manager and the job
classes should have persistent memories, probably in a SQL database.
Not only would this help reporting on pipeline progress and historical
pipeline runs, but it would allow the jobs to learn from experience and
report realistic resource requirements to Torque, which could help
Torque to make better scheduling decisions (maybe).
I'm sure there are other features I'd like, but it's been a while since
I messed with this.
-Kevin Murphy
P.S.: some code snippets to illustrate the above.
Excerpt from the master pipeline script that wires job objects together:
...
my $pipeline = Fable::Pipeline::JobManager->new(config=>$fableConfig,
std_args => \%std_args, %extraJobManagerOptions)
or die "Error creating JobManager object";
...
my $createDatabaseJobName
= $pipeline->add(
class=>'CreateDatabaseJob',
depend=>$initialNotifyJobName,
); # aborts if error
my $sectionWeightsTableJobName
= $pipeline->add(
class=>'SectionWeightsTableJob',
depend=>$createDatabaseJobName,
noInput=>1,
); # aborts if error
...
$pipeline->run; # aborts if error
# Merely submits and returns; pipeline progress is checked with external
tools
...
@@@@ Sample job class: @@@@
package Fable::Pipeline::CreateDatabaseJob;
use Fable::Pipeline::Job;
use strict;
use warnings;
our @ISA = qw(Fable::Pipeline::Job);
our $VERSION = '0.01';
sub job_label {
return 'FabCreateDB';
}
sub configure {
return 1; # This task has no config of its own
}
sub verifyRequirementsAtSubmitTime {
return 1; # Already checked by the Config
object
}
sub runtimeInputsExist {
return 1; # No inputs, hence trivial
success
}
sub runtimeOutputsNeedRebuilding {
my $self = shift;
my $dbChecker = Fable::Pipeline::PostgresqlChecker->new(config =>
$self->config);
return !$dbChecker->createdb_database_exists;
}
sub execute {
my $self = shift;
my $dbChecker = Fable::Pipeline::PostgresqlChecker->new(config =>
$self->config);
$dbChecker->create_createdb;
return 1;
}
More information about the torqueusers
mailing list