Skip to content

Commit

Permalink
[receiver/azureeventhub] Fix memory leak on shutdown (#32401)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
As explained
[here](#30438 (comment)),
components need to cancel all their work and background tasks before
returning from `Shutdown`. My original [change for this
component](#32364)
to add `goleak` testing worked around the proper behavior. This PR fully
addresses the `Shutdown` functionality to ensure the component behaves
the way the spec requires.

**Link to tracking Issue:** <Issue number if applicable>
#30438

**Testing:** <Describe what testing was performed and which tests were
added.>
Tests are passing.
  • Loading branch information
crobert-1 authored Apr 16, 2024
1 parent 51677d3 commit 62df686
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 14 deletions.
27 changes: 27 additions & 0 deletions .chloggen/goleak_azureeventhubrec_fix.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: azureeventhubreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak on shutdown

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32401]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
6 changes: 6 additions & 0 deletions receiver/azureeventhubreceiver/eventhubhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ type eventhubHandler struct {
dataConsumer dataConsumer
config *Config
settings receiver.CreateSettings
cancel context.CancelFunc
}

// Implement eventHandler Interface
var _ eventHandler = (*eventhubHandler)(nil)

func (h *eventhubHandler) run(ctx context.Context, host component.Host) error {
ctx, h.cancel = context.WithCancel(ctx)

storageClient, err := adapter.GetStorageClient(ctx, host, h.config.StorageID, h.settings.ID)
if err != nil {
Expand Down Expand Up @@ -177,6 +179,10 @@ func (h *eventhubHandler) close(ctx context.Context) error {
}
h.hub = nil
}
if h.cancel != nil {
h.cancel()
}

return nil
}

Expand Down
19 changes: 5 additions & 14 deletions receiver/azureeventhubreceiver/eventhubhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,6 @@ func (m *mockDataConsumer) consume(ctx context.Context, event *eventhub.Event) e
}

func TestEventhubHandler_Start(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := createDefaultConfig()
config.(*Config).Connection = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"

Expand All @@ -97,17 +94,11 @@ func TestEventhubHandler_Start(t *testing.T) {
}
ehHandler.hub = &mockHubWrapper{}

err := ehHandler.run(ctx, componenttest.NewNopHost())
assert.NoError(t, err)

err = ehHandler.close(ctx)
assert.NoError(t, err)
assert.NoError(t, ehHandler.run(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, ehHandler.close(context.Background()))
}

func TestEventhubHandler_newMessageHandler(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := createDefaultConfig()
config.(*Config).Connection = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"

Expand All @@ -131,11 +122,10 @@ func TestEventhubHandler_newMessageHandler(t *testing.T) {
}
ehHandler.hub = &mockHubWrapper{}

err = ehHandler.run(ctx, componenttest.NewNopHost())
assert.NoError(t, err)
assert.NoError(t, ehHandler.run(context.Background(), componenttest.NewNopHost()))

now := time.Now()
err = ehHandler.newMessageHandler(ctx, &eventhub.Event{
err = ehHandler.newMessageHandler(context.Background(), &eventhub.Event{
Data: []byte("hello"),
PartitionKey: nil,
Properties: map[string]any{"foo": "bar"},
Expand All @@ -158,4 +148,5 @@ func TestEventhubHandler_newMessageHandler(t *testing.T) {
read, ok := sink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().Get("foo")
assert.True(t, ok)
assert.Equal(t, "bar", read.AsString())
assert.NoError(t, ehHandler.close(context.Background()))
}

0 comments on commit 62df686

Please sign in to comment.