
[Core] Add metrics for ocean core #1260

Open · wants to merge 8 commits into base: main from PORT-11681-improve-ocean-logs-for-our-metrics-dashboard
Conversation

@ivankalinovski (Contributor) commented Dec 23, 2024

  1. Decorator that can be used to "metricify" a function; accepts a phase as input.
  2. Automatic duration recording.
  3. `increment_field` method for handling counters.
  4. `increment_status` method for handling request status codes.
  5. Populates the event with a `MetricAggregator` to track metrics.

Type of change

Please leave one option from the following and delete the rest:

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • New Integration (non-breaking change which adds a new integration)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Non-breaking change (fix of existing functionality that will not change current behavior)
  • Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

Core testing checklist

  • Integration able to create all default resources from scratch
  • Resync finishes successfully
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Scheduled resync able to abort existing resync and start a new one
  • Tested with at least 2 integrations from scratch
  • Tested with Kafka and Polling event listeners
  • Tested deletion of entities that don't pass the selector

Integration testing checklist

  • Integration able to create all default resources from scratch
  • Resync able to create entities
  • Resync able to update entities
  • Resync able to detect and delete entities
  • Resync finishes successfully
  • If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the examples folder in the integration directory.
  • If resource kind is updated, run the integration with the example data and check if the expected result is achieved
  • If new resource kind is added or updated, validate that live-events for that resource are working as expected
  • Docs PR link here

Preflight checklist

  • Handled rate limiting
  • Handled pagination
  • Implemented the code in async
  • Support Multi account

Screenshots

Include screenshots from your environment showing how the resources of the integration will look.

API Documentation

Provide links to the API documentation used for this integration.

@ivankalinovski ivankalinovski requested a review from a team as a code owner December 23, 2024 13:23
Ivan Kalinovski added 3 commits December 30, 2024 01:42
1. Add classes to encapsulate metrics data.
2. Add class to encapsulate metric modification.
3. Add Metric event into `event.py`.
4. Add metric helper decorator to provide context with phase and measure time.
5. Add metric wrapper for async generators to measure time.
6. Add increment metric calls to measure stats.
7. Add tests to validate how many entities were extracted and processed.
8. Add test to compare performance of two branches.
@ivankalinovski ivankalinovski force-pushed the PORT-11681-improve-ocean-logs-for-our-metrics-dashboard branch from 69bb09b to daf9cf0 on December 30, 2024 12:59
Ivan Kalinovski added 3 commits December 30, 2024 15:01
1. Fix `validate_metrics`.
2. Pass Metric env to tests.
 Add metrics to topological sorter
@@ -72,6 +75,8 @@ async def upsert_entity(
extensions={"retryable": True},
)
if response.is_error:
await event.event.increment_metric(MetricFieldType.ERROR_COUNT)
Contributor:

Suggested change
await event.event.increment_metric(MetricFieldType.ERROR_COUNT)
await event.event.increment_metric(MetricFieldType.PORT_API_ERROR_COUNT)


return reduced_entity

@metric(MetricType.LOAD)
Contributor:

shouldn't it be on the upsert_entity method rather than batch_upsert_entities?

@@ -167,6 +178,7 @@ async def delete_entity(
)

if response.is_error:
await event.event.increment_metric(MetricFieldType.ERROR_COUNT)
Contributor:

should this kind of error count be in the upsert_entity as well?

@@ -79,6 +79,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
)
runtime: Runtime = Runtime.OnPrem
resources_path: str = Field(default=".port/resources")
metrics: bool = Field(default=False)
Contributor:

I would suggest making the type some kind of enum, for cases where it won't only be true or false but where we might want to monitor only extract metrics

break


def metric(phase: str | None = None, should_capture_time: bool = True) -> Any:
Contributor:

add docs

assert metrics, "No 'metrics' key found in the parsed JSON."

assert "fake-person" in metrics, "'fake-person' key missing in metrics data."
fake_person = metrics["fake-person"]
Contributor:

what does fake-person mean in a metric? I understand that it contains the extract, load, transform phases, but I wonder how we can monitor that?

@@ -163,6 +203,7 @@ def _handle_event(triggering_event_id: int) -> None:
dispatcher.connect(_handle_event, event_type)
dispatcher.send(event_type, triggering_event_id=event.id)

is_silent = EventType.METRIC == event_type
Contributor:

why is this needed?

Comment on lines 144 to 145
except Exception:
break
Contributor:

why break? how is the exception being propagated? It's not equivalent to the way you iterate in the `if not port_ocean.context.event.event.should_record_metrics` path

Comment on lines 99 to 108
async def increment_status(self, status_code: str) -> None:
metric = await self.get_metric()
if metric is None or not isinstance(metric, ApiStats):
return None
async with self._lock:

status = metric.requests.get(status_code)
if not status:
metric.requests[status_code] = 0
metric.requests[status_code] = metric.requests.get(status_code, 0) + 1
Contributor:

I wonder if this is actually needed 🤔 it sounds too specific; you already implemented increment_field

async def flush(self) -> None:
async with self._lock:
metric_dict = asdict(self.metrics)
logger.info(f"integration metrics {json.dumps(metric_dict)}")
Contributor:

please share the flush output in the PR description

Ivan Kalinovski added 2 commits January 9, 2025 12:00