Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue Listener Drain Mode fixes #47791

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eng/nunit.runsettings
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<!-- <Parameter name="DisableAutoRecording" value="true" /> -->

<!-- Change the test mode -->
<!-- <Parameter name="TestMode" value="Record" /> -->
<Parameter name="TestMode" value="Live" />

<!-- Enable running Fiddler -->
<!-- <Parameter name="EnableFiddler" value="true" /> -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ public QueueListener(QueueClient queue,
ConcurrencyManager concurrencyManager = null,
string functionId = null,
TimeSpan? maxPollingInterval = null,
IDrainModeManager drainModeManager = null)
IDrainModeManager drainModeManager = null,
CancellationTokenSource shutdownCancellationTokenSource = null,
CancellationTokenSource executionCancellationTokenSource = null)
{
if (queueOptions == null)
{
Expand Down Expand Up @@ -133,8 +135,8 @@ public QueueListener(QueueClient queue,

_delayStrategy = new RandomizedExponentialBackoffStrategy(QueuePollingIntervals.Minimum, maximumInterval);

_shutdownCancellationTokenSource = new CancellationTokenSource();
_executionCancellationTokenSource = new CancellationTokenSource();
_shutdownCancellationTokenSource = shutdownCancellationTokenSource ?? new CancellationTokenSource();
_executionCancellationTokenSource = executionCancellationTokenSource ?? new CancellationTokenSource();

_concurrencyManager = concurrencyManager;

Expand Down Expand Up @@ -169,10 +171,12 @@ public Task StartAsync(CancellationToken cancellationToken)

public async Task StopAsync(CancellationToken cancellationToken)
{
if (_drainModeManager?.IsDrainModeEnabled ?? false)
if (!_drainModeManager?.IsDrainModeEnabled ?? true)
{
// Cancel the execution token when drain mode is not enabled or drain mode manager is not set.
_executionCancellationTokenSource.Cancel();
}

using (cancellationToken.Register(() => _shutdownCancellationTokenSource.Cancel()))
{
ThrowIfDisposed();
Expand Down Expand Up @@ -417,7 +421,7 @@ internal async Task ProcessMessageAsync(QueueMessage message, TimeSpan visibilit
// Use a different cancellation token for shutdown to allow graceful shutdown.
// Specifically, don't cancel the completion or update of the message itself during graceful shutdown.
// Only cancel completion or update of the message if a non-graceful shutdown is requested via _shutdownCancellationTokenSource.
await _queueProcessor.CompleteProcessingMessageAsync(message, result, linkedCts.Token).ConfigureAwait(false);
await _queueProcessor.CompleteProcessingMessageAsync(message, result, _shutdownCancellationTokenSource.Token).ConfigureAwait(false);
}
}
catch (TaskCanceledException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
### Breaking Changes

### Bugs Fixed
- When DrainMode is enabled, calling StopAsync in QueueListener.cs will not cancel the execution cancellation token.
- In QueueListener.cs When ProcessMessageAsync calls QueueProcessor.CompleteProcessingMessageAsync, pass in only the shutdown cancellation token only and not the caller's cancellation token in order to allow for graceful shutdown.

### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Listeners;
using Microsoft.Azure.WebJobs.Extensions.Storage.Common.Tests;
using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Listeners;
using Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Tests;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Protocols;
Expand Down Expand Up @@ -45,17 +46,17 @@ public class QueueListenerTests : LiveTestBase<WebJobsTestEnvironment>
private TestLoggerProvider _loggerProvider;
private QueuesOptions _queuesOptions;

[OneTimeSetUp]
public void OneTimeSetUp()
{
Fixture = new TestFixture();
}
//[OneTimeSetUp]
//public void OneTimeSetUp()
//{
// Fixture = new TestFixture();
//}

[OneTimeTearDown]
public void OneTimeTearDown()
{
Fixture.Dispose();
}
//[OneTimeTearDown]
//public void OneTimeTearDown()
//{
// Fixture.Dispose();
//}

[SetUp]
public void SetUp()
Expand Down Expand Up @@ -648,6 +649,175 @@ public async Task ProcessMessageAsync_FunctionInvocationFails()
await _listener.ProcessMessageAsync(_queueMessage, TimeSpan.FromMinutes(10), cancellationToken);
}

private (QueueListener Listener,
TestQueueProcessor Processor,
CancellationTokenSource CallerCts,
CancellationTokenSource SystemShutdownCts)
CreateListenerAndMocks()
{
var callerCts = new CancellationTokenSource();
var systemShutdownCts = new CancellationTokenSource();
QueueProcessorOptions options = new QueueProcessorOptions(_mockQueue.Object, _loggerFactory, _queuesOptions);
var queueProcessor = new TestQueueProcessor(options);

_mockTriggerExecutor.Setup(x => x.ExecuteAsync(It.IsAny<QueueMessage>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new FunctionResult(true));

var exceptionHandlerMock = new Mock<IWebJobsExceptionHandler>();

var listener = new QueueListener(
_mockQueue.Object,
null,
_mockTriggerExecutor.Object,
_mockExceptionDispatcher.Object,
_loggerFactory,
null,
_queuesOptions,
queueProcessor,
new FunctionDescriptor { Id = TestFunctionId },
null,
drainModeManager: null,
shutdownCancellationTokenSource: systemShutdownCts);

return (listener, queueProcessor, callerCts, systemShutdownCts);
}

[Test]
public async Task ProcessMessageAsync_NoTokensCanceled_CallsCompleteProcessingMessageAsync()
{
// Arrange
var (listener, processor, callerCts, shutdownCts) = CreateListenerAndMocks();

// Act
// Neither token is canceled
await listener.ProcessMessageAsync(_queueMessage, TimeSpan.FromMinutes(2), callerCts.Token);

// TODO verify that the message was processed and deleted
Assert.AreEqual(shutdownCts.Token, processor.CapturedDeleteToken);
}

[Test]
public async Task ProcessMessageAsync_ShutdownCanceled_GracefulShutdownWithCompleteProcessingMessagesAsyncRENANENE()
{
// Arrange
var (listener, processor, callerCts, shutdownCts) = CreateListenerAndMocks();

// Act
// Cancel the caller token
callerCts.Cancel();

await listener.ProcessMessageAsync(_queueMessage, TimeSpan.FromMinutes(2), callerCts.Token);

// TODO verify that the message was processed and deleted
Assert.AreEqual(shutdownCts.Token, processor.CapturedDeleteToken);
}

[Test]
public async Task ProcessMessageAsync_ShutdownCanceled_GracefulShutdownWithCompleteProcessingMessagesAsync()
{
// Arrange
var (listener, processor, callerCts, shutdownCts) = CreateListenerAndMocks();

// Act
// Cancel the shutdown token
shutdownCts.Cancel();

await listener.ProcessMessageAsync(_queueMessage, TimeSpan.FromMinutes(2), callerCts.Token);

// TODO verify that the message was not deleted because this is a forced shutdown
Assert.AreEqual(shutdownCts.Token, processor.CapturedDeleteToken);
}

[Test]
public async Task StopAsync_WhenDrainModeNotEnabled_ExecutionCancellationTokenIsCanceled()
{
var drainModeManagerMock = new Mock<IDrainModeManager>();
drainModeManagerMock.Setup(d => d.IsDrainModeEnabled).Returns(false);
var executionCancellationTokenSource = new CancellationTokenSource();

var listener = new QueueListener(
_mockQueue.Object,
null,
_mockTriggerExecutor.Object,
_mockExceptionDispatcher.Object,
_loggerFactory,
null,
_queuesOptions,
_mockQueueProcessor.Object,
new FunctionDescriptor { Id = TestFunctionId },
null,
drainModeManager: drainModeManagerMock.Object,
executionCancellationTokenSource: executionCancellationTokenSource);

// Act
await listener.StartAsync(CancellationToken.None);

await listener.StopAsync(CancellationToken.None);

// Assert
Assert.IsTrue(executionCancellationTokenSource.Token.IsCancellationRequested, "Execution token should be canceled when drain mode is not enabled.");
}

[Test]
public async Task StopAsync_WhenDrainModeEnabled_ExecutionCancellationTokenIsNotCanceled()
{
// Arrange
var drainModeManagerMock = new Mock<IDrainModeManager>();
drainModeManagerMock.Setup(d => d.IsDrainModeEnabled).Returns(true);
var executionCancellationTokenSource = new CancellationTokenSource();

var listener = new QueueListener(
_mockQueue.Object,
null,
_mockTriggerExecutor.Object,
_mockExceptionDispatcher.Object,
_loggerFactory,
null,
_queuesOptions,
_mockQueueProcessor.Object,
new FunctionDescriptor { Id = TestFunctionId },
null,
drainModeManager: drainModeManagerMock.Object,
executionCancellationTokenSource: executionCancellationTokenSource);

// Act
await listener.StartAsync(CancellationToken.None);

await listener.StopAsync(CancellationToken.None);

// Assert
Assert.IsFalse(executionCancellationTokenSource.Token.IsCancellationRequested, "Execution token should not be canceled when drain mode is enabled.");
}

[Test]
public async Task StopAsync_ActivatesCancellation_WhenDrainModeManagerNull()
{
// Arrange
var executionCancellationTokenSource = new CancellationTokenSource();

var listener = new QueueListener(
_mockQueue.Object,
null,
_mockTriggerExecutor.Object,
_mockExceptionDispatcher.Object,
_loggerFactory,
null,
_queuesOptions,
_mockQueueProcessor.Object,
new FunctionDescriptor { Id = TestFunctionId },
null,
drainModeManager: null,
executionCancellationTokenSource: executionCancellationTokenSource);

// Act
await listener.StartAsync(CancellationToken.None);

await listener.StopAsync(CancellationToken.None);

// Assert
Assert.IsTrue(executionCancellationTokenSource.Token.IsCancellationRequested, "Execution token should be canceled when drain mode manager is null.");
}

[Test]
public void Get_TargetScale_IsNotNull()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System.Threading;
using System.Threading.Tasks;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.WebJobs.Host.Queues;

namespace Microsoft.Azure.WebJobs.Extensions.Storage.Queues.Tests
{
public class TestQueueProcessor : QueueProcessor
{
public bool DeleteCalled { get; private set; }
public CancellationToken CapturedDeleteToken { get; private set; }

public TestQueueProcessor(QueueProcessorOptions options)
: base(options)
{
}

protected override async Task DeleteMessageAsync(QueueMessage message, CancellationToken cancellationToken)
{
DeleteCalled = true;
CapturedDeleteToken = cancellationToken; // store the exact token used
await base.DeleteMessageAsync(message, cancellationToken);
}
}
}
Loading