Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Workflow ETL to use BookmarkService #513

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
get_merge_to_deploy_broker_utils_service,
MergeToDeployBrokerUtils,
)
from mhq.store.models.code import OrgRepo, CodeBookmarkType, Bookmark, PullRequest
from mhq.store.models.code import OrgRepo, PullRequest
from mhq.store.repos.code import CodeRepoService
from mhq.utils.log import LOG
from mhq.service.settings.models import DefaultSyncDaysSetting
from mhq.service.bookmark import BookmarkService, BookmarkType, get_bookmark_service


class CodeETLHandler:

def __init__(
self,
code_repo_service: CodeRepoService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@


class IncidentsETLHandler:

def __init__(
self,
provider: IncidentProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,22 @@ def get_workflow_runs(
self,
org_repo: OrgRepo,
repo_workflow: RepoWorkflow,
bookmark: RepoWorkflowRunsBookmark,
) -> Tuple[List[RepoWorkflowRuns], RepoWorkflowRunsBookmark]:
bookmark: datetime,
) -> Tuple[List[RepoWorkflowRuns], datetime]:
"""
This method returns all workflow runs of a repo's workflow. After the bookmark date.
:param org_repo: OrgRepo object to get workflow runs for
:param repo_workflow: RepoWorkflow object to get workflow runs for
:param bookmark: Bookmark object to get all workflow runs after this date
:return: Workflow runs, Bookmark object
:param bookmark: datetime object to get all workflow runs after this date
:return: Workflow runs, datetime object
"""
bookmark_time_stamp = datetime.fromisoformat(bookmark.bookmark)

try:
github_workflow_runs = self._api.get_workflow_runs(
org_repo.org_name,
org_repo.name,
repo_workflow.provider_workflow_id,
bookmark_time_stamp,
bookmark,
)
except Exception as e:
raise Exception(
Expand All @@ -81,9 +81,7 @@ def get_workflow_runs(
)
return [], bookmark

bookmark.bookmark = self._get_new_bookmark_time_stamp(
github_workflow_runs
).isoformat()
bookmark = self._get_new_bookmark_time_stamp(github_workflow_runs)

repo_workflow_runs = [
self._adapt_github_workflows_to_workflow_runs(
Expand Down
58 changes: 25 additions & 33 deletions backend/analytics_server/mhq/service/workflows/sync/etl_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from os import getenv
from datetime import timedelta
from datetime import datetime
from typing import List, Tuple
from uuid import uuid4

from mhq.service.settings.configuration_settings import (
SettingsService,
Expand All @@ -25,7 +24,8 @@
from mhq.store.repos.code import CodeRepoService
from mhq.store.repos.workflows import WorkflowRepoService
from mhq.utils.log import LOG
from mhq.utils.time import time_now
from mhq.service.settings.models import DefaultSyncDaysSetting
from mhq.service.bookmark import BookmarkService, BookmarkType, get_bookmark_service


class WorkflowETLHandler:
Expand All @@ -40,11 +40,13 @@ def __init__(
workflow_repo_service: WorkflowRepoService,
etl_factory: WorkflowETLFactory,
settings_service: SettingsService,
bookmark_service: BookmarkService,
):
self.code_repo_service = code_repo_service
self.workflow_repo_service = workflow_repo_service
self.etl_factory = etl_factory
self.settings_service = settings_service
self.bookmark_service = bookmark_service

def sync_org_workflows(self, org_id: str):
active_repo_workflows: List[Tuple[OrgRepo, RepoWorkflow]] = (
Expand Down Expand Up @@ -98,52 +100,38 @@ def _sync_repo_workflow(self, org_repo: OrgRepo, repo_workflow: RepoWorkflow):
LOG.error("Invalid PAT for code provider")
return
try:
default_sync_days_setting = (
default_sync_days_setting: DefaultSyncDaysSetting = (
self.settings_service.get_or_set_default_settings(
setting_type=SettingType.DEFAULT_SYNC_DAYS_SETTING,
entity_type=EntityType.ORG,
entity_id=str(org_repo.org_id),
)
)
default_sync_days = (
default_sync_days_setting.specific_settings.default_sync_days
).specific_settings
)
bookmark: RepoWorkflowRunsBookmark = self.__get_repo_workflow_bookmark(
repo_workflow, default_sync_days
default_sync_days = default_sync_days_setting.default_sync_days

bookmark: datetime = self.bookmark_service.get_bookmark(
str(repo_workflow.id),
BookmarkType.REPO_WORKFLOW_BOOKMARK,
repo_workflow.provider,
default_sync_days,
)
repo_workflow_runs: List[RepoWorkflowRuns]
repo_workflow_runs, bookmark = etl_service.get_workflow_runs(
org_repo, repo_workflow, bookmark
)
self.workflow_repo_service.save_repo_workflow_runs(repo_workflow_runs)
bookmark.updated_at = time_now()
self.workflow_repo_service.update_repo_workflow_runs_bookmark(bookmark)
self.bookmark_service.update_bookmark(
str(repo_workflow.id),
BookmarkType.REPO_WORKFLOW_BOOKMARK,
repo_workflow.provider,
bookmark,
)
except Exception as e:
LOG.error(
f"Error syncing workflow for repo {repo_workflow.org_repo_id}: {str(e)}"
)
return

def __get_repo_workflow_bookmark(
self, repo_workflow: RepoWorkflow, default_sync_days: int = DEFAULT_SYNC_DAYS
) -> RepoWorkflowRunsBookmark:
repo_workflow_bookmark = (
self.workflow_repo_service.get_repo_workflow_runs_bookmark(repo_workflow.id)
)
if not repo_workflow_bookmark:
bookmark_string = (
time_now() - timedelta(days=default_sync_days)
).isoformat()

repo_workflow_bookmark = RepoWorkflowRunsBookmark(
id=uuid4(),
repo_workflow_id=repo_workflow.id,
bookmark=bookmark_string,
created_at=time_now(),
updated_at=time_now(),
)
return repo_workflow_bookmark


def sync_org_workflows(org_id: str):
workflow_providers: List[str] = (
Expand All @@ -156,6 +144,10 @@ def sync_org_workflows(org_id: str):
workflow_repo_service = WorkflowRepoService()
etl_factory = WorkflowETLFactory(org_id)
workflow_etl_handler = WorkflowETLHandler(
code_repo_service, workflow_repo_service, etl_factory, get_settings_service()
code_repo_service,
workflow_repo_service,
etl_factory,
get_settings_service(),
get_bookmark_service(),
)
workflow_etl_handler.sync_org_workflows(org_id)
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List, Tuple

from mhq.store.models.code import (
Expand All @@ -24,13 +25,13 @@ def get_workflow_runs(
self,
org_repo: OrgRepo,
repo_workflow: RepoWorkflow,
bookmark: RepoWorkflowRunsBookmark,
) -> Tuple[List[RepoWorkflowRuns], RepoWorkflowRunsBookmark]:
bookmark: datetime,
) -> Tuple[List[RepoWorkflowRuns], datetime]:
"""
This method returns all workflow runs of a repo's workflow. After the bookmark date.
:param org_repo: OrgRepo object to get workflow runs for
:param repo_workflow: RepoWorkflow object to get workflow runs for
:param bookmark: Bookmark object to get all workflow runs after this date
:return: List of RepoWorkflowRuns objects, RepoWorkflowRunsBookmark object
:param bookmark: datetime object to get all workflow runs after this date
:return: List of RepoWorkflowRuns objects, datetime object
"""
pass
Loading