Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#61 SSE streaming manager #73

Merged
merged 10 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 75 additions & 10 deletions flagsmith/flagsmith.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import logging
import typing
from json import JSONDecodeError
from datetime import datetime

import pytz
import requests
from flag_engine import engine
from flag_engine.environments.models import EnvironmentModel
Expand All @@ -15,11 +17,13 @@
from flagsmith.models import DefaultFlag, Flags, Segment
from flagsmith.offline_handlers import BaseOfflineHandler
from flagsmith.polling_manager import EnvironmentDataPollingManager
from flagsmith.streaming_manager import EventStreamManager, StreamEvent
from flagsmith.utils.identities import generate_identities_data

logger = logging.getLogger(__name__)

DEFAULT_API_URL = "https://edge.api.flagsmith.com/api/v1/"
DEFAULT_REALTIME_API_URL = "https://realtime.flagsmith.com/"


class Flagsmith:
Expand All @@ -41,6 +45,7 @@ def __init__(
self,
environment_key: str = None,
api_url: str = None,
realtime_api_url: typing.Optional[str] = None,
custom_headers: typing.Dict[str, typing.Any] = None,
request_timeout_seconds: int = None,
enable_local_evaluation: bool = False,
Expand All @@ -51,11 +56,13 @@ def __init__(
proxies: typing.Dict[str, str] = None,
offline_mode: bool = False,
offline_handler: BaseOfflineHandler = None,
enable_realtime_updates: bool = False,
):
"""
:param environment_key: The environment key obtained from Flagsmith interface.
Required unless offline_mode is True.
:param api_url: Override the URL of the Flagsmith API to communicate with
:param realtime_api_url: Override the URL of the Flagsmith real-time API
:param custom_headers: Additional headers to add to requests made to the
Flagsmith API
:param request_timeout_seconds: Number of seconds to wait for a request to
Expand All @@ -76,12 +83,15 @@ def __init__(
:param offline_handler: provide a handler for offline logic. Used to get environment
document from another source when in offline_mode. Works in place of
default_flag_handler if offline_mode is not set and using remote evaluation.
:param enable_realtime_updates: Use real-time functionality via SSE as opposed to polling the API
"""

self.offline_mode = offline_mode
self.enable_local_evaluation = enable_local_evaluation
self.environment_refresh_interval_seconds = environment_refresh_interval_seconds
self.offline_handler = offline_handler
self.default_flag_handler = default_flag_handler
self.enable_realtime_updates = enable_realtime_updates
self._analytics_processor = None
self._environment = None

Expand All @@ -93,6 +103,11 @@ def __init__(
"Cannot use both default_flag_handler and offline_handler."
)

if enable_realtime_updates and not enable_local_evaluation:
raise ValueError(
"Can only use realtime updates when running in local evaluation mode."
)

if self.offline_handler:
self._environment = self.offline_handler.get_environment()

Expand All @@ -110,6 +125,13 @@ def __init__(
api_url = api_url or DEFAULT_API_URL
self.api_url = api_url if api_url.endswith("/") else f"{api_url}/"

realtime_api_url = realtime_api_url or DEFAULT_REALTIME_API_URL
self.realtime_api_url = (
realtime_api_url
if realtime_api_url.endswith("/")
else f"{realtime_api_url}/"
)

self.request_timeout_seconds = request_timeout_seconds
self.session.mount(self.api_url, HTTPAdapter(max_retries=retries))

Expand All @@ -124,20 +146,60 @@ def __init__(
"in the environment settings page."
)

self.environment_data_polling_manager_thread = (
EnvironmentDataPollingManager(
main=self,
refresh_interval_seconds=environment_refresh_interval_seconds,
daemon=True, # noqa
)
)
self.environment_data_polling_manager_thread.start()
self._initialise_local_evaluation()

if enable_analytics:
self._analytics_processor = AnalyticsProcessor(
environment_key, self.api_url, timeout=self.request_timeout_seconds
)

def _initialise_local_evaluation(self) -> None:
if self.enable_realtime_updates:
self.update_environment()
stream_url = f"{self.realtime_api_url}sse/environments/{self._environment.api_key}/stream"

self.event_stream_thread = EventStreamManager(
stream_url=stream_url,
on_event=self.handle_stream_event,
daemon=True,
)

self.event_stream_thread.start()

else:
self.environment_data_polling_manager_thread = (
EnvironmentDataPollingManager(
main=self,
refresh_interval_seconds=self.environment_refresh_interval_seconds,
daemon=True,
)
)

self.environment_data_polling_manager_thread.start()

def handle_stream_event(self, event: StreamEvent) -> None:
try:
event_data = json.loads(event.data)
except json.JSONDecodeError as e:
raise FlagsmithAPIError("Unable to get valid json from event data.") from e

try:
stream_updated_at = datetime.fromtimestamp(event_data.get("updated_at"))
except TypeError as e:
raise FlagsmithAPIError(
"Unable to get valid timestamp from event data."
) from e

if stream_updated_at.tzinfo is None:
stream_updated_at = pytz.utc.localize(stream_updated_at)

environment_updated_at = self._environment.updated_at
if environment_updated_at.tzinfo is None:
environment_updated_at = pytz.utc.localize(environment_updated_at)

if stream_updated_at > environment_updated_at:
self.update_environment()

def get_environment_flags(self) -> Flags:
"""
Get all the default for flags for the current environment.
Expand Down Expand Up @@ -267,7 +329,7 @@ def _get_json_response(self, url: str, method: str, body: dict = None):
response.status_code,
)
return response.json()
except (requests.ConnectionError, JSONDecodeError) as e:
except (requests.ConnectionError, json.JSONDecodeError) as e:
raise FlagsmithAPIError(
"Unable to get valid response from Flagsmith API."
) from e
Expand All @@ -291,3 +353,6 @@ def _build_identity_model(self, identifier: str, **traits):
def __del__(self):
if hasattr(self, "environment_data_polling_manager_thread"):
self.environment_data_polling_manager_thread.stop()

if hasattr(self, "event_stream_thread"):
self.event_stream_thread.stop()
57 changes: 57 additions & 0 deletions flagsmith/streaming_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
import threading
from typing import Callable, Generator, Optional, Protocol, cast

import requests
import sseclient

from flagsmith.exceptions import FlagsmithAPIError

logger = logging.getLogger(__name__)


class StreamEvent(Protocol):
data: str


class EventStreamManager(threading.Thread):
def __init__(
self,
*args,
stream_url: str,
on_event: Callable[[StreamEvent], None],
request_timeout_seconds: Optional[int] = None,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self._stop_event = threading.Event()
self.stream_url = stream_url
self.on_event = on_event
self.request_timeout_seconds = request_timeout_seconds

def run(self) -> None:
while not self._stop_event.is_set():
try:
with requests.get(
self.stream_url,
stream=True,
headers={"Accept": "application/json, text/event-stream"},
timeout=self.request_timeout_seconds,
) as response:
sse_client = sseclient.SSEClient(
cast(Generator[bytes, None, None], response)
)
for event in sse_client.events():
self.on_event(event)

except requests.exceptions.ReadTimeout:
pass

except (FlagsmithAPIError, requests.RequestException):
logger.exception("Error handling event stream")

def stop(self) -> None:
self._stop_event.set()

def __del__(self) -> None:
self._stop_event.set()
Loading
Loading