[Integrations][Gitlab][Improvement] Add more pagination and more logs (
yairsimantov20 authored Oct 11, 2023
1 parent c0e97d1 commit db7d871
Showing 5 changed files with 161 additions and 56 deletions.
11 changes: 10 additions & 1 deletion integrations/gitlab/CHANGELOG.md
@@ -7,7 +7,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

# 0.1.18 (2023-10-02)
0.1.19 (2023-10-11)
===================

### Improvements

- Added more logs to the integration to better indicate its current state (PORT-4930)
- Added pagination to all integration exported types for better performance (PORT-4930)


# 0.1.18 (2023-10-02)

- Changed gitlab resync to async batch iteration (#1)

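For context, the "pagination" referenced in the changelog entries above is offset pagination against the GitLab API. Below is a minimal sketch of that pattern with python-gitlab; the server URL, token, and batch size are placeholders, not values taken from this commit:

```python
import gitlab

# Hypothetical client setup; URL and token are placeholders.
gl = gitlab.Gitlab("https://gitlab.example.com", private_token="<token>")

page = 1
per_page = 100  # illustrative batch size
while True:
    # get_all=False asks python-gitlab for a single page rather than auto-paginating.
    batch = gl.projects.list(page=page, per_page=per_page, get_all=False)
    if not batch:
        break
    for project in batch:
        print(project.path_with_namespace)
    page += 1
```

The AsyncFetcher in the next file wraps this same page/per_page loop so each page can be fetched in an executor and yielded as a batch.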
25 changes: 17 additions & 8 deletions integrations/gitlab/gitlab_integration/core/paging.py
@@ -1,6 +1,11 @@
import os
import typing
import asyncio
import os
from typing import List, Union, Callable, AsyncIterator, TypeVar

from gitlab.base import RESTObject, RESTObjectList
from loguru import logger

T = TypeVar("T", bound=RESTObject)


class AsyncFetcher:
@@ -9,22 +14,26 @@ def __init__(self, gitlab_client):

@staticmethod
async def fetch(
fetch_func: Callable[..., Union[RESTObjectList, List[RESTObject]]],
batch_size: int = int(os.environ.get("GITLAB_BATCH_SIZE", 100)),
fetch_func: typing.Callable = None, # type: ignore
validation_func: typing.Callable = None, # type: ignore
validation_func: Callable[[T], bool] | None = None,
**kwargs,
) -> typing.AsyncIterator[typing.List[typing.Any]]:
def fetch_batch(page: int):
batch = fetch_func(page=page, per_page=batch_size, get_all=False, **kwargs)
return batch
) -> AsyncIterator[List[T]]:
def fetch_batch(page_idx: int):
logger.info(f"Fetching page {page_idx}. Batch size: {batch_size}")
return fetch_func(
page=page_idx, per_page=batch_size, get_all=False, **kwargs
)

page = 1
while True:
batch = await asyncio.get_running_loop().run_in_executor(
None, fetch_batch, page
)
if not batch:
logger.info(f"No more items to fetch after page {page}")
break
logger.info(f"Queried {len(batch)} items before filtering")
filtered_batch = []
for item in batch:
if validation_func is None or validation_func(item):
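A minimal usage sketch of the fetcher above, assuming python-gitlab is installed and a project object is already resolved; the server URL, token, project id, and the validation lambda are illustrative:

```python
import asyncio

import gitlab

from gitlab_integration.core.paging import AsyncFetcher


async def dump_open_issues(project) -> None:
    # fetch() yields one filtered page (a list of items) at a time;
    # extra kwargs such as order_by/sort are forwarded to fetch_func.
    async for batch in AsyncFetcher.fetch(
        fetch_func=project.issues.list,
        validation_func=lambda issue: issue.state == "opened",
        order_by="created_at",
        sort="desc",
    ):
        for issue in batch:
            print(issue.title)


# Hypothetical entry point; URL, token, and project id are placeholders.
gl = gitlab.Gitlab("https://gitlab.example.com", private_token="<token>")
asyncio.run(dump_open_issues(gl.projects.get(1)))
```

Because fetch is a staticmethod, it can be called on the class directly as above or via an AsyncFetcher instance, as the service code does.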
136 changes: 119 additions & 17 deletions integrations/gitlab/gitlab_integration/gitlab_service.py
@@ -1,17 +1,24 @@
import typing
from typing import List, Tuple, Any, Union, TYPE_CHECKING
from datetime import datetime, timedelta
from typing import List, Tuple, Any, Union, TYPE_CHECKING

import yaml
from gitlab import Gitlab, GitlabList
from gitlab.base import RESTObject
from gitlab.v4.objects import Project
from gitlab.v4.objects import (
Project,
MergeRequest,
Issue,
Group,
ProjectPipeline,
ProjectPipelineJob,
)
from loguru import logger
from yaml.parser import ParserError

from gitlab_integration.core.entities import generate_entity_from_port_yaml
from gitlab_integration.core.paging import AsyncFetcher
from gitlab_integration.core.utils import does_pattern_apply

from port_ocean.context.event import event
from port_ocean.core.models import Entity

@@ -114,9 +121,23 @@ def should_run_for_path(self, path: str) -> bool:
def should_run_for_project(self, project: Project) -> bool:
return self.should_run_for_path(project.path_with_namespace)

def get_root_groups(self) -> List[RESTObject]:
def should_run_for_merge_request(self, merge_request: MergeRequest) -> bool:
project_path = merge_request.references.get("full").rstrip(
merge_request.references.get("short")
)
return self.should_run_for_path(project_path)

def should_run_for_issue(self, issue: Issue) -> bool:
project_path = issue.references.get("full").rstrip(
issue.references.get("short")
)
return self.should_run_for_path(project_path)

def get_root_groups(self) -> List[Group]:
groups = self.gitlab_client.groups.list(iterator=True)
return [group for group in groups if group.parent_id is None]
return typing.cast(
List[Group], [group for group in groups if group.parent_id is None]
)

def create_webhooks(self) -> list[int | str]:
root_partial_groups = self.get_root_groups()
@@ -178,9 +199,20 @@ def get_project(self, project_id: int) -> Project | None:

async def get_all_projects(self) -> typing.AsyncIterator[List[Project]]:
logger.info("fetching all projects for the token")
port_app_config = typing.cast("GitlabPortAppConfig", event.port_app_config)
port_app_config: GitlabPortAppConfig = typing.cast(
"GitlabPortAppConfig", event.port_app_config
)

async_fetcher = AsyncFetcher(self.gitlab_client)
cached_projects = event.attributes.setdefault(
PROJECTS_CACHE_KEY, {}
).setdefault(self.gitlab_client.private_token, {})

if cached_projects:
yield cached_projects.values()
return

all_projects = []
async for projects_batch in async_fetcher.fetch(
fetch_func=self.gitlab_client.projects.list,
validation_func=self.should_run_for_project,
@@ -191,36 +223,97 @@ async def get_all_projects(self) -> typing.AsyncIterator[List[Project]]:
order_by="id",
sort="asc",
):
logger.debug(
logger.info(
f"Queried {len(projects_batch)} projects {[project.path_with_namespace for project in projects_batch]}"
)
all_projects.extend(projects_batch)
yield projects_batch

async def get_all_jobs(self, project: Project):
async_fetcher = AsyncFetcher(self.gitlab_client)
async for issues_batch in async_fetcher.fetch(
event.attributes[PROJECTS_CACHE_KEY][self.gitlab_client.private_token] = {
project.id: project for project in all_projects
}

async def get_all_jobs(
self, project: Project
) -> typing.AsyncIterator[List[ProjectPipelineJob]]:
logger.info(f"fetching jobs for project {project.path_with_namespace}")
async_fetcher: typing.AsyncIterator[List[ProjectPipelineJob]] = AsyncFetcher(
self.gitlab_client
).fetch(
fetch_func=project.jobs.list,
# validation_func=self.should_run_for_issue,
pagination="offset",
order_by="id",
sort="asc",
):
)
async for issues_batch in async_fetcher:
logger.info(
f"Queried {len(issues_batch)} jobs {[job.name for job in issues_batch]}"
)
yield issues_batch

async def get_all_pipelines(self, project: Project):
async def get_all_pipelines(
self, project: Project
) -> typing.AsyncIterator[List[ProjectPipeline]]:
from_time = datetime.now() - timedelta(days=14)
created_after = from_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
async_fetcher = AsyncFetcher(self.gitlab_client)
async for pipelines_batch in async_fetcher.fetch(
logger.info(
f"Fetching pipelines for project {project.path_with_namespace} created after {created_after}"
)
async_fetcher: typing.AsyncIterator[List[ProjectPipeline]] = AsyncFetcher(
self.gitlab_client
).fetch(
fetch_func=project.pipelines.list,
# validation_func=self.should_run_for_issue,
pagination="offset",
order_by="id",
sort="asc",
created_after=created_after,
):
)
async for pipelines_batch in async_fetcher:
logger.info(
f"Queried {len(pipelines_batch)} pipelines {[pipeline.id for pipeline in pipelines_batch]}"
)
yield pipelines_batch

async def get_opened_merge_requests(
self, group: Group
) -> typing.AsyncIterator[List[MergeRequest]]:
async_fetcher = AsyncFetcher(self.gitlab_client)
async for merge_request_batch in async_fetcher.fetch(
fetch_func=group.mergerequests.list,
validation_func=self.should_run_for_merge_request,
pagination="offset",
order_by="created_at",
sort="desc",
state="opened",
):
yield merge_request_batch

async def get_closed_merge_requests(
self, group: Group, updated_after: datetime
) -> typing.AsyncIterator[List[MergeRequest]]:
async_fetcher = AsyncFetcher(self.gitlab_client)
async for merge_request_batch in async_fetcher.fetch(
fetch_func=group.mergerequests.list,
validation_func=self.should_run_for_merge_request,
pagination="offset",
order_by="created_at",
sort="desc",
state=["closed", "locked", "merged"],
updated_after=updated_after.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
):
yield merge_request_batch

async def get_all_issues(self, group: Group) -> typing.AsyncIterator[List[Issue]]:
async_fetcher = AsyncFetcher(self.gitlab_client)
async for issues_batch in async_fetcher.fetch(
fetch_func=group.issues.list,
validation_func=self.should_run_for_issue,
pagination="offset",
order_by="created_at",
sort="desc",
):
yield issues_batch

def get_entities_diff(
self,
project: Project,
@@ -229,7 +322,16 @@ def get_entities_diff(
after: str,
ref: str,
) -> Tuple[List[Entity], List[Entity]]:
logger.info(
f'Getting entities diff for project {project.path_with_namespace}, in path "{spec_path}", before {before},'
f" after {after}, ref {ref}"
)
entities_before = self._get_entities_by_commit(project, spec_path, before, ref)

logger.info(f"Found {len(entities_before)} entities in the previous state")

entities_after = self._get_entities_by_commit(project, spec_path, after, ref)

logger.info(f"Found {len(entities_after)} entities in the current state")

return entities_before, entities_after
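The project cache introduced in get_all_projects above is a nested setdefault lookup keyed first by a cache key and then by the client token. A standalone sketch of that pattern, with simplified placeholder keys and values:

```python
# Standalone illustration of the nested-setdefault cache pattern used above;
# the attribute store, key, and token are simplified placeholders.
attributes: dict = {}
PROJECTS_CACHE_KEY = "projects"
token = "glpat-example"

# First lookup: creates both levels and returns an empty per-token cache.
cached = attributes.setdefault(PROJECTS_CACHE_KEY, {}).setdefault(token, {})
assert cached == {}

# After a full fetch, the per-token cache is populated once...
attributes[PROJECTS_CACHE_KEY][token] = {1: "project-a", 2: "project-b"}

# ...so later lookups return the populated dict and the fetch can be skipped.
cached = attributes.setdefault(PROJECTS_CACHE_KEY, {}).setdefault(token, {})
assert cached == {1: "project-a", 2: "project-b"}
```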
43 changes: 14 additions & 29 deletions integrations/gitlab/gitlab_integration/ocean.py
@@ -8,7 +8,7 @@
from gitlab_integration.bootstrap import setup_application
from gitlab_integration.utils import ObjectKind, get_cached_all_services
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import RAW_RESULT, ASYNC_GENERATOR_RESYNC_TYPE
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE

NO_WEBHOOK_WARNING = "Without setting up the webhook, the integration will not export live changes from the gitlab"

@@ -51,40 +51,25 @@ async def on_resync(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:


@ocean.on_resync(ObjectKind.MERGE_REQUEST)
async def resync_merge_requests(kind: str) -> RAW_RESULT:
async def resync_merge_requests(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
updated_after = datetime.now() - timedelta(days=14)

result = []
for service in get_cached_all_services():
for group in service.get_root_groups():
merge_requests = group.mergerequests.list(
all=True, state="opened"
) + group.mergerequests.list(
all=True,
state=["closed", "locked", "merged"],
updated_after=updated_after.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
)
for merge_request in merge_requests:
project_path = merge_request.references.get("full").rstrip(
merge_request.references.get("short")
)
if service.should_run_for_path(project_path):
result.append(merge_request.asdict())
return result
async for merge_request_batch in service.get_opened_merge_requests(group):
yield [merge_request.asdict() for merge_request in merge_request_batch]
async for merge_request_batch in service.get_closed_merge_requests(
group, updated_after
):
yield [merge_request.asdict() for merge_request in merge_request_batch]


@ocean.on_resync(ObjectKind.ISSUE)
async def resync_issues(kind: str) -> RAW_RESULT:
issues = []
async def resync_issues(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
for service in get_cached_all_services():
for group in service.get_root_groups():
for issue in group.issues.list(all=True):
project_path = issue.references.get("full").rstrip(
issue.references.get("short")
)
if service.should_run_for_path(project_path):
issues.append(issue.asdict())
return issues
async for issues_batch in service.get_all_issues(group):
yield [issue.asdict() for issue in issues_batch]


@ocean.on_resync(ObjectKind.JOB)
@@ -93,9 +78,6 @@ async def resync_jobs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async for projects_batch in service.get_all_projects():
for project in projects_batch:
async for jobs_batch in service.get_all_jobs(project):
logger.info(
f"Found {len(jobs_batch)} jobs for project {project.path_with_namespace}"
)
yield [job.asdict() for job in jobs_batch]


@@ -104,6 +86,9 @@ async def resync_pipelines(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
for service in get_cached_all_services():
async for projects_batch in service.get_all_projects():
for project in projects_batch:
logger.info(
f"Fetching pipelines for project {project.path_with_namespace}"
)
async for pipelines_batch in service.get_all_pipelines(project):
logger.info(
f"Found {len(pipelines_batch)} pipelines for project {project.path_with_namespace}"
2 changes: 1 addition & 1 deletion integrations/gitlab/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "gitlab"
version = "0.1.18"
version = "0.1.19"
description = "Gitlab integration for Port using Port-Ocean Framework"
authors = ["Yair Siman-Tov <[email protected]>"]
