Skip to content

Commit

Permalink
Feat: adds AWS S3 mountpoints (#177)
Browse files Browse the repository at this point in the history
* mountpoints

* readme update

* correct param

* typo

* message

* updating CI
  • Loading branch information
dapineyro authored Dec 18, 2024
1 parent b4fedcc commit 01ba2e8
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.9", "3.11"]
python-version: ["3.8", "3.9", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## lifebit-ai/cloudos-cli: changelog

## v2.14.0 (2024-12-18)

- Adds the new `--accelerate-file-staging` parameter to job submission to add support for AWS S3 mountpoint for quicker file staging.

## v2.13.0 (2024-12-03)

### Feature
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ Options:
Default=660fae20f93358ad61e0104b
--cost-limit FLOAT Add a cost limit to your job. Default=30.0
(For no cost limit please use -1).
--accelerate-file-staging Enables AWS S3 mountpoint for quicker file
staging.
--verbose Whether to print information messages or
not.
--request-interval INTEGER Time interval to request (in seconds) the
Expand Down
43 changes: 42 additions & 1 deletion cloudos/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ def queue():
help='Add a cost limit to your job. Default=30.0 (For no cost limit please use -1).',
type=float,
default=30.0)
@click.option('--accelerate-file-staging',
help='Enables AWS S3 mountpoint for quicker file staging.',
is_flag=True)
@click.option('--verbose',
help='Whether to print information messages or not.',
is_flag=True)
Expand Down Expand Up @@ -257,6 +260,7 @@ def run(apikey,
execution_platform,
hpc_id,
cost_limit,
accelerate_file_staging,
verbose,
request_interval,
disable_ssl_verification,
Expand Down Expand Up @@ -302,6 +306,21 @@ def run(apikey,
wdl_importsfile = None
storage_mode = 'regular'
save_logs = False
if accelerate_file_staging:
if execution_platform != 'aws':
print('[Message] You have selected accelerate file staging, but this function is ' +
'only available when execution platform is AWS. The accelerate file staging ' +
'will not be applied')
use_mountpoints = False
else:
use_mountpoints = True
print('[Message] Enabling AWS S3 mountpoint for accelerated file staging. ' +
'Please, take into consideration the following:\n' +
'\t- It significantly reduces runtime and compute costs but may increase network costs.\n' +
'\t- Requires extra memory. Adjust process memory or optimise resource usage if necessary.\n' +
'\t- This is still a CloudOS BETA feature.\n')
else:
use_mountpoints = False
if verbose:
print('\t...Detecting workflow type')
cl = Cloudos(cloudos_url, apikey, cromwell_token)
Expand Down Expand Up @@ -375,7 +394,8 @@ def run(apikey,
print(f'[Warning] You have specified Nextflow version {nextflow_version}. This version requires the pipeline ' +
'to be written in DSL2 and does not support DSL1.')
print('\nExecuting run...')
print(f'\tNextflow version: {nextflow_version}')
if workflow_type == 'nextflow':
print(f'\tNextflow version: {nextflow_version}')
j_id = j.send_job(job_config=job_config,
parameter=parameter,
git_commit=git_commit,
Expand All @@ -396,6 +416,7 @@ def run(apikey,
workflow_type=workflow_type,
cromwell_id=cromwell_id,
cost_limit=cost_limit,
use_mountpoints=use_mountpoints,
verify=verify_ssl)
print(f'\tYour assigned job id is: {j_id}\n')
j_url = f'{cloudos_url}/app/jobs/{j_id}'
Expand Down Expand Up @@ -489,6 +510,9 @@ def run(apikey,
help='Add a cost limit to your job. Default=30.0 (For no cost limit please use -1).',
type=float,
default=30.0)
@click.option('--accelerate-file-staging',
help='Enables AWS S3 mountpoint for quicker file staging.',
is_flag=True)
@click.option('--wait-completion',
help=('Whether to wait to job completion and report final ' +
'job status.'),
Expand Down Expand Up @@ -527,6 +551,7 @@ def run_curated_examples(apikey,
lustre_size,
execution_platform,
cost_limit,
accelerate_file_staging,
wait_completion,
wait_time,
request_interval,
Expand Down Expand Up @@ -566,6 +591,21 @@ def run_curated_examples(apikey,
'CloudOS\n')
else:
batch = True
if accelerate_file_staging:
if execution_platform != 'aws':
print('[Message] You have selected accelerate file staging, but this function is ' +
'only available when execution platform is AWS. The accelerate file staging ' +
'will not be applied')
use_mountpoints = False
else:
use_mountpoints = True
print('[Message] Enabling AWS S3 mountpoint for accelerated file staging. ' +
'Please, take into consideration the following:\n' +
'\t- It significantly reduces runtime and compute costs but may increase network costs.\n' +
'\t- Requires extra memory. Adjust process memory or optimise resource usage if necessary.\n' +
'\t- This is still a CloudOS BETA feature.\n')
else:
use_mountpoints = False
for workflow in runnable_curated_workflows:
workflow_name = workflow['name']
j = jb.Job(cloudos_url, apikey, None, workspace_id, project_name, workflow_name,
Expand All @@ -582,6 +622,7 @@ def run_curated_examples(apikey,
execution_platform=execution_platform,
workflow_type='nextflow',
cost_limit=cost_limit,
use_mountpoints=use_mountpoints,
verify=verify_ssl)
print(f'\tYour assigned job id is: {j_id}\n')
job_id_list.append(j_id)
Expand Down
2 changes: 1 addition & 1 deletion cloudos/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.13.0'
__version__ = '2.14.0'
14 changes: 11 additions & 3 deletions cloudos/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ def convert_nextflow_to_json(self,
hpc_id,
workflow_type,
cromwell_id,
cost_limit):
cost_limit,
use_mountpoints):
"""Converts a nextflow.config file into a json formatted dict.
Parameters
Expand Down Expand Up @@ -265,6 +266,8 @@ def convert_nextflow_to_json(self,
Cromwell server ID.
cost_limit : float
Job cost limit. -1 means no cost limit.
use_mountpoints : bool
Whether or not to use AWS S3 mountpoints for quicker file staging.
Returns
-------
Expand Down Expand Up @@ -406,7 +409,8 @@ def convert_nextflow_to_json(self,
"storageMode": storage_mode,
"revision": revision_block,
"profile": nextflow_profile,
"instanceType": instance_type
"instanceType": instance_type,
"usesFusionFileSystem": use_mountpoints
}
if execution_platform != 'hpc':
params['masterInstance'] = {
Expand Down Expand Up @@ -439,6 +443,7 @@ def send_job(self,
workflow_type='nextflow',
cromwell_id=None,
cost_limit=30.0,
use_mountpoints=False,
verify=True):
"""Send a job to CloudOS.
Expand Down Expand Up @@ -494,6 +499,8 @@ def send_job(self,
Cromwell server ID.
cost_limit : float
Job cost limit. -1 means no cost limit.
use_mountpoints : bool
Whether or not to use AWS S3 mountpoints for quicker file staging.
verify: [bool|string]
Whether to use SSL verification or not. Alternatively, if
a string is passed, it will be interpreted as the path to
Expand Down Expand Up @@ -536,7 +543,8 @@ def send_job(self,
hpc_id,
workflow_type,
cromwell_id,
cost_limit)
cost_limit,
use_mountpoints)
r = retry_requests_post("{}/api/v1/jobs?teamId={}".format(cloudos_url,
workspace_id),
data=json.dumps(params), headers=headers, verify=verify)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_data/convert_nextflow_to_json_params.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job","nextflowVersion": "22.10.8", "resumable": true, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job","nextflowVersion": "22.10.8", "resumable": true, "usesFusionFileSystem": false, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
9 changes: 6 additions & 3 deletions tests/test_jobs/test_convert_nextflow_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"hpc_id": None,
"workflow_type": 'nextflow',
"cromwell_id": None,
"cost_limit": -1
"cost_limit": -1,
"use_mountpoints": False
}


Expand Down Expand Up @@ -56,7 +57,8 @@ def test_convert_nextflow_to_json_output_correct():
hpc_id=param_dict["hpc_id"],
workflow_type=param_dict["workflow_type"],
cromwell_id=param_dict["cromwell_id"],
cost_limit=param_dict["cost_limit"]
cost_limit=param_dict["cost_limit"],
use_mountpoints=param_dict["use_mountpoints"]
)
with open(actual_json_file) as json_data:
correct_json = json.load(json_data)
Expand Down Expand Up @@ -89,7 +91,8 @@ def test_convert_nextflow_to_json_badly_formed_config():
hpc_id=param_dict["hpc_id"],
workflow_type=param_dict["workflow_type"],
cromwell_id=param_dict["cromwell_id"],
cost_limit=param_dict["cost_limit"]
cost_limit=param_dict["cost_limit"],
use_mountpoints=param_dict["use_mountpoints"]
)
print(str(excinfo.value))
assert "Please, specify your parameters in\
Expand Down

0 comments on commit 01ba2e8

Please sign in to comment.