Custom middleware that throws an exception will cause handler starvation #1342

Open
mgoodfellow opened this issue Feb 9, 2024 · 1 comment

Comments

@mgoodfellow
Contributor

Adding custom middleware which can throw an exception causes unexpected behaviour, namely, handler starvation.

The default pipeline appears to have the following setup (source):

public static HandlerMiddlewareBuilder UseDefaults<TMessage>(
        this HandlerMiddlewareBuilder builder,
        Type handlerType)
        where TMessage : Message
    {
        if (builder == null) throw new ArgumentNullException(nameof(builder));
        if (handlerType == null) throw new ArgumentNullException(nameof(handlerType), "HandlerType is used here to");

        builder.UseMessageContextAccessor();
        builder.Use<LoggingMiddleware>();
        builder.UseStopwatch(handlerType);
        builder.Use<SqsPostProcessorMiddleware>();
        builder.UseErrorHandler();
        builder.UseHandler<TMessage>();

        return builder;
    }

Specifically, the UseErrorHandler middleware added here converts exceptions into a false result from the handler (source):

 protected override async Task<bool> RunInnerAsync(HandleMessageContext context, Func<CancellationToken, Task<bool>> func, CancellationToken stoppingToken)
    {
        try
        {
            return await func(stoppingToken).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            _monitor.HandleException(context.MessageType);
            _monitor.HandleError(e, context.RawMessage);

            context.SetException(e);
            return false;
        }
        finally
        {
            _monitor.Handled(context.Message);
        }
    }

Now suppose a user adds some middleware to the outside of the pipeline:

public class BadMiddlewareTest : MiddlewareBase<HandleMessageContext, bool>
{
    protected override async Task<bool> RunInnerAsync(HandleMessageContext context, Func<CancellationToken, Task<bool>> func, CancellationToken stoppingToken)
    {
        await func(stoppingToken).ConfigureAwait(false);

        throw new Exception();
    }
}

I then configure it like this:

x.WithSubscriptionGroup("error_messages", cfg => cfg.WithConcurrencyLimit(1));


x.ForTopic<ErrorMessage>(
    cfg =>
    {
        cfg.WithMiddlewareConfiguration(
            m =>
            {
                m.Use<BadMiddlewareTest>();
                m.UseDefaults<ErrorMessage>(typeof(ErrorMessageHandler));
            });

        cfg.WithReadConfiguration(
            r =>
            {
                r.SubscriptionGroupName = "error_messages";
                r.RetryCountBeforeSendingToErrorQueue = 1;
            });
    });

Now a simple handler:

public class ErrorMessageHandler : IHandlerAsync<ErrorMessage>
{
    public static int Counter { get; private set; }

    public async Task<bool> Handle(ErrorMessage message)
    {
        Counter++;
        throw new Exception();
    }
}

We can write a simple test that publishes 10 messages. I expect the count to be 10, but it's actually 1:

    [Test]
    public async Task TestWeDontCrash()
    {
        var messagePublisher = Bootstrapper.Container.GetInstance<IMessagePublisher>();

        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());
        await messagePublisher.PublishAsync(new ErrorMessage());

        await Task.Delay(5000);

        ErrorMessageHandler.Counter.Should().Be(10);
    }

Even if we wait forever, we only ever see a count of 1. However, if we remove BadMiddlewareTest, we get the correct count of 10.

Moving this middleware deeper into the chain, so that UseErrorHandler catches its exception, also fixes the behaviour:

x.ForTopic<ErrorMessage>(
    cfg =>
    {
        cfg.WithMiddlewareConfiguration(
            m =>
            {
                m.UseMessageContextAccessor();
                m.Use<LoggingMiddleware>();
                m.UseStopwatch(typeof(ErrorMessageHandler));
                m.Use<SqsPostProcessorMiddleware>();
                m.UseErrorHandler();
                m.Use<BadMiddlewareTest>();
                m.UseHandler<ErrorMessage>();
            });

        cfg.WithReadConfiguration(
            r =>
            {
                r.SubscriptionGroupName = "error_messages";
                r.RetryCountBeforeSendingToErrorQueue = 1;
            });
    });
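
To make the difference between the two orderings concrete, here is a minimal, library-free sketch. It uses plain delegates as stand-ins and does not use JustSaying's actual types; every name in it (PipelineSketch, Handler, ErrorHandler, BadMiddleware) is hypothetical and only models the behaviour described above.

```csharp
using System;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // Stand-in for the message handler: always fails.
    public static async Task<bool> Handler()
    {
        await Task.Yield();
        throw new InvalidOperationException("handler failed");
    }

    // Stand-in for the error-handling middleware: converts exceptions into 'false'.
    public static async Task<bool> ErrorHandler(Func<Task<bool>> next)
    {
        try
        {
            return await next().ConfigureAwait(false);
        }
        catch (Exception)
        {
            return false;
        }
    }

    // Stand-in for BadMiddlewareTest: runs the rest of the chain, then throws.
    public static async Task<bool> BadMiddleware(Func<Task<bool>> next)
    {
        await next().ConfigureAwait(false);
        throw new InvalidOperationException("middleware failed");
    }

    // Throwing middleware outside the error handler: the exception escapes to the caller.
    public static Task<bool> BadOutside() => BadMiddleware(() => ErrorHandler(Handler));

    // Throwing middleware inside the error handler: the exception becomes 'false'.
    public static Task<bool> BadInside() => ErrorHandler(() => BadMiddleware(Handler));
}
```

In this model, awaiting BadInside() completes with false, while awaiting BadOutside() throws, because nothing further out in the chain is left to catch the exception.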

It took us a fair while to work out why our handlers kept freezing up and no longer processing their work. It turned out that the UoW middleware we had added could throw if it failed to write to the database.

I'm not sure whether this is expected behaviour, but if it is, it needs to be clearly documented for anyone writing middleware: middleware must not throw an exception under any circumstances unless it is added after UseErrorHandler, which will handle the exception for you.
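
Until that is settled, one possible workaround is for outer middleware to guard itself. The following is only a sketch under assumptions: it models a unit-of-work step with plain delegates and is not JustSaying's API. SafeMiddlewareSketch, UnitOfWork, and the commit delegate are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SafeMiddlewareSketch
{
    // Hypothetical unit-of-work step: runs the rest of the pipeline, then commits.
    // Any failure is reported as 'false' instead of escaping the pipeline.
    public static async Task<bool> UnitOfWork(
        Func<CancellationToken, Task<bool>> next,
        Func<CancellationToken, Task> commit,
        CancellationToken stoppingToken)
    {
        try
        {
            bool handled = await next(stoppingToken).ConfigureAwait(false);

            // Only commit when the inner pipeline reported success.
            if (handled)
            {
                await commit(stoppingToken).ConfigureAwait(false);
            }

            return handled;
        }
        catch (Exception)
        {
            // For example, a failed database write: treat the message as unhandled.
            return false;
        }
    }
}
```

Note that this also swallows cancellation exceptions. Whether that is desirable depends on how the surrounding pipeline treats a false result, which this sketch does not model.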

@mgoodfellow
Contributor Author

For clarity: in my original test, the final counter value equals whatever concurrency limit you set here:

x.WithSubscriptionGroup("error_messages", cfg => cfg.WithConcurrencyLimit(1));

In other words, each handler runs once and then dies when the exception goes uncaught in the middleware.
