Skip to content

Commit

Permalink
[Core] Add metrics to monitor integration performance
Browse files Browse the repository at this point in the history
Set metrics as feature flag.
  • Loading branch information
Ivan Kalinovski authored and Ivan Kalinovski committed Dec 30, 2024
1 parent a1df9b8 commit daf9cf0
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 12 deletions.
7 changes: 7 additions & 0 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
)
runtime: Runtime = Runtime.OnPrem
resources_path: str = Field(default=".port/resources")
metrics: bool = Field(default=False)

@root_validator()
def validate_integration_config(cls, values: dict[str, Any]) -> dict[str, Any]:
Expand All @@ -101,6 +102,12 @@ def parse_config(model: Type[BaseModel], config: Any) -> BaseModel:

return values

@validator("metrics")
def validate_metrics(cls, value: str) -> bool:
if value == "1":
return True
return False

@validator("runtime")
def validate_runtime(cls, runtime: Runtime) -> Runtime:
if runtime.is_saas_runtime:
Expand Down
13 changes: 13 additions & 0 deletions port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from uuid import uuid4

from loguru import logger
from port_ocean.context import ocean
from port_ocean.helpers.metric import MetricAggregator
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from pydispatch import dispatcher # type: ignore
Expand Down Expand Up @@ -75,20 +76,32 @@ def abort(self) -> None:
)
self._aborted = True

async def flush_metric_logs(self) -> None:
if event._metric_aggregator:
await event._metric_aggregator.flush()

async def increment_status(self, status_code: str) -> None:
if not self.should_record_metrics:
return
try:
if self._metric_aggregator:
await self._metric_aggregator.increment_status(status_code)
except Exception:
pass

async def increment_metric(self, metric: str, amount: int | float = 1) -> None:
if not self.should_record_metrics:
return
try:
if self._metric_aggregator:
await self._metric_aggregator.increment_field(metric, amount)
except Exception:
pass

@property
def should_record_metrics(self) -> bool:
return ocean.ocean.config.metrics

@property
def aborted(self) -> bool:
return self._aborted
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ async def sync_raw_all(

logger.error(message, exc_info=error_group)
else:
await event._metric_aggregator.flush() if event._metric_aggregator else print("WHAT`")
await event.flush_metric_logs()
logger.info(
f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
)
Expand Down
5 changes: 5 additions & 0 deletions port_ocean/helpers/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ async def reset(self) -> None:
async def timed_generator(
generator: ASYNC_GENERATOR_RESYNC_TYPE,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
if not port_ocean.context.event.event.should_record_metrics:
async for items in generator:
yield items
async with port_ocean.context.event.event_context(
port_ocean.context.event.EventType.METRIC, attributes={"phase": "extract"}
):
Expand All @@ -146,6 +149,8 @@ def metric(phase: str | None = None, should_capture_time: bool = True) -> Any:
def decorator(func: Callable[..., Any]) -> Any:
@wraps(func)
async def wrapper(*args: Any, **kwargs: dict[Any, Any]) -> Any:
if not port_ocean.context.event.event.should_record_metrics:
return await func(*args, **kwargs)
if not phase:
_phase = port_ocean.context.event.event.attributes.get("phase")
async with port_ocean.context.event.event_context(
Expand Down
16 changes: 6 additions & 10 deletions port_ocean/helpers/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,12 @@ async def _retry_operation_async(
attempts_made = 0
response: httpx.Response | None = None
error: Exception | None = None

metric = None
is_event_started = False
try:
metric = (
await event.event._metric_aggregator.get_metric()
if event.event._metric_aggregator
else None
)
event.event.should_record_metrics
is_event_started = True
except EventContextNotFoundError:
pass
is_event_started = False

while True:
if attempts_made > 0:
Expand All @@ -305,7 +301,7 @@ async def _retry_operation_async(
await asyncio.sleep(sleep_time)
(
await event.event.increment_metric(MetricFieldType.RATE_LIMIT)
if metric
if is_event_started
else None
)

Expand All @@ -316,7 +312,7 @@ async def _retry_operation_async(

(
await event.event.increment_status(str(response.status_code))
if metric
if is_event_started
else None
)

Expand Down
1 change: 0 additions & 1 deletion port_ocean/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def __init__(
_integration_config_model=config_factory,
**(config_override or {}),
)

# add the integration sensitive configuration to the sensitive patterns to mask out
sensitive_log_filter.hide_sensitive_strings(
*self.config.get_sensitive_fields_data()
Expand Down

0 comments on commit daf9cf0

Please sign in to comment.