Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource leak hero scenario. #455

Merged
merged 11 commits into from
Oct 18, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,5 @@ significant modifications will be credited to OpenTelemetry Authors.
[#432](https://github.com/open-telemetry/opentelemetry-demo/pull/432)
* Replaced the Jaeger exporter to the OTLP exporter in the OTel Collector
([#435](https://github.com/open-telemetry/opentelemetry-demo/pull/435))
* Added cache scenario to recommendation service
([#455](https://github.com/open-telemetry/opentelemetry-demo/pull/455))
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
3 changes: 3 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,11 @@ services:
depends_on:
- productcatalogservice
- otelcol
- featureflagservice
environment:
- RECOMMENDATION_SERVICE_PORT
- PRODUCT_CATALOG_SERVICE_ADDR
- FEATURE_FLAG_GRPC_SERVICE_ADDR
- OTEL_PYTHON_LOG_CORRELATION=true
- OTEL_TRACES_EXPORTER=otlp
- OTEL_METRICS_EXPORTER=otlp
Expand All @@ -341,6 +343,7 @@ services:
- OTEL_SERVICE_NAME=recommendationservice
- PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python
logging: *logging
restart: on-failure

# ShippingService
shippingservice:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ defmodule Featureflagservice.Repo.Migrations.CreateFeatureflags do
name: "shippingFailure",
description: "Fail shipping service when shipping a product to a non-USA address",
enabled: false})

repo().insert(%Featureflagservice.FeatureFlags.FeatureFlag{
name: "recommendationCache",
description: "Cache recommendations",
enabled: false})
end

defp execute_down do
repo().delete(%Featureflagservice.FeatureFlags.FeatureFlag{name: "productCatalogFailure"})
repo().delete(%Featureflagservice.FeatureFlags.FeatureFlag{name: "shippingFailure"})
repo().delete(%Featureflagservice.FeatureFlags.FeatureFlag{name: "recommendationCache"})
end
end
2 changes: 1 addition & 1 deletion src/recommendationservice/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:3.10-slim
FROM python:3.10

WORKDIR /usr/src/app/

Expand Down
40 changes: 35 additions & 5 deletions src/recommendationservice/recommendation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@
init_metrics
)

cached_ids = []
first_run = True

class RecommendationService(demo_pb2_grpc.RecommendationServiceServicer):
def ListRecommendations(self, request, context):
prod_list = get_product_list(request.product_ids)
Expand All @@ -60,16 +63,35 @@ def Watch(self, request, context):


def get_product_list(request_product_ids):
global first_run
global cached_ids
with tracer.start_as_current_span("get_product_list") as span:
max_responses = 5

# Formulate the list of characters to list of strings
request_product_ids_str = ''.join(request_product_ids)
request_product_ids = request_product_ids_str.split(',')

# Fetch list of products from product catalog stub
cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
product_ids = [x.id for x in cat_response.products]
# Feature flag scenario - Cache Leak
if check_feature_flag("recommendationCache"):
if random.random() < 0.219 or first_run:
first_run = False
span.set_attribute("app.cache_hit", False)
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
logger.info("cache miss")
cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
response_ids = [x.id for x in cat_response.products]
cached_ids = cached_ids + response_ids
cached_ids = cached_ids + cached_ids
austinlparker marked this conversation as resolved.
Show resolved Hide resolved
span.set_attribute("app.cache_size", len(cached_ids))
product_ids = cached_ids
else:
span.set_attribute("app.cache_hit", True)
logger.info("cache hit")
product_ids = cached_ids
else:
cat_response = product_catalog_stub.ListProducts(demo_pb2.Empty())
product_ids = [x.id for x in cat_response.products]

span.set_attribute("app.products.count", len(product_ids))

# Create a filtered list of products excluding the products received as input
Expand All @@ -94,6 +116,11 @@ def must_map_env(key: str):
raise Exception(f'{key} environment variable must be set')
return value

def check_feature_flag(flag_name: str):
flag = feature_flag_stub.GetFlag(demo_pb2.GetFlagRequest(name=flag_name)).flag
logger.info(flag)
return flag.enabled

if __name__ == "__main__":
# Initialize Traces and Metrics
tracer = trace.get_tracer_provider().get_tracer("recommendationservice")
Expand All @@ -102,9 +129,12 @@ def must_map_env(key: str):

port = must_map_env('RECOMMENDATION_SERVICE_PORT')
catalog_addr = must_map_env('PRODUCT_CATALOG_SERVICE_ADDR')
ff_addr = must_map_env('FEATURE_FLAG_GRPC_SERVICE_ADDR')

channel = grpc.insecure_channel(catalog_addr)
product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(channel)
pc_channel = grpc.insecure_channel(catalog_addr)
ff_channel = grpc.insecure_channel(ff_addr)
product_catalog_stub = demo_pb2_grpc.ProductCatalogServiceStub(pc_channel)
feature_flag_stub = demo_pb2_grpc.FeatureFlagServiceStub(ff_channel)

# Create gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
Expand Down