Skip to content

Commit

Permalink
add priority directive in processors
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn committed Nov 18, 2024
1 parent 0fd345f commit 982a2ac
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,22 @@ class JobBuilder {
return this
}

static Job assignDatacenters(TaskRun task, Job job){
JobBuilder withDatacenters(TaskRun task){
def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS)
if( datacenters ){
if( datacenters instanceof List<String>) {
job.datacenters( datacenters as List<String>)
return job;
return this;
}
if( datacenters instanceof Closure) {
String str = datacenters.call().toString()
job.datacenters( [str])
return job;
return this;
}
job.datacenters( [datacenters.toString()] as List<String>)
return job
return this
}
job
this
}

JobBuilder withNamespace(String namespace) {
Expand Down Expand Up @@ -291,7 +291,7 @@ class JobBuilder {
taskDef
}

static Job spreads(TaskRun task, Job jobDef, NomadJobOpts jobOpts){
JobBuilder withSpreads(TaskRun task, NomadJobOpts jobOpts){
def spreads = [] as List<Spread>
if( jobOpts.spreadsSpec ){
def list = SpreadsBuilder.spreadsSpecToList(jobOpts.spreadsSpec)
Expand All @@ -307,10 +307,20 @@ class JobBuilder {
}

spreads.each{
jobDef.addSpreadsItem(it)
job.addSpreadsItem(it)
}
jobDef
this
}

JobBuilder withPriority(int priority){
job.priority = priority
this
}

JobBuilder withPriority(TaskRun task){
if( task.processor?.config?.containsKey(TaskDirectives.PRIORITY) ){
withPriority( task.processor?.config?.get(TaskDirectives.PRIORITY) as int)
}
this
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,13 @@ class NomadService implements Closeable{
.withId(id)
.withName(task.name)
.withType("batch")
// .withDatacenters(task, this.config.jobOpts().datacenters)
.withDatacenters(task)
.withNamespace(this.config.jobOpts().namespace)
.withTaskGroups([JobBuilder.createTaskGroup(task, args, env, this.config.jobOpts())])
.withSpreads(task, this.config.jobOpts())
.withPriority(task)
.build()

JobBuilder.assignDatacenters(task, job)
JobBuilder.spreads(task, job, this.config.jobOpts())

JobRegisterRequest jobRegisterRequest = new JobRegisterRequest()
jobRegisterRequest.setJob(job)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ class TaskDirectives {

public static final String SPREAD = "spread"

public static final String PRIORITY = "priority"

public static final List<String> ALL = [
DATACENTERS,
CONSTRAINTS,
SECRETS,
SPREAD
SPREAD,
PRIORITY
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nextflow.nomad.builders

import nextflow.nomad.config.NomadConfig
import nextflow.nomad.config.NomadJobOpts
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import okhttp3.mockwebserver.MockWebServer
import spock.lang.Specification
Expand Down Expand Up @@ -98,5 +99,20 @@ class JobBuilderSpec extends Specification {
taskGroup.tasks[0].env == env
}

def "test priority in task"(){
given:
def taskRun = Mock(TaskRun)
taskRun.container >> "test-container"
taskRun.workDir >> new File("/test/workdir").toPath()
taskRun.getProcessor() >> Mock(TaskProcessor) {
getConfig() >> [priority: 666]
}

when:
def job = new JobBuilder().withPriority(taskRun).build()

then:
job.priority == 666
}

}
9 changes: 9 additions & 0 deletions validation/debug-pipeline.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

./wait-nomad.sh

./nomad system gc

NXF_ASSETS=$(pwd)/nomad_temp/scratchdir/assets \
NXF_CACHE_DIR=$(pwd)/nomad_temp/scratchdir/cache \
nextflow -remote-debug run -w $(pwd)/nomad_temp/scratchdir/ "$@"
1 change: 1 addition & 0 deletions validation/directives/nextflow.config
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ profiles{
process {
withName: sayHello {
datacenters = ['test-datacenter', 'demo-datacenter']
priority = 66
}
}
}
Expand Down

0 comments on commit 982a2ac

Please sign in to comment.