Skip to content

Commit

Permalink
#61 SSE streaming manager (#73)
Browse files Browse the repository at this point in the history
* #61 SSE streaming manager

* Remove extraneous import

* isort fixes

* Self CR

* CR changes

* Simplify Flagsmith.__init__ to keep flake8 happy

* Remove extraneous noqa

* Adds typing to new methods and tests

* Remove typing.TypeAlias

Incompatible with python <= 3.9

* Use Optional as opposed to pipe
  • Loading branch information
bne authored Feb 6, 2024
1 parent 96524b9 commit 5db6d24
Show file tree
Hide file tree
Showing 7 changed files with 533 additions and 154 deletions.
85 changes: 75 additions & 10 deletions flagsmith/flagsmith.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
import logging
import typing
from json import JSONDecodeError
from datetime import datetime

import pytz
import requests
from flag_engine import engine
from flag_engine.environments.models import EnvironmentModel
Expand All @@ -15,11 +17,13 @@
from flagsmith.models import DefaultFlag, Flags, Segment
from flagsmith.offline_handlers import BaseOfflineHandler
from flagsmith.polling_manager import EnvironmentDataPollingManager
from flagsmith.streaming_manager import EventStreamManager, StreamEvent
from flagsmith.utils.identities import generate_identities_data

logger = logging.getLogger(__name__)

DEFAULT_API_URL = "https://edge.api.flagsmith.com/api/v1/"
DEFAULT_REALTIME_API_URL = "https://realtime.flagsmith.com/"


class Flagsmith:
Expand All @@ -41,6 +45,7 @@ def __init__(
self,
environment_key: str = None,
api_url: str = None,
realtime_api_url: typing.Optional[str] = None,
custom_headers: typing.Dict[str, typing.Any] = None,
request_timeout_seconds: int = None,
enable_local_evaluation: bool = False,
Expand All @@ -51,11 +56,13 @@ def __init__(
proxies: typing.Dict[str, str] = None,
offline_mode: bool = False,
offline_handler: BaseOfflineHandler = None,
enable_realtime_updates: bool = False,
):
"""
:param environment_key: The environment key obtained from Flagsmith interface.
Required unless offline_mode is True.
:param api_url: Override the URL of the Flagsmith API to communicate with
:param realtime_api_url: Override the URL of the Flagsmith real-time API
:param custom_headers: Additional headers to add to requests made to the
Flagsmith API
:param request_timeout_seconds: Number of seconds to wait for a request to
Expand All @@ -76,12 +83,15 @@ def __init__(
:param offline_handler: provide a handler for offline logic. Used to get environment
document from another source when in offline_mode. Works in place of
default_flag_handler if offline_mode is not set and using remote evaluation.
:param enable_realtime_updates: Use real-time functionality via SSE as opposed to polling the API
"""

self.offline_mode = offline_mode
self.enable_local_evaluation = enable_local_evaluation
self.environment_refresh_interval_seconds = environment_refresh_interval_seconds
self.offline_handler = offline_handler
self.default_flag_handler = default_flag_handler
self.enable_realtime_updates = enable_realtime_updates
self._analytics_processor = None
self._environment = None

Expand All @@ -93,6 +103,11 @@ def __init__(
"Cannot use both default_flag_handler and offline_handler."
)

if enable_realtime_updates and not enable_local_evaluation:
raise ValueError(
"Can only use realtime updates when running in local evaluation mode."
)

if self.offline_handler:
self._environment = self.offline_handler.get_environment()

Expand All @@ -110,6 +125,13 @@ def __init__(
api_url = api_url or DEFAULT_API_URL
self.api_url = api_url if api_url.endswith("/") else f"{api_url}/"

realtime_api_url = realtime_api_url or DEFAULT_REALTIME_API_URL
self.realtime_api_url = (
realtime_api_url
if realtime_api_url.endswith("/")
else f"{realtime_api_url}/"
)

self.request_timeout_seconds = request_timeout_seconds
self.session.mount(self.api_url, HTTPAdapter(max_retries=retries))

Expand All @@ -124,20 +146,60 @@ def __init__(
"in the environment settings page."
)

self.environment_data_polling_manager_thread = (
EnvironmentDataPollingManager(
main=self,
refresh_interval_seconds=environment_refresh_interval_seconds,
daemon=True, # noqa
)
)
self.environment_data_polling_manager_thread.start()
self._initialise_local_evaluation()

if enable_analytics:
self._analytics_processor = AnalyticsProcessor(
environment_key, self.api_url, timeout=self.request_timeout_seconds
)

def _initialise_local_evaluation(self) -> None:
if self.enable_realtime_updates:
self.update_environment()
stream_url = f"{self.realtime_api_url}sse/environments/{self._environment.api_key}/stream"

self.event_stream_thread = EventStreamManager(
stream_url=stream_url,
on_event=self.handle_stream_event,
daemon=True,
)

self.event_stream_thread.start()

else:
self.environment_data_polling_manager_thread = (
EnvironmentDataPollingManager(
main=self,
refresh_interval_seconds=self.environment_refresh_interval_seconds,
daemon=True,
)
)

self.environment_data_polling_manager_thread.start()

def handle_stream_event(self, event: StreamEvent) -> None:
try:
event_data = json.loads(event.data)
except json.JSONDecodeError as e:
raise FlagsmithAPIError("Unable to get valid json from event data.") from e

try:
stream_updated_at = datetime.fromtimestamp(event_data.get("updated_at"))
except TypeError as e:
raise FlagsmithAPIError(
"Unable to get valid timestamp from event data."
) from e

if stream_updated_at.tzinfo is None:
stream_updated_at = pytz.utc.localize(stream_updated_at)

environment_updated_at = self._environment.updated_at
if environment_updated_at.tzinfo is None:
environment_updated_at = pytz.utc.localize(environment_updated_at)

if stream_updated_at > environment_updated_at:
self.update_environment()

def get_environment_flags(self) -> Flags:
"""
Get all the default for flags for the current environment.
Expand Down Expand Up @@ -267,7 +329,7 @@ def _get_json_response(self, url: str, method: str, body: dict = None):
response.status_code,
)
return response.json()
except (requests.ConnectionError, JSONDecodeError) as e:
except (requests.ConnectionError, json.JSONDecodeError) as e:
raise FlagsmithAPIError(
"Unable to get valid response from Flagsmith API."
) from e
Expand All @@ -291,3 +353,6 @@ def _build_identity_model(self, identifier: str, **traits):
def __del__(self):
if hasattr(self, "environment_data_polling_manager_thread"):
self.environment_data_polling_manager_thread.stop()

if hasattr(self, "event_stream_thread"):
self.event_stream_thread.stop()
57 changes: 57 additions & 0 deletions flagsmith/streaming_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import logging
import threading
from typing import Callable, Generator, Optional, Protocol, cast

import requests
import sseclient

from flagsmith.exceptions import FlagsmithAPIError

logger = logging.getLogger(__name__)


class StreamEvent(Protocol):
data: str


class EventStreamManager(threading.Thread):
def __init__(
self,
*args,
stream_url: str,
on_event: Callable[[StreamEvent], None],
request_timeout_seconds: Optional[int] = None,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self._stop_event = threading.Event()
self.stream_url = stream_url
self.on_event = on_event
self.request_timeout_seconds = request_timeout_seconds

def run(self) -> None:
while not self._stop_event.is_set():
try:
with requests.get(
self.stream_url,
stream=True,
headers={"Accept": "application/json, text/event-stream"},
timeout=self.request_timeout_seconds,
) as response:
sse_client = sseclient.SSEClient(
cast(Generator[bytes, None, None], response)
)
for event in sse_client.events():
self.on_event(event)

except requests.exceptions.ReadTimeout:
pass

except (FlagsmithAPIError, requests.RequestException):
logger.exception("Error handling event stream")

def stop(self) -> None:
self._stop_event.set()

def __del__(self) -> None:
self._stop_event.set()
Loading

0 comments on commit 5db6d24

Please sign in to comment.