Skip to content

Commit

Permalink
new volume configuration to mount in the task
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Aguilera <[email protected]>
  • Loading branch information
jagedn committed Feb 27, 2024
1 parent a395aa0 commit 7c5aca2
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 0 deletions.
32 changes: 32 additions & 0 deletions plugins/nf-nomad/src/main/nextflow/nomad/NomadConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ import groovy.util.logging.Slf4j
class NomadConfig {
final static protected API_VERSION = "v1"

final static public String VOLUME_DOCKER_TYPE = "docker"
final static public String VOLUME_CSI_TYPE = "csi"
final static public String VOLUME_HOST_TYPE = "host"

final static protected String[] VOLUME_TYPES = [
VOLUME_CSI_TYPE, VOLUME_DOCKER_TYPE, VOLUME_HOST_TYPE
]

final NomadClientOpts clientOpts
final NomadJobOpts jobOpts

Expand Down Expand Up @@ -57,6 +65,7 @@ class NomadConfig {
final String region
final String namespace
final String dockerVolume
final VolumeSpec volumeSpec

NomadJobOpts(Map nomadJobOpts){
deleteOnCompletion = nomadJobOpts.containsKey("deleteOnCompletion") ?
Expand All @@ -71,6 +80,29 @@ class NomadConfig {
region = nomadJobOpts.region ?: null
namespace = nomadJobOpts.namespace ?: null
dockerVolume = nomadJobOpts.dockerVolume ?: null
if( dockerVolume ){
log.info "dockerVolume config will be deprecated, use volume type:'docker' name:'name' instead"
}
if( nomadJobOpts.volume && nomadJobOpts.volume instanceof Map){
volumeSpec = new VolumeSpec(nomadJobOpts.volume as Map<String, String>)
}else{
volumeSpec = null
}
}
}

class VolumeSpec{

final String type
final String name

VolumeSpec(Map<String, String> volumeConfig){
if( !VOLUME_TYPES.contains(volumeConfig.type) )
throw new IllegalArgumentException("Volume type $type is not supported")
if( !volumeConfig.name )
throw new IllegalArgumentException("Volume name is required")
this.type = volumeConfig.type
this.name = volumeConfig.name
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
* Copyright 2024, Evaluacion y Desarrollo de Negocios, Spain
* Copyright 2023, Stellenbosch University, South Africa
* Copyright 2022, Center for Medical Genetics, Ghent
*
Expand Down Expand Up @@ -28,6 +29,8 @@ import io.nomadproject.client.models.JobSummary
import io.nomadproject.client.models.Task
import io.nomadproject.client.models.TaskGroup
import io.nomadproject.client.models.TaskGroupSummary
import io.nomadproject.client.models.VolumeMount
import io.nomadproject.client.models.VolumeRequest
import nextflow.nomad.NomadConfig

/**
Expand Down Expand Up @@ -80,6 +83,15 @@ class NomadService implements Closeable{
name: "group",
tasks: [ task ]
)
if( config.jobOpts.volumeSpec){
taskGroup.volumes = [:]
taskGroup.volumes[config.jobOpts.volumeSpec.name]= new VolumeRequest(
type: config.jobOpts.volumeSpec.type,
source: config.jobOpts.volumeSpec.name,
attachmentMode: "file-system",
accessMode: "multi-node-multi-writer"
)
}
return taskGroup
}

Expand All @@ -105,6 +117,13 @@ class NomadService implements Closeable{
readonly : false
]
}
if( config.jobOpts.volumeSpec){
String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator)
task.volumeMounts = [ new VolumeMount(
destination: destinationDir,
volume: config.jobOpts.volumeSpec.name
)]
}
task
}

Expand Down
40 changes: 40 additions & 0 deletions plugins/nf-nomad/src/test/nextflow/nomad/NomadConfigSpec.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,44 @@ class NomadConfigSpec extends Specification {
expect:
config.jobOpts.namespace == "namespace"
}

void "should instantiate a volume spec if specified"() {
when:
def config = new NomadConfig([
jobs: [volume:[type:"docker", name:"test"]]
])

then:
config.jobOpts.volumeSpec
config.jobOpts.volumeSpec.type == NomadConfig.VOLUME_DOCKER_TYPE
config.jobOpts.volumeSpec.name == "test"

when:
def config2 = new NomadConfig([
jobs: [volume:[type:"csi", name:"test"]]
])

then:
config2.jobOpts.volumeSpec
config2.jobOpts.volumeSpec.type == NomadConfig.VOLUME_CSI_TYPE
config2.jobOpts.volumeSpec.name == "test"

when:
def config3 = new NomadConfig([
jobs: [volume:[type:"host", name:"test"]]
])

then:
config3.jobOpts.volumeSpec
config3.jobOpts.volumeSpec.type == NomadConfig.VOLUME_HOST_TYPE
config3.jobOpts.volumeSpec.name == "test"

when:
new NomadConfig([
jobs: [volume:[type:"not-supported", name:"test"]]
])

then:
thrown(IllegalArgumentException)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,62 @@ class NomadServiceSpec extends Specification{
state == "Starting"

}

void "submit a task with a volume"(){
given:
def config = new NomadConfig(
client:[
address : "http://${mockWebServer.hostName}:${mockWebServer.port}"
],
jobs:[
volume: [ name:'test', type:'csi']
]
)
def service = new NomadService(config)

String id = "theId"
String name = "theName"
String image = "theImage"
List<String> args = ["theCommand", "theArgs"]
String workingDir = "a/b/c"
Map<String, String>env = [test:"test"]

mockWebServer.enqueue(new MockResponse()
.setBody(JsonOutput.toJson(["EvalID":"test"]).toString())
.addHeader("Content-Type", "application/json"));
when:

def idJob = service.submitTask(id, name, image, args, workingDir,env)
def recordedRequest = mockWebServer.takeRequest();
def body = new JsonSlurper().parseText(recordedRequest.body.readUtf8())

then:
idJob

and:
recordedRequest.method == "POST"
recordedRequest.path == "/v1/jobs"

and:
body.Job
body.Job.ID == id
body.Job.Name == name
body.Job.Datacenters == []
body.Job.Type == "batch"
body.Job.TaskGroups.size() == 1
body.Job.TaskGroups[0].Name == "group"
body.Job.TaskGroups[0].Tasks.size() == 1
body.Job.TaskGroups[0].Tasks[0].Name == "nf-task"
body.Job.TaskGroups[0].Tasks[0].Driver == "docker"
body.Job.TaskGroups[0].Tasks[0].Config.image == image
body.Job.TaskGroups[0].Tasks[0].Config.work_dir == workingDir
body.Job.TaskGroups[0].Tasks[0].Config.command == args[0]
body.Job.TaskGroups[0].Tasks[0].Config.args == args.drop(1)

body.Job.TaskGroups[0].Volumes.size() == 1
body.Job.TaskGroups[0].Volumes['test'] == [AccessMode:"multi-node-multi-writer", AttachmentMode:"file-system", Source:"test", Type:"csi"]
body.Job.TaskGroups[0].Tasks[0].VolumeMounts.size() == 1
body.Job.TaskGroups[0].Tasks[0].VolumeMounts[0] == [Destination:"a", Volume:"test"]

}
}

0 comments on commit 7c5aca2

Please sign in to comment.