Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: adds AWS s3 mountpoints #177

Merged
merged 6 commits on Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.7", "3.9", "3.11"]
python-version: ["3.8", "3.9", "3.11"]
steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## lifebit-ai/cloudos-cli: changelog

## v2.14.0 (2024-12-18)

- Adds the new `--accelerate-file-staging` parameter to job submission to add support for AWS S3 mountpoint for quicker file staging.

## v2.13.0 (2024-12-03)

### Feature
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ Options:
Default=660fae20f93358ad61e0104b
--cost-limit FLOAT Add a cost limit to your job. Default=30.0
(For no cost limit please use -1).
--accelerate-file-staging Enables AWS S3 mountpoint for quicker file
staging.
--verbose Whether to print information messages or
not.
--request-interval INTEGER Time interval to request (in seconds) the
Expand Down
43 changes: 42 additions & 1 deletion cloudos/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ def queue():
help='Add a cost limit to your job. Default=30.0 (For no cost limit please use -1).',
type=float,
default=30.0)
@click.option('--accelerate-file-staging',
help='Enables AWS S3 mountpoint for quicker file staging.',
is_flag=True)
@click.option('--verbose',
help='Whether to print information messages or not.',
is_flag=True)
Expand Down Expand Up @@ -257,6 +260,7 @@ def run(apikey,
execution_platform,
hpc_id,
cost_limit,
accelerate_file_staging,
verbose,
request_interval,
disable_ssl_verification,
Expand Down Expand Up @@ -302,6 +306,21 @@ def run(apikey,
wdl_importsfile = None
storage_mode = 'regular'
save_logs = False
if accelerate_file_staging:
if execution_platform != 'aws':
print('[Message] You have selected accelerate file staging, but this function is ' +
'only available when execution platform is AWS. The accelerate file staging ' +
'will not be applied')
use_mountpoints = False
else:
use_mountpoints = True
print('[Message] Enabling AWS S3 mountpoint for accelerated file staging. ' +
'Please, take into consideration the following:\n' +
'\t- It significantly reduces runtime and compute costs but may increase network costs.\n' +
'\t- Requires extra memory. Adjust process memory or optimise resource usage if necessary.\n' +
'\t- This is still a CloudOS BETA feature.\n')
else:
use_mountpoints = False
if verbose:
print('\t...Detecting workflow type')
cl = Cloudos(cloudos_url, apikey, cromwell_token)
Expand Down Expand Up @@ -375,7 +394,8 @@ def run(apikey,
print(f'[Warning] You have specified Nextflow version {nextflow_version}. This version requires the pipeline ' +
'to be written in DSL2 and does not support DSL1.')
print('\nExecuting run...')
print(f'\tNextflow version: {nextflow_version}')
if workflow_type == 'nextflow':
print(f'\tNextflow version: {nextflow_version}')
j_id = j.send_job(job_config=job_config,
parameter=parameter,
git_commit=git_commit,
Expand All @@ -396,6 +416,7 @@ def run(apikey,
workflow_type=workflow_type,
cromwell_id=cromwell_id,
cost_limit=cost_limit,
use_mountpoints=use_mountpoints,
verify=verify_ssl)
print(f'\tYour assigned job id is: {j_id}\n')
j_url = f'{cloudos_url}/app/jobs/{j_id}'
Expand Down Expand Up @@ -489,6 +510,9 @@ def run(apikey,
help='Add a cost limit to your job. Default=30.0 (For no cost limit please use -1).',
type=float,
default=30.0)
@click.option('--accelerate-file-staging',
help='Enables AWS S3 mountpoint for quicker file staging.',
is_flag=True)
@click.option('--wait-completion',
help=('Whether to wait to job completion and report final ' +
'job status.'),
Expand Down Expand Up @@ -527,6 +551,7 @@ def run_curated_examples(apikey,
lustre_size,
execution_platform,
cost_limit,
accelerate_file_staging,
wait_completion,
wait_time,
request_interval,
Expand Down Expand Up @@ -566,6 +591,21 @@ def run_curated_examples(apikey,
'CloudOS\n')
else:
batch = True
if accelerate_file_staging:
if execution_platform != 'aws':
print('[Message] You have selected accelerate file staging, but this function is ' +
'only available when execution platform is AWS. The accelerate file staging ' +
'will not be applied')
use_mountpoints = False
else:
use_mountpoints = True
print('[Message] Enabling AWS S3 mountpoint for accelerated file staging. ' +
'Please, take into consideration the following:\n' +
'\t- It significantly reduces runtime and compute costs but may increase network costs.\n' +
'\t- Requires extra memory. Adjust process memory or optimise resource usage if necessary.\n' +
'\t- This is still a CloudOS BETA feature.\n')
else:
use_mountpoints = False
for workflow in runnable_curated_workflows:
workflow_name = workflow['name']
j = jb.Job(cloudos_url, apikey, None, workspace_id, project_name, workflow_name,
Expand All @@ -582,6 +622,7 @@ def run_curated_examples(apikey,
execution_platform=execution_platform,
workflow_type='nextflow',
cost_limit=cost_limit,
use_mountpoints=use_mountpoints,
verify=verify_ssl)
print(f'\tYour assigned job id is: {j_id}\n')
job_id_list.append(j_id)
Expand Down
2 changes: 1 addition & 1 deletion cloudos/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '2.13.0'
__version__ = '2.14.0'
14 changes: 11 additions & 3 deletions cloudos/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ def convert_nextflow_to_json(self,
hpc_id,
workflow_type,
cromwell_id,
cost_limit):
cost_limit,
use_mountpoints):
"""Converts a nextflow.config file into a json formatted dict.

Parameters
Expand Down Expand Up @@ -265,6 +266,8 @@ def convert_nextflow_to_json(self,
Cromwell server ID.
cost_limit : float
Job cost limit. -1 means no cost limit.
use_mountpoints : bool
Whether to use or not AWS S3 mountpoint for quicker file staging.

Returns
-------
Expand Down Expand Up @@ -406,7 +409,8 @@ def convert_nextflow_to_json(self,
"storageMode": storage_mode,
"revision": revision_block,
"profile": nextflow_profile,
"instanceType": instance_type
"instanceType": instance_type,
"usesFusionFileSystem": use_mountpoints
}
if execution_platform != 'hpc':
params['masterInstance'] = {
Expand Down Expand Up @@ -439,6 +443,7 @@ def send_job(self,
workflow_type='nextflow',
cromwell_id=None,
cost_limit=30.0,
use_mountpoints=False,
verify=True):
"""Send a job to CloudOS.

Expand Down Expand Up @@ -494,6 +499,8 @@ def send_job(self,
Cromwell server ID.
cost_limit : float
Job cost limit. -1 means no cost limit.
use_mountpoints : bool
Whether to use or not AWS S3 mountpoint for quicker file staging.
verify: [bool|string]
Whether to use SSL verification or not. Alternatively, if
a string is passed, it will be interpreted as the path to
Expand Down Expand Up @@ -536,7 +543,8 @@ def send_job(self,
hpc_id,
workflow_type,
cromwell_id,
cost_limit)
cost_limit,
use_mountpoints)
r = retry_requests_post("{}/api/v1/jobs?teamId={}".format(cloudos_url,
workspace_id),
data=json.dumps(params), headers=headers, verify=verify)
Expand Down
2 changes: 1 addition & 1 deletion tests/test_data/convert_nextflow_to_json_params.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job","nextflowVersion": "22.10.8", "resumable": true, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
{"parameters": [{"prefix": "--", "name": "reads", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data"}, {"prefix": "--", "name": "genome", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.Ggal71.500bpflank.fa"}, {"prefix": "--", "name": "annot", "parameterKind": "textValue", "textValue": "s3://lifebit-featured-datasets/pipelines/rnatoy-data/ggal_1_48850000_49020000.bed.gff"}], "project": "6054754029b82f0112762b9c", "workflow": "60b0ca54303ee601a69b42d1", "name": "new_job","nextflowVersion": "22.10.8", "resumable": true, "usesFusionFileSystem": false, "saveProcessLogs": true, "batch": {"dockerLogin": false, "enabled": false, "jobQueue": null}, "cromwellCloudResources": null, "executionPlatform": "aws", "hpc": null ,"storageSizeInGb": 500, "execution": {"computeCostLimit": -1, "optim": "test"}, "lusterFsxStorageSizeInGb": 1200, "storageMode": "regular", "revision": "", "profile": null, "instanceType": "c5.xlarge", "masterInstance": {"requestedInstance": {"type": "c5.xlarge", "asSpot": false}}}
9 changes: 6 additions & 3 deletions tests/test_jobs/test_convert_nextflow_to_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"hpc_id": None,
"workflow_type": 'nextflow',
"cromwell_id": None,
"cost_limit": -1
"cost_limit": -1,
"use_mountpoints": False
}


Expand Down Expand Up @@ -56,7 +57,8 @@ def test_convert_nextflow_to_json_output_correct():
hpc_id=param_dict["hpc_id"],
workflow_type=param_dict["workflow_type"],
cromwell_id=param_dict["cromwell_id"],
cost_limit=param_dict["cost_limit"]
cost_limit=param_dict["cost_limit"],
use_mountpoints=param_dict["use_mountpoints"]
)
with open(actual_json_file) as json_data:
correct_json = json.load(json_data)
Expand Down Expand Up @@ -89,7 +91,8 @@ def test_convert_nextflow_to_json_badly_formed_config():
hpc_id=param_dict["hpc_id"],
workflow_type=param_dict["workflow_type"],
cromwell_id=param_dict["cromwell_id"],
cost_limit=param_dict["cost_limit"]
cost_limit=param_dict["cost_limit"],
use_mountpoints=param_dict["use_mountpoints"]
)
print(str(excinfo.value))
assert "Please, specify your parameters in\
Expand Down
Loading