From db7d871695ac3ca4c3c2c0772bf9cf9b689b401e Mon Sep 17 00:00:00 2001
From: Yair Siman Tov <63305203+yairsimantov20@users.noreply.github.com>
Date: Wed, 11 Oct 2023 16:58:55 +0300
Subject: [PATCH] [Integrations][Gitlab][Improvement] Add more pagination and
 more logs (#153)

---
 integrations/gitlab/CHANGELOG.md              |  11 +-
 .../gitlab/gitlab_integration/core/paging.py  |  25 ++--
 .../gitlab_integration/gitlab_service.py      | 136 +++++++++++++++---
 .../gitlab/gitlab_integration/ocean.py        |  43 ++----
 integrations/gitlab/pyproject.toml            |   2 +-
 5 files changed, 161 insertions(+), 56 deletions(-)

diff --git a/integrations/gitlab/CHANGELOG.md b/integrations/gitlab/CHANGELOG.md
index 95e730d4aa..ff8fb5d525 100644
--- a/integrations/gitlab/CHANGELOG.md
+++ b/integrations/gitlab/CHANGELOG.md
@@ -7,7 +7,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
-# 0.1.18 (2023-10-02)
+0.1.19 (2023-10-11)
+===================
+
+### Improvements
+
+- Added more logs to the integration to indicate the current state better (PORT-4930)
+- Added pagination to all integration exported types for better performance (PORT-4930)
+
+
+# 0.1.18 (2023-10-02)
 
 - Changed gitlab resync to async batch iteration (#1)
 
diff --git a/integrations/gitlab/gitlab_integration/core/paging.py b/integrations/gitlab/gitlab_integration/core/paging.py
index e5c509cb0c..f2d264c4a0 100644
--- a/integrations/gitlab/gitlab_integration/core/paging.py
+++ b/integrations/gitlab/gitlab_integration/core/paging.py
@@ -1,6 +1,11 @@
-import os
-import typing
 import asyncio
+import os
+from typing import List, Union, Callable, AsyncIterator, TypeVar
+
+from gitlab.base import RESTObject, RESTObjectList
+from loguru import logger
+
+T = TypeVar("T", bound=RESTObject)
 
 
 class AsyncFetcher:
@@ -9,14 +14,16 @@ def __init__(self, gitlab_client):
 
     @staticmethod
     async def fetch(
+        fetch_func: Callable[..., Union[RESTObjectList, List[RESTObject]]],
         batch_size: int = int(os.environ.get("GITLAB_BATCH_SIZE", 100)),
-        fetch_func: typing.Callable = None,  # type: ignore
-        validation_func: typing.Callable = None,  # type: ignore
+        validation_func: Callable[[T], bool] | None = None,
         **kwargs,
-    ) -> typing.AsyncIterator[typing.List[typing.Any]]:
-        def fetch_batch(page: int):
-            batch = fetch_func(page=page, per_page=batch_size, get_all=False, **kwargs)
-            return batch
+    ) -> AsyncIterator[List[T]]:
+        def fetch_batch(page_idx: int):
+            logger.info(f"Fetching page {page_idx}. Batch size: {batch_size}")
+            return fetch_func(
+                page=page_idx, per_page=batch_size, get_all=False, **kwargs
+            )
 
         page = 1
         while True:
@@ -24,7 +31,9 @@ def fetch_batch(page: int):
                 None, fetch_batch, page
             )
             if not batch:
+                logger.info(f"No more items to fetch after page {page}")
                 break
+            logger.info(f"Queried {len(batch)} items before filtering")
             filtered_batch = []
             for item in batch:
                 if validation_func is None or validation_func(item):
diff --git a/integrations/gitlab/gitlab_integration/gitlab_service.py b/integrations/gitlab/gitlab_integration/gitlab_service.py
index 1d873eeb87..f1eeb33c9e 100644
--- a/integrations/gitlab/gitlab_integration/gitlab_service.py
+++ b/integrations/gitlab/gitlab_integration/gitlab_service.py
@@ -1,17 +1,24 @@
 import typing
-from typing import List, Tuple, Any, Union, TYPE_CHECKING
 from datetime import datetime, timedelta
+from typing import List, Tuple, Any, Union, TYPE_CHECKING
+
 import yaml
 from gitlab import Gitlab, GitlabList
 from gitlab.base import RESTObject
-from gitlab.v4.objects import Project
+from gitlab.v4.objects import (
+    Project,
+    MergeRequest,
+    Issue,
+    Group,
+    ProjectPipeline,
+    ProjectPipelineJob,
+)
 from loguru import logger
 from yaml.parser import ParserError
 
 from gitlab_integration.core.entities import generate_entity_from_port_yaml
 from gitlab_integration.core.paging import AsyncFetcher
 from gitlab_integration.core.utils import does_pattern_apply
-
 from port_ocean.context.event import event
 from port_ocean.core.models import Entity
 
@@ -114,9 +121,23 @@ def should_run_for_path(self, path: str) -> bool:
     def should_run_for_project(self, project: Project) -> bool:
         return self.should_run_for_path(project.path_with_namespace)
 
-    def get_root_groups(self) -> List[RESTObject]:
+    def should_run_for_merge_request(self, merge_request: MergeRequest) -> bool:
+        project_path = merge_request.references.get("full").rstrip(
+            merge_request.references.get("short")
+        )
+        return self.should_run_for_path(project_path)
+
+    def should_run_for_issue(self, issue: Issue) -> bool:
+        project_path = issue.references.get("full").rstrip(
+            issue.references.get("short")
+        )
+        return self.should_run_for_path(project_path)
+
+    def get_root_groups(self) -> List[Group]:
         groups = self.gitlab_client.groups.list(iterator=True)
-        return [group for group in groups if group.parent_id is None]
+        return typing.cast(
+            List[Group], [group for group in groups if group.parent_id is None]
+        )
 
     def create_webhooks(self) -> list[int | str]:
         root_partial_groups = self.get_root_groups()
@@ -178,9 +199,20 @@ def get_project(self, project_id: int) -> Project | None:
 
     async def get_all_projects(self) -> typing.AsyncIterator[List[Project]]:
         logger.info("fetching all projects for the token")
-        port_app_config = typing.cast("GitlabPortAppConfig", event.port_app_config)
+        port_app_config: GitlabPortAppConfig = typing.cast(
+            "GitlabPortAppConfig", event.port_app_config
+        )
         async_fetcher = AsyncFetcher(self.gitlab_client)
 
+        cached_projects = event.attributes.setdefault(
+            PROJECTS_CACHE_KEY, {}
+        ).setdefault(self.gitlab_client.private_token, {})
+
+        if cached_projects:
+            yield cached_projects.values()
+            return
+
+        all_projects = []
         async for projects_batch in async_fetcher.fetch(
             fetch_func=self.gitlab_client.projects.list,
             validation_func=self.should_run_for_project,
@@ -191,36 +223,97 @@ async def get_all_projects(self) -> typing.AsyncIterator[List[Project]]:
             order_by="id",
             sort="asc",
         ):
-            logger.debug(
+            logger.info(
                 f"Queried {len(projects_batch)} projects {[project.path_with_namespace for project in projects_batch]}"
             )
+            all_projects.extend(projects_batch)
             yield projects_batch
 
-    async def get_all_jobs(self, project: Project):
-        async_fetcher = AsyncFetcher(self.gitlab_client)
-        async for issues_batch in async_fetcher.fetch(
+        event.attributes[PROJECTS_CACHE_KEY][self.gitlab_client.private_token] = {
+            project.id: project for project in all_projects
+        }
+
+    async def get_all_jobs(
+        self, project: Project
+    ) -> typing.AsyncIterator[List[ProjectPipelineJob]]:
+        logger.info(f"fetching jobs for project {project.path_with_namespace}")
+        async_fetcher: typing.AsyncIterator[List[ProjectPipelineJob]] = AsyncFetcher(
+            self.gitlab_client
+        ).fetch(
             fetch_func=project.jobs.list,
-            # validation_func=self.should_run_for_issue,
             pagination="offset",
             order_by="id",
             sort="asc",
-        ):
+        )
+        async for issues_batch in async_fetcher:
+            logger.info(
+                f"Queried {len(issues_batch)} jobs {[job.name for job in issues_batch]}"
+            )
             yield issues_batch
 
-    async def get_all_pipelines(self, project: Project):
+    async def get_all_pipelines(
+        self, project: Project
+    ) -> typing.AsyncIterator[List[ProjectPipeline]]:
         from_time = datetime.now() - timedelta(days=14)
         created_after = from_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
-        async_fetcher = AsyncFetcher(self.gitlab_client)
-        async for pipelines_batch in async_fetcher.fetch(
+        logger.info(
+            f"Fetching pipelines for project {project.path_with_namespace} created after {created_after}"
+        )
+        async_fetcher: typing.AsyncIterator[List[ProjectPipeline]] = AsyncFetcher(
+            self.gitlab_client
+        ).fetch(
             fetch_func=project.pipelines.list,
-            # validation_func=self.should_run_for_issue,
             pagination="offset",
             order_by="id",
             sort="asc",
             created_after=created_after,
-        ):
+        )
+        async for pipelines_batch in async_fetcher:
+            logger.info(
+                f"Queried {len(pipelines_batch)} pipelines {[pipeline.id for pipeline in pipelines_batch]}"
+            )
             yield pipelines_batch
 
+    async def get_opened_merge_requests(
+        self, group: Group
+    ) -> typing.AsyncIterator[List[MergeRequest]]:
+        async_fetcher = AsyncFetcher(self.gitlab_client)
+        async for merge_request_batch in async_fetcher.fetch(
+            fetch_func=group.mergerequests.list,
+            validation_func=self.should_run_for_merge_request,
+            pagination="offset",
+            order_by="created_at",
+            sort="desc",
+            state="opened",
+        ):
+            yield merge_request_batch
+
+    async def get_closed_merge_requests(
+        self, group: Group, updated_after: datetime
+    ) -> typing.AsyncIterator[List[MergeRequest]]:
+        async_fetcher = AsyncFetcher(self.gitlab_client)
+        async for merge_request_batch in async_fetcher.fetch(
+            fetch_func=group.mergerequests.list,
+            validation_func=self.should_run_for_merge_request,
+            pagination="offset",
+            order_by="created_at",
+            sort="desc",
+            state=["closed", "locked", "merged"],
+            updated_after=updated_after.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
+        ):
+            yield merge_request_batch
+
+    async def get_all_issues(self, group: Group) -> typing.AsyncIterator[List[Issue]]:
+        async_fetcher = AsyncFetcher(self.gitlab_client)
+        async for issues_batch in async_fetcher.fetch(
+            fetch_func=group.issues.list,
+            validation_func=self.should_run_for_issue,
+            pagination="offset",
+            order_by="created_at",
+            sort="desc",
+        ):
+            yield issues_batch
+
     def get_entities_diff(
         self,
         project: Project,
@@ -229,7 +322,16 @@ def get_entities_diff(
         after: str,
         ref: str,
     ) -> Tuple[List[Entity], List[Entity]]:
+        logger.info(
+            f'Getting entities diff for project {project.path_with_namespace}, in path "{spec_path}", before {before},'
+            f" after {after}, ref {ref}"
+        )
         entities_before = self._get_entities_by_commit(project, spec_path, before, ref)
+
+        logger.info(f"Found {len(entities_before)} entities in the previous state")
+
         entities_after = self._get_entities_by_commit(project, spec_path, after, ref)
+        logger.info(f"Found {len(entities_after)} entities in the current state")
+
         return entities_before, entities_after
diff --git a/integrations/gitlab/gitlab_integration/ocean.py b/integrations/gitlab/gitlab_integration/ocean.py
index ec6918afd5..131db245c0 100644
--- a/integrations/gitlab/gitlab_integration/ocean.py
+++ b/integrations/gitlab/gitlab_integration/ocean.py
@@ -8,7 +8,7 @@
 from gitlab_integration.bootstrap import setup_application
 from gitlab_integration.utils import ObjectKind, get_cached_all_services
 from port_ocean.context.ocean import ocean
-from port_ocean.core.ocean_types import RAW_RESULT, ASYNC_GENERATOR_RESYNC_TYPE
+from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
 
 NO_WEBHOOK_WARNING = "Without setting up the webhook, the integration will not export live changes from the gitlab"
@@ -51,40 +51,25 @@ async def on_resync(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
 
 
 @ocean.on_resync(ObjectKind.MERGE_REQUEST)
-async def resync_merge_requests(kind: str) -> RAW_RESULT:
+async def resync_merge_requests(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
     updated_after = datetime.now() - timedelta(days=14)
-    result = []
     for service in get_cached_all_services():
         for group in service.get_root_groups():
-            merge_requests = group.mergerequests.list(
-                all=True, state="opened"
-            ) + group.mergerequests.list(
-                all=True,
-                state=["closed", "locked", "merged"],
-                updated_after=updated_after.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
-            )
-            for merge_request in merge_requests:
-                project_path = merge_request.references.get("full").rstrip(
-                    merge_request.references.get("short")
-                )
-                if service.should_run_for_path(project_path):
-                    result.append(merge_request.asdict())
-    return result
+            async for merge_request_batch in service.get_opened_merge_requests(group):
+                yield [merge_request.asdict() for merge_request in merge_request_batch]
+            async for merge_request_batch in service.get_closed_merge_requests(
+                group, updated_after
+            ):
+                yield [merge_request.asdict() for merge_request in merge_request_batch]
 
 
 @ocean.on_resync(ObjectKind.ISSUE)
-async def resync_issues(kind: str) -> RAW_RESULT:
-    issues = []
+async def resync_issues(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
     for service in get_cached_all_services():
         for group in service.get_root_groups():
-            for issue in group.issues.list(all=True):
-                project_path = issue.references.get("full").rstrip(
-                    issue.references.get("short")
-                )
-                if service.should_run_for_path(project_path):
-                    issues.append(issue.asdict())
-    return issues
+            async for issues_batch in service.get_all_issues(group):
+                yield [issue.asdict() for issue in issues_batch]
 
 
 @ocean.on_resync(ObjectKind.JOB)
@@ -93,9 +78,6 @@ async def resync_jobs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
         async for projects_batch in service.get_all_projects():
             for project in projects_batch:
                 async for jobs_batch in service.get_all_jobs(project):
-                    logger.info(
-                        f"Found {len(jobs_batch)} jobs for project {project.path_with_namespace}"
-                    )
                     yield [job.asdict() for job in jobs_batch]
 
 
@@ -104,6 +86,9 @@ async def resync_pipelines(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
     for service in get_cached_all_services():
         async for projects_batch in service.get_all_projects():
             for project in projects_batch:
+                logger.info(
+                    f"Fetching pipelines for project {project.path_with_namespace}"
+                )
                 async for pipelines_batch in service.get_all_pipelines(project):
                     logger.info(
                         f"Found {len(pipelines_batch)} pipelines for project {project.path_with_namespace}"
diff --git a/integrations/gitlab/pyproject.toml b/integrations/gitlab/pyproject.toml
index 86eba06d2b..a9a5943628 100644
--- a/integrations/gitlab/pyproject.toml
+++ b/integrations/gitlab/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "gitlab"
-version = "0.1.18"
+version = "0.1.19"
 description = "Gitlab integration for Port using Port-Ocean Framework"
 authors = ["Yair Siman-Tov "]
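
Below is a minimal, self-contained usage sketch of the `AsyncFetcher.fetch` helper this patch introduces. It is illustrative only and not part of the diff: the `GITLAB_URL` and `GITLAB_TOKEN` environment variables and the `archived` filter are placeholder assumptions, while `fetch_func`, `validation_func`, and the forwarded `list()` kwargs mirror how `gitlab_service.py` drives the helper above.

    # Hypothetical standalone example; GITLAB_URL and GITLAB_TOKEN are
    # placeholder environment variables, not names used by the integration.
    import asyncio
    import os

    from gitlab import Gitlab

    from gitlab_integration.core.paging import AsyncFetcher


    async def main() -> None:
        client = Gitlab(
            os.environ["GITLAB_URL"], private_token=os.environ["GITLAB_TOKEN"]
        )

        # fetch() calls projects.list(page=n, per_page=batch_size, get_all=False)
        # on the default executor, logs each page, drops items rejected by
        # validation_func, and yields one filtered batch per page until a page
        # comes back empty.
        async for batch in AsyncFetcher.fetch(
            fetch_func=client.projects.list,
            validation_func=lambda project: not project.archived,
            pagination="offset",
            order_by="id",
            sort="asc",
        ):
            print([project.path_with_namespace for project in batch])


    if __name__ == "__main__":
        asyncio.run(main())

Because each blocking python-gitlab `list()` call runs on a worker thread and results arrive one page-sized batch at a time, the resync handlers can stream entities instead of materializing every merge request or issue up front, which is the point of this change.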