Skip to content

Commit

Permalink
feat: sequential execution of middleware (#160)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Kanitz <[email protected]>
  • Loading branch information
Ayush5120 and uniqueg authored Oct 20, 2023
1 parent d13bc92 commit 21a555e
Show file tree
Hide file tree
Showing 31 changed files with 797 additions and 812 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ jobs:
- name: Run integration tests
shell: bash
run: pytest tests/test_integration
- name: Run unit tests
shell: bash
run: pytest tests/unitTest/pro_tes/middleware
- name: Tear down app
run: docker-compose down
publish:
Expand Down
4 changes: 2 additions & 2 deletions pro_tes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

from pathlib import Path

from connexion import FlaskApp
from foca import Foca
from connexion import FlaskApp # type: ignore
from foca import Foca # type: ignore

from pro_tes.ga4gh.tes.service_info import ServiceInfo

Expand Down
2 changes: 1 addition & 1 deletion pro_tes/celery_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pathlib import Path

from celery import Celery
from foca import Foca
from foca import Foca # type: ignore

foca = Foca(
config_file=Path(__file__).resolve().parent / "config.yaml",
Expand Down
6 changes: 5 additions & 1 deletion pro_tes/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ controllers:
wait: 3
attempts: 100
list_tasks:
default_page_size: 256
default_page_size: 5
celery:
monitor:
timeout: 0.1
Expand All @@ -140,3 +140,7 @@ tes:

storeLogs:
execution_trace: True

middlewares:
- - "pro_tes.plugins.middlewares.task_distribution.distance.TaskDistributionDistance"
- "pro_tes.plugins.middlewares.task_distribution.random.TaskDistributionRandom"
30 changes: 11 additions & 19 deletions pro_tes/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""proTES exceptions."""

from connexion.exceptions import (
from connexion.exceptions import ( # type: ignore
BadRequestProblem,
ExtraParameterProblem,
Forbidden,
Unauthorized,
)
from pydantic import ValidationError
from pymongo.errors import PyMongoError
from pymongo.errors import PyMongoError # type: ignore
from werkzeug.exceptions import (
BadRequest,
InternalServerError,
Expand All @@ -29,16 +29,12 @@ class NoTesInstancesAvailable(ValueError):
"""Raised when no TES instances are available."""


class TesUriError(ValueError):
"""Raised when TES URI cannot be parsed."""
class MiddlewareException(ValueError):
"""Raised when a middleware could not be applied."""


class InputUriError(ValueError):
"""Raised when input URI cannot be parsed."""


class IPDistanceCalculationError(ValueError):
"""Raised when IP distance cannot be calculated."""
class InvalidMiddleware(MiddlewareException):
"""Raised when a middleware is invalid."""


exceptions = {
Expand Down Expand Up @@ -90,16 +86,12 @@ class IPDistanceCalculationError(ValueError):
"message": "No valid TES instances available.",
"code": "500",
},
TesUriError: {
"message": "TES URI cannot be parsed",
MiddlewareException: {
"message": "Middleware could not be applied.",
"code": "500",
},
InputUriError: {
"message": "Input URI cannot be parsed.",
"code": "400",
},
IPDistanceCalculationError: {
"message": "IP distance calculation failed.",
InvalidMiddleware: {
"message": "Middleware is invalid.",
"code": "500",
}
},
}
32 changes: 16 additions & 16 deletions pro_tes/ga4gh/tes/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
from typing import Optional

# pragma pylint: disable=no-name-in-module
from pydantic import AnyUrl, BaseModel, Field
Expand Down Expand Up @@ -48,7 +48,7 @@ class TesExecutor(CustomBaseModel):
),
example="ubuntu:20.04",
)
command: List[str] = Field(
command: list[str] = Field(
[""],
description=(
"A sequence of program arguments to execute, where the "
Expand Down Expand Up @@ -101,7 +101,7 @@ class TesExecutor(CustomBaseModel):
),
example="/tmp/stderr.log",
)
env: Optional[Dict[str, str]] = Field(
env: Optional[dict[str, str]] = Field(
None,
description=(
"Enviromental variables to set within the container. "
Expand Down Expand Up @@ -268,7 +268,7 @@ class TesResources(CustomBaseModel):
disk_gb: Optional[float] = Field(
None, description="Requested disk size in gigabytes (GB)", example=40
)
zones: Optional[List[str]] = Field(
zones: Optional[list[str]] = Field(
None,
description=(
"Request that the task be run in these compute zones. How "
Expand Down Expand Up @@ -461,13 +461,13 @@ class Metadata(CustomBaseModel):


class TesTaskLog(CustomBaseModel):
logs: List[TesExecutorLog] = Field(
logs: list[TesExecutorLog] = Field(
..., description="Logs for each executor"
)
metadata: Optional[Metadata] = Field(
None,
description=(
"Arbitrary logging metadataincluded by the implementation."
"Arbitrary logging metadata included by the implementation."
),
example={"host": "worker-001", "slurmm_id": 123456},
)
Expand All @@ -481,14 +481,14 @@ class TesTaskLog(CustomBaseModel):
description="When the task ended, in RFC 3339 format.",
example="2020-10-02T11:00:00-05:00",
)
outputs: List[TesOutputFileLog] = Field(
outputs: list[TesOutputFileLog] = Field(
...,
description=(
"Information about all output files. Directory outputs are "
" \nflattened into separate items."
),
)
system_logs: Optional[List[str]] = Field(
system_logs: Optional[list[str]] = Field(
None,
description=(
"System logs are any logs the system decides are relevant, "
Expand All @@ -509,7 +509,7 @@ class TesServiceType(ServiceType):


class TesServiceInfo(Service):
storage: Optional[List[str]] = Field(
storage: Optional[list[str]] = Field(
None,
description=(
"Lists some, but not necessarily all, storage locations "
Expand Down Expand Up @@ -538,7 +538,7 @@ class TesTask(CustomBaseModel):
" documentation purposes."
),
)
inputs: Optional[List[TesInput]] = Field(
inputs: Optional[list[TesInput]] = Field(
None,
description=(
"Input files that will be used by the task. Inputs will be "
Expand All @@ -547,7 +547,7 @@ class TesTask(CustomBaseModel):
),
example=[{"url": "s3://my-object-store/file1", "path": "/data/file1"}],
)
outputs: Optional[List[TesOutput]] = Field(
outputs: Optional[list[TesOutput]] = Field(
None,
description=(
"Output files.\nOutputs will be uploaded from the executor "
Expand All @@ -562,7 +562,7 @@ class TesTask(CustomBaseModel):
],
)
resources: Optional[TesResources] = None
executors: List[TesExecutor] = Field(
executors: list[TesExecutor] = Field(
[TesExecutor],
description=(
"An array of executors to be run. Each of the executors "
Expand All @@ -574,7 +574,7 @@ class TesTask(CustomBaseModel):
" message.\n\nExecution stops on the first error."
),
)
volumes: Optional[List[str]] = Field(
volumes: Optional[list[str]] = Field(
None,
description=(
"Volumes are directories which may be used to share data "
Expand All @@ -590,7 +590,7 @@ class TesTask(CustomBaseModel):
),
example=["/vol/A/"],
)
tags: Optional[Dict[str, str]] = Field(
tags: Optional[dict[str, str]] = Field(
None,
description=(
"A key-value map of arbitrary tags. These can be used to "
Expand All @@ -601,7 +601,7 @@ class TesTask(CustomBaseModel):
),
example={"WORKFLOW_ID": "cwl-01234", "PROJECT_GROUP": "alice-lab"},
)
logs: Optional[List[TesTaskLog]] = Field(
logs: Optional[list[TesTaskLog]] = Field(
None,
description=(
"Task logging information.\nNormally, this will contain "
Expand All @@ -625,7 +625,7 @@ class Config:


class TesListTasksResponse(CustomBaseModel):
tasks: List[TesTask] = Field(
tasks: list[TesTask] = Field(
...,
description=(
"List of tasks. These tasks will be based on the original "
Expand Down
15 changes: 7 additions & 8 deletions pro_tes/ga4gh/tes/server.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
"""Controllers for GA4GH TES API endpoints."""

import logging
from typing import Dict

from connexion import request
from foca.utils.logging import log_traffic
from connexion import request # type: ignore
from foca.utils.logging import log_traffic # type: ignore

from pro_tes.ga4gh.tes.service_info import ServiceInfo
from pro_tes.ga4gh.tes.task_runs import TaskRuns
Expand All @@ -19,7 +18,7 @@
@log_traffic
def CancelTask(
id, *args, **kwargs # pylint: disable=redefined-builtin
) -> Dict:
) -> dict:
"""Cancel unfinished task.
Args:
Expand All @@ -34,7 +33,7 @@ def CancelTask(

# POST /tasks
@log_traffic
def CreateTask(*args, **kwargs) -> Dict:
def CreateTask(*args, **kwargs) -> dict:
"""Create task.
Args:
Expand All @@ -48,7 +47,7 @@ def CreateTask(*args, **kwargs) -> Dict:

# GET /tasks/service-info
@log_traffic
def GetServiceInfo(*args, **kwargs) -> Dict:
def GetServiceInfo(*args, **kwargs) -> dict:
"""Get service info.
Args:
Expand All @@ -62,7 +61,7 @@ def GetServiceInfo(*args, **kwargs) -> Dict:

# GET /tasks/{id}
@log_traffic
def GetTask(id, *args, **kwargs) -> Dict: # pylint: disable=redefined-builtin
def GetTask(id, *args, **kwargs) -> dict: # pylint: disable=redefined-builtin
"""Get info for individual task.
Args:
Expand All @@ -77,7 +76,7 @@ def GetTask(id, *args, **kwargs) -> Dict: # pylint: disable=redefined-builtin

# GET /tasks
@log_traffic
def ListTasks(*args, **kwargs) -> Dict:
def ListTasks(*args, **kwargs) -> dict:
"""List all available tasks.
Args:
Expand Down
9 changes: 4 additions & 5 deletions pro_tes/ga4gh/tes/service_info.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
"""Controller for the `/service-info` route."""

import logging
from typing import Dict

from bson.objectid import ObjectId
from bson.objectid import ObjectId # type: ignore
from flask import current_app
from pymongo.collection import Collection
from pymongo.collection import Collection # type: ignore

from pro_tes.exceptions import NotFound

Expand All @@ -31,7 +30,7 @@ def __init__(self) -> None:
)
self.object_id: str = "000000000000000000000000"

def get_service_info(self) -> Dict:
def get_service_info(self) -> dict:
"""Get latest service info from database.
Returns:
Expand All @@ -48,7 +47,7 @@ def get_service_info(self) -> Dict:
raise NotFound
return service_info

def set_service_info(self, data: Dict) -> None:
def set_service_info(self, data: dict) -> None:
"""Create or update service info.
Arguments:
Expand Down
Loading

0 comments on commit 21a555e

Please sign in to comment.