diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy index 6c3ab85..2370548 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/config/NomadClientOpts.groovy @@ -37,6 +37,11 @@ class NomadClientOpts{ final String address final String token + final connectionTimeout + + //NOTE: For now, these are not exposed. + final readTimeout = 6000 + final writeTimeout = 6000 NomadClientOpts(Map nomadClientOpts, Map env=null){ assert nomadClientOpts!=null @@ -50,6 +55,7 @@ class NomadClientOpts{ address +="/" this.address = address + API_VERSION this.token = nomadClientOpts.token ?: sysEnv.get('NOMAD_TOKEN') + this.connectionTimeout = nomadClientOpts.connectionTimeout ?: 6000 //TODO: Add mTLS properties and env vars // https://developer.hashicorp.com/nomad/docs/commands#mtls-environment-variables diff --git a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy index 21cc80c..1adbdeb 100644 --- a/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy +++ b/plugins/nf-nomad/src/main/nextflow/nomad/executor/NomadService.groovy @@ -40,7 +40,7 @@ import java.nio.file.Path @Slf4j @CompileStatic -class NomadService implements Closeable{ +class NomadService implements Closeable { NomadConfig config @@ -49,20 +49,27 @@ class NomadService implements Closeable{ NomadService(NomadConfig config) { this.config = config - //TODO: Accommodate these connection level options in clientOpts() - final CONNECTION_TIMEOUT_MILLISECONDS = 60000 - final READ_TIMEOUT_MILLISECONDS = 60000 - final WRITE_TIMEOUT_MILLISECONDS = 60000 + ApiClient apiClient = new ApiClient(connectTimeout: config.clientOpts().connectionTimeout, + readTimeout: config.clientOpts().readTimeout, + writeTimeout: config.clientOpts().writeTimeout) - ApiClient apiClient = new ApiClient( connectTimeout: CONNECTION_TIMEOUT_MILLISECONDS, readTimeout: READ_TIMEOUT_MILLISECONDS, writeTimeout: WRITE_TIMEOUT_MILLISECONDS) - apiClient.basePath = config.clientOpts().address - log.debug "[NOMAD] Client Address: ${config.clientOpts().address}" - if( config.clientOpts().token ){ - log.debug "[NOMAD] Client Token: ${config.clientOpts().token?.take(5)}.." - apiClient.apiKey = config.clientOpts().token + try { + apiClient.basePath = config.clientOpts().address + log.debug "[NOMAD] Client Address: ${config.clientOpts().address}" + + if (config.clientOpts().token) { + log.debug "[NOMAD] Client Token: ${config.clientOpts().token?.take(5)}.." + apiClient.apiKey = config.clientOpts().token + } + this.jobsApi = new JobsApi(apiClient) + + } catch (Exception e) { + + log.warn "[NOMAD] Cannot establish connection with the server | ${e.message}" + } - this.jobsApi = new JobsApi(apiClient) + } protected Resources getResources(TaskRun task) { @@ -70,8 +77,8 @@ class NomadService implements Closeable{ final DEFAULT_MEMORY = "500.MB" final taskCfg = task.getConfig() - final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer - final taskMemory = taskCfg.get("memory") ? new MemoryUnit( taskCfg.get("memory") as String ) : new MemoryUnit(DEFAULT_MEMORY) + final taskCores = !taskCfg.get("cpus") ? DEFAULT_CPUS : taskCfg.get("cpus") as Integer + final taskMemory = taskCfg.get("memory") ? new MemoryUnit(taskCfg.get("memory") as String) : new MemoryUnit(DEFAULT_MEMORY) final res = new Resources() .cores(taskCores) @@ -84,7 +91,7 @@ class NomadService implements Closeable{ void close() throws IOException { } - String submitTask(String id, TaskRun task, List args, Mapenv, Path saveJsonPath=null){ + String submitTask(String id, TaskRun task, List args, Map env, Path saveJsonPath = null) { Job job = new Job(); job.ID = id job.name = task.name @@ -99,10 +106,10 @@ class NomadService implements Closeable{ JobRegisterRequest jobRegisterRequest = new JobRegisterRequest() jobRegisterRequest.setJob(job) - if( saveJsonPath ) try { + if (saveJsonPath) try { saveJsonPath.text = job.toString() } - catch( Exception e ) { + catch (Exception e) { log.debug "WARN: unable to save request json -- cause: ${e.message ?: e}" } @@ -117,26 +124,26 @@ class NomadService implements Closeable{ } - TaskGroup createTaskGroup(TaskRun taskRun, List args, Mapenv){ + TaskGroup createTaskGroup(TaskRun taskRun, List args, Map env) { //NOTE: Force a single-allocation with no-retries per nomad job definition final TASK_RESCHEDULE_ATTEMPTS = 0 final TASK_RESTART_ATTEMPTS = 0 - final ReschedulePolicy taskReschedulePolicy = new ReschedulePolicy().attempts(TASK_RESCHEDULE_ATTEMPTS) - final RestartPolicy taskRestartPolicy = new RestartPolicy().attempts(TASK_RESTART_ATTEMPTS) + final ReschedulePolicy taskReschedulePolicy = new ReschedulePolicy().attempts(TASK_RESCHEDULE_ATTEMPTS) + final RestartPolicy taskRestartPolicy = new RestartPolicy().attempts(TASK_RESTART_ATTEMPTS) def task = createTask(taskRun, args, env) def taskGroup = new TaskGroup( name: "group", - tasks: [ task ], + tasks: [task], reschedulePolicy: taskReschedulePolicy, restartPolicy: taskRestartPolicy ) - if( config.jobOpts().volumeSpec ) { + if (config.jobOpts().volumeSpec) { taskGroup.volumes = [:] - config.jobOpts().volumeSpec.eachWithIndex { volumeSpec , idx-> + config.jobOpts().volumeSpec.eachWithIndex { volumeSpec, idx -> if (volumeSpec && volumeSpec.type == JobVolume.VOLUME_CSI_TYPE) { taskGroup.volumes["vol_${idx}".toString()] = new VolumeRequest( type: volumeSpec.type, @@ -159,7 +166,7 @@ class NomadService implements Closeable{ return taskGroup } - Task createTask(TaskRun task, List args, Mapenv) { + Task createTask(TaskRun task, List args, Map env) { final DRIVER = "docker" final DRIVER_PRIVILEGED = true @@ -190,18 +197,18 @@ class NomadService implements Closeable{ return taskDef } - protected Task volumes(TaskRun task, Task taskDef, String workingDir){ - if( config.jobOpts().dockerVolume){ + protected Task volumes(TaskRun task, Task taskDef, String workingDir) { + if (config.jobOpts().dockerVolume) { String destinationDir = workingDir.split(File.separator).dropRight(2).join(File.separator) taskDef.config.mount = [ - type : "volume", - target : destinationDir, - source : config.jobOpts().dockerVolume, - readonly : false + type : "volume", + target : destinationDir, + source : config.jobOpts().dockerVolume, + readonly: false ] } - if( config.jobOpts().volumeSpec){ + if (config.jobOpts().volumeSpec) { taskDef.volumeMounts = [] config.jobOpts().volumeSpec.eachWithIndex { volumeSpec, idx -> String destinationDir = volumeSpec.workDir ? @@ -236,16 +243,16 @@ class NomadService implements Closeable{ taskDef } - protected Task constraint(TaskRun task, Task taskDef){ - if( config.jobOpts().constraintSpec ){ + protected Task constraint(TaskRun task, Task taskDef) { + if (config.jobOpts().constraintSpec) { def constraint = new Constraint() - if(config.jobOpts().constraintSpec.attribute){ + if (config.jobOpts().constraintSpec.attribute) { constraint.ltarget(config.jobOpts().constraintSpec.attribute) } constraint.operand(config.jobOpts().constraintSpec.operator ?: "=") - if(config.jobOpts().constraintSpec.value){ + if (config.jobOpts().constraintSpec.value) { constraint.rtarget(config.jobOpts().constraintSpec.value) } taskDef.constraints([constraint]) @@ -254,15 +261,15 @@ class NomadService implements Closeable{ taskDef } - protected Task constraints(TaskRun task, Task taskDef){ + protected Task constraints(TaskRun task, Task taskDef) { def constraints = [] as List - if( config.jobOpts().constraintsSpec ){ + if (config.jobOpts().constraintsSpec) { def list = ConstraintsBuilder.constraintsSpecToList(config.jobOpts().constraintsSpec) constraints.addAll(list) } - if( task.processor?.config?.get(TaskDirectives.CONSTRAINTS) && + if (task.processor?.config?.get(TaskDirectives.CONSTRAINTS) && task.processor?.config?.get(TaskDirectives.CONSTRAINTS) instanceof Closure) { Closure closure = task.processor?.config?.get(TaskDirectives.CONSTRAINTS) as Closure JobConstraints constraintsSpec = JobConstraints.parse(closure) @@ -270,33 +277,32 @@ class NomadService implements Closeable{ constraints.addAll(list) } - if( constraints.size()) { + if (constraints.size()) { taskDef.constraints(constraints) } taskDef } - - protected Job assignDatacenters(TaskRun task, Job job){ + protected Job assignDatacenters(TaskRun task, Job job) { def datacenters = task.processor?.config?.get(TaskDirectives.DATACENTERS) - if( datacenters ){ - if( datacenters instanceof List) { - job.datacenters( datacenters as List) + if (datacenters) { + if (datacenters instanceof List) { + job.datacenters(datacenters as List) return job; } - if( datacenters instanceof Closure) { + if (datacenters instanceof Closure) { String str = datacenters.call().toString() - job.datacenters( [str]) + job.datacenters([str]) return job; } - job.datacenters( [datacenters.toString()] as List) + job.datacenters([datacenters.toString()] as List) return job } job } - String getJobState(String jobId){ + String getJobState(String jobId) { try { List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) AllocationListStub last = allocations?.sort { @@ -305,31 +311,30 @@ class NomadService implements Closeable{ String currentState = last?.taskStates?.values()?.last()?.state log.debug "Task $jobId , state=$currentState" currentState ?: "Unknown" - }catch(Exception e){ + } catch (Exception e) { log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e) "dead" } } - - boolean checkIfRunning(String jobId){ + boolean checkIfRunning(String jobId) { try { Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) log.debug "[NOMAD] checkIfRunning jobID=$job.ID; status=$job.status" job.status == "running" - }catch (Exception e){ + } catch (Exception e) { log.debug("[NOMAD] Failed to get jobState ${jobId} -- Cause: ${e.message ?: e}", e) false } } - boolean checkIfDead(String jobId){ - try{ + boolean checkIfDead(String jobId) { + try { Job job = jobsApi.getJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null) log.debug "[NOMAD] checkIfDead jobID=$job.ID; status=$job.status" job.status == "dead" - }catch (Exception e){ + } catch (Exception e) { log.debug("[NOMAD] Failed to get job ${jobId} -- Cause: ${e.message ?: e}", e) true } @@ -339,28 +344,28 @@ class NomadService implements Closeable{ purgeJob(jobId, false) } - void jobPurge(String jobId){ + void jobPurge(String jobId) { purgeJob(jobId, true) } - protected void purgeJob(String jobId, boolean purge){ + protected void purgeJob(String jobId, boolean purge) { log.debug "[NOMAD] purgeJob with jobId=${jobId}" try { jobsApi.deleteJob(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, purge, true) - }catch(Exception e){ + } catch (Exception e) { log.debug("[NOMAD] Failed to delete job ${jobId} -- Cause: ${e.message ?: e}", e) } } String getClientOfJob(String jobId) { - try{ + try { List allocations = jobsApi.getJobAllocations(jobId, config.jobOpts().region, config.jobOpts().namespace, null, null, null, null, null, null, null, null) - if( !allocations ){ + if (!allocations) { return null } AllocationListStub jobAllocation = allocations.first() return jobAllocation.nodeName - }catch (Exception e){ + } catch (Exception e) { log.debug("[NOMAD] Failed to get job allocations ${jobId} -- Cause: ${e.message ?: e}", e) throw new ProcessSubmitException("[NOMAD] Failed to get alloactions ${jobId} -- Cause: ${e.message ?: e}", e) }