Skip to content

Commit 70a63d1

Browse files
committed
Clarifies cancellation token behavior
Explains how cancellation tokens are used during resource creation and publish/subscribe operations to prevent unexpected behavior, such as prematurely aborting infrastructure setup. This ensures that resource creation only aborts when the message bus or queue is disposed, not due to individual caller cancellations. Provides guidance for implementation authors regarding the use of `DisposedCancellationToken` for setup operations and linked tokens for message sending and queue operations.
1 parent 7b581c2 commit 70a63d1

4 files changed

Lines changed: 87 additions & 1 deletion

File tree

docs/guide/messaging.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,39 @@ await messageBus.SubscribeAsync<OrderCreated>(async order =>
755755
});
756756
```
757757

758+
## Cancellation Token Behavior
759+
760+
Understanding how cancellation tokens are handled internally is important for building reliable publishers and subscribers.
761+
762+
### Resource Creation Uses Disposal Token
763+
764+
When you call `PublishAsync` or `SubscribeAsync`, the message bus may need to create infrastructure (e.g., Azure Service Bus topics, RabbitMQ exchanges, SQS topics). These setup operations use an internal disposal token — **not** the caller's cancellation token. This means:
765+
766+
- **Topic and subscription creation only abort when the message bus is disposed**, never because a single caller cancelled their operation.
767+
- A cancelled publish will not leave topic infrastructure in a half-created state.
768+
- Multiple concurrent publishers/subscribers cannot interfere with each other's setup.
769+
770+
### Linked Cancellation for Publish
771+
772+
The caller's cancellation token is combined with the disposal token into a linked token for the actual publish operation. This means:
773+
774+
- Publish cancels when **either** the caller cancels **or** the message bus is disposed.
775+
- Graceful shutdown via `Dispose()` cancels all in-flight publishes promptly.
776+
777+
```csharp
778+
// Topic creation always completes (unless disposed), even if the publish is cancelled
779+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
780+
await messageBus.PublishAsync(new OrderCreated { OrderId = 123 }, cancellationToken: cts.Token);
781+
```
782+
783+
### For Implementation Authors
784+
785+
If you are writing a custom `IMessageBus` implementation by extending `MessageBusBase<TOptions>`:
786+
787+
- **`EnsureTopicCreatedAsync`** always receives `DisposedCancellationToken`. Use it for all setup operations (lock acquisition, API calls, etc.).
788+
- **`EnsureTopicSubscriptionAsync`** always receives `DisposedCancellationToken`. Use it for subscription infrastructure setup.
789+
- **`PublishImplAsync`** receives a linked token (caller + disposal). Respect it for the actual message send.
790+
758791
## Best Practices
759792

760793
### 1. Use Immutable Messages

docs/guide/queues.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,42 @@ catch (QueueException ex)
788788
}
789789
```
790790

791+
## Cancellation Token Behavior
792+
793+
Understanding how cancellation tokens are handled internally is important for building reliable queue consumers.
794+
795+
### Resource Creation Uses Disposal Token
796+
797+
When you call `EnqueueAsync`, `DequeueAsync`, or `GetDeadletterItemsAsync`, the queue may need to create infrastructure (e.g., SQS queues, Azure Service Bus queues, Redis streams). These setup operations use an internal disposal token — **not** the caller's cancellation token. This means:
798+
799+
- **Queue creation only aborts when the queue is disposed**, never because a single caller cancelled their operation.
800+
- A cancelled `DequeueAsync` call (e.g., from a zero timeout) will not prevent queue creation from completing.
801+
- Multiple concurrent callers cannot interfere with each other's setup.
802+
803+
### Linked Cancellation for Operations
804+
805+
The caller's cancellation token is combined with the disposal token into a linked token for the actual operation (dequeue, deadletter retrieval, etc.). This means:
806+
807+
- Operations cancel when **either** the caller cancels **or** the queue is disposed.
808+
- Graceful shutdown via `Dispose()` cancels all in-flight operations promptly.
809+
810+
```csharp
811+
// This will never prevent queue creation, even though it times out immediately
812+
var entry = await queue.DequeueAsync(TimeSpan.Zero);
813+
814+
// The cancellation token only affects the dequeue wait, not infrastructure setup
815+
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
816+
var entry = await queue.DequeueAsync(cts.Token);
817+
```
818+
819+
### For Implementation Authors
820+
821+
If you are writing a custom `IQueue<T>` implementation by extending `QueueBase<T>`:
822+
823+
- **`EnsureQueueCreatedAsync`** always receives `DisposedCancellationToken`. Use it for all setup operations (lock acquisition, API calls, etc.).
824+
- **`DequeueImplAsync`** receives a linked token (caller + disposal). Respect it for the wait/poll operation.
825+
- **`EnqueueImplAsync`** does not receive a cancellation token — keep enqueue fast and non-blocking.
826+
791827
## Best Practices
792828

793829
### 1. Proper Resource Disposal

src/Foundatio/Messaging/MessageBusBase.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ protected CancellationTokenSource GetLinkedDisposableCancellationTokenSource(Can
6565
TimeProvider IHaveTimeProvider.TimeProvider => _timeProvider;
6666
IResiliencePolicyProvider IHaveResiliencePolicyProvider.ResiliencePolicyProvider => _resiliencePolicyProvider;
6767

68+
/// <summary>
69+
/// Called before publishing to ensure the topic exists. The <paramref name="cancellationToken"/>
70+
/// is always <see cref="MaintenanceBase.DisposedCancellationToken"/>; topic creation should only
71+
/// abort when the message bus is being disposed, never due to an individual caller's cancellation.
72+
/// </summary>
6873
protected virtual Task EnsureTopicCreatedAsync(CancellationToken cancellationToken) => Task.CompletedTask;
6974

7075
protected abstract Task PublishImplAsync(string messageType, object message, MessageOptions options, CancellationToken cancellationToken);
@@ -84,12 +89,13 @@ public async Task PublishAsync(Type messageType, object message, MessageOptions
8489
options.Properties.Add("TraceState", Activity.Current.TraceStateString);
8590
}
8691

87-
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
8892
try
8993
{
9094
// Use DisposedCancellationToken for setup: topic creation should only abort on disposal,
9195
// not due to an individual caller's cancellation token.
9296
await EnsureTopicCreatedAsync(DisposedCancellationToken).AnyContext();
97+
98+
using var linkedCancellationTokenSource = GetLinkedDisposableCancellationTokenSource(cancellationToken);
9399
await PublishImplAsync(GetMappedMessageType(messageType), message, options, linkedCancellationTokenSource.Token).AnyContext();
94100
}
95101
catch (Exception ex) when (ex is not OperationCanceledException and not MessageBusException)
@@ -147,6 +153,12 @@ protected virtual Type GetMappedMessageType(string messageType)
147153
}
148154

149155
protected virtual Task RemoveTopicSubscriptionAsync() => Task.CompletedTask;
156+
/// <summary>
157+
/// Called after subscribing to ensure the topic subscription infrastructure exists. The
158+
/// <paramref name="cancellationToken"/> is always <see cref="MaintenanceBase.DisposedCancellationToken"/>;
159+
/// subscription setup should only abort when the message bus is being disposed, never due to
160+
/// an individual caller's cancellation.
161+
/// </summary>
150162
protected virtual Task EnsureTopicSubscriptionAsync(CancellationToken cancellationToken) => Task.CompletedTask;
151163

152164
protected virtual Task SubscribeImplAsync<T>(Func<T, CancellationToken, Task> handler, CancellationToken cancellationToken) where T : class

src/Foundatio/Queues/QueueBase.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ public void AttachBehavior(IQueueBehavior<T> behavior)
123123
behavior.Attach(this);
124124
}
125125

126+
/// <summary>
127+
/// Called before queue operations to ensure the queue exists. The <paramref name="cancellationToken"/>
128+
/// is always <see cref="MaintenanceBase.DisposedCancellationToken"/>; queue creation should only
129+
/// abort when the queue is being disposed, never due to an individual caller's cancellation.
130+
/// </summary>
126131
protected abstract Task EnsureQueueCreatedAsync(CancellationToken cancellationToken = default);
127132

128133
protected abstract Task<string> EnqueueImplAsync(T data, QueueEntryOptions options);

0 commit comments

Comments
 (0)