How to effectively work with stateful processes in a stateless environment

Azure functions

This blog is written by Thomas Bleijendaal, .Net Developer at Triple

One of our customers asked us to build an API platform based on Azure Functions. Generally, Azure Functions are great, especially if you use them for APIs that perform tasks that finish quickly. However, when building a more extensive system, you’ll inevitably run into a challenge that can only be solved by a long-running, stateful process that appears to be impossible to implement using Azure Functions. This blog describes the journey we undertook when implementing a challenging purchase flow that kept throwing us curveballs.

To start out, we implemented a middleware API layer for a customer, intended to provide a uniform API platform for all the applications, apps and integrations they have created. We opted for Azure Functions as we were anticipating a diverse array of triggers; besides the plain HTTP triggers, we used queue and service bus triggers. As all of the triggers work pretty similarly, it seemed a good place to start. Although the consumption-based billing of Azure Functions had its appeal, slow cold starts made us move to an Elastic Plan once the API platform grew.

Starting simple

The API I would like to focus on in this article is a purchase API. As I will not elaborate on the exact implementation or show how to use the third-party system for which the purchase API was built, I’ll replace it with an analogous, but fictional purchase API. And, as one would always do when implementing something, I’ll start simple.

The basic implementation of the purchase API is simple. When a client calls our API, we should fetch the order that is going to be purchased, create an invoice for it, and call the inventory API to indicate that the products of the order are being sold.

So let’s start with the most basic version of the API. It receives a POST call containing the order id, which is passed to a simple service that contains the limited business logic for this purchase.

    [FunctionName("Purchase1")]
    public async Task<IActionResult> PurchaseAsync(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "purchase/{orderId}")] HttpRequest req,
        string orderId)
    {
        await _purchaseService.PurchaseAsync(orderId);

        return new OkResult();
    }

The service is also very simple.

    public class PurchaseService
    {
        public async Task PurchaseAsync(string orderId)
        {
            var order = await _orderGateway.GetOrderAsync(orderId);
            if (order == null)
                return;

            await _orderGateway.CreateInvoiceAsync(orderId);

            foreach (var product in order.Products)
            {
                await _orderGateway.MarkProductAsSoldAsync(product);
            }
        }
    }

The gateways in these examples are classes that use an HttpClient to send requests to the third-party system. They contain the specific logic to map the Request and Response object to the correct data format for each of the external APIs.

And the testing begins…

When the first implementation was tested, it became clear that it was a bit too simple. It could use some optimisations that would allow us to remove repetitive code. We decided to move some of the code to request handlers, a concept from the MediatR package. This allowed us to replace the direct gateway calls with mediator calls and contain each request in a record; we dubbed these records Commands and Queries. Commands are for actions that do stuff, and Queries are for actions that fetch stuff. While MediatR does not require these names, they fitted our way of working nicely.

    public class MediatorPurchaseService
    {
        public async Task PurchaseAsync(string orderId)
        {
            var order = await _mediator.Send(new GetOrderQuery(orderId));

            await _mediator.Send(new CreateInvoiceCommand(orderId));

            foreach (var product in order.Products)
            {
                await _mediator.Send(new MarkProductAsSoldCommand(product));
            }
        }
    }

This implementation worked quite well in the test environment. It did the job, although it was a bit slow. That was not because our code was inefficient or bad; the quality of the third-party APIs simply started to shine through.

However, when the implementation was tested for real, we discovered that the create-invoice call triggered a background process in the invoicing system that had to be awaited. If we continued before that process had finished, we ran the risk of committing changes that would be overwritten by the background process, leaving the purchase in a weird, undocumented state.

An alternative approach

Before we returned to the drawing board, we came up with a quick solution to fix the problem. We would employ an activity that was triggered from the original HTTP request via an orchestration, and that allowed us to wait for the invoice system:

    [FunctionName("Purchase3")]
    public async Task<IActionResult> PurchaseAsync(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "example3/purchase/{orderId}")] HttpRequest req,
        [DurableClient] IDurableOrchestrationClient orchestrationClient,
        string orderId)
    {
        var invoiceId = await _purchaseService.StartPurchaseAsync(orderId);

        await orchestrationClient.StartNewAsync("Purchase3Orchestrator", new Purchase(orderId, invoiceId));

        return new StatusCodeResult(StatusCodes.Status202Accepted);
    }

The orchestrator and the activity looked like this:

    [FunctionName("Purchase3Orchestrator")]
    public async Task OrchestrateAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
        => await context.CallActivityWithRetryAsync(
            "Purchase3Activity",
            new RetryOptions(TimeSpan.FromSeconds(1), 10),
            context.GetInput<Purchase>());

    [FunctionName("Purchase3Activity")]
    public async Task ActivityAsync([ActivityTrigger] Purchase purchase)
        => await _purchaseService.CompletePurchaseAsync(purchase.OrderId, purchase.InvoiceId);

The purchase call was split:

    public class SplitMediatorPurchaseService
    {
        public async Task<Guid> StartPurchaseAsync(string orderId)
        {
            var order = await _mediator.Send(new GetOrderQuery(orderId));
            var invoice = await _mediator.Send(new CreateInvoiceCommand(order.Id));

            return invoice.Id;
        }

        public async Task CompletePurchaseAsync(string orderId, Guid invoiceId)
        {
            var invoice = await _mediator.Send(new GetInvoiceQuery(invoiceId));
            if (invoice.StillProcessing)
                throw new InvalidOperationException("Invoice is still processing");

            var order = await _mediator.Send(new GetOrderQuery(orderId));

            foreach (var product in order.Products)
            {
                await _mediator.Send(new MarkProductAsSoldCommand(product));
            }
        }
    }

When deployed, this worked fine. The invoice system got more breathing room, and the purchases were processed correctly. Because we are using an activity that is triggered by the runtime and not some dangling thread waiting on a Task.Delay, the solution was already quite resilient towards restarts and new deployments. But as we triggered each retry by throwing an exception, our logs were getting flooded with error entries that were not really errors. As you can probably judge by the code and by the ad-hoc approach we took in this case, this solution was sub-par and could become a maintenance drag.

Back to the drawing board

At Triple we like a challenge. However, we do not like maintenance challenges caused by badly designed code deployed in a rush. So, what are the challenges that such bad design can give us?

First, the loose activity is hard to track. The initial HTTP call triggers an orchestration that triggers an activity. The client won’t see any result from that activity, as it happens asynchronously; if it fails, it fails invisibly.

Furthermore, the reuse of the purchase flow is starting to become difficult. What if the process hits another snag and requires another separate activity? Do we need to create ContinueWithPurchase next to StartPurchase and CompletePurchase? And will we get a ContinueWithPurchase2? Additionally, the unit testing situation is also getting more complicated. If we need to make another split, we need to rework quite some unit tests.

Lastly, I’ve always disliked it when the framework starts to dictate the shape of the solution too much. Durable Functions, which provide orchestration and activities, have the tendency to pull business logic to the actual functions. I see Azure Functions (not only activity trigger and orchestrators, but also http trigger and service bus triggers) as controllers where I might want to validate and map stuff, but I like to put my actual business logic in separate services, handlers and what not. Having a service that has dedicated methods for each of the activity triggers just feels like bad practice.

Orchestration with Durable Functions

Let’s zoom out and start with some basics. At the time I was still planning on solving this challenge with Durable Functions, but with some extras that would make our lives much easier.

Let’s start with the orchestrations and activities that Durable Functions provide us with. They are powerful, as you can write orchestrators that use complex control flow, while getting the durability of automatic replays. Consider the following orchestrator:

    [FunctionName("Example4")]
    public async Task OrchestrateAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        try
        {
            var data = context.GetInput<Data>();
            var productsProcessed = new List<string>();

            foreach (var product in data.Products)
            {
                await context.CallActivityAsync("ProductActivity", product);
                productsProcessed.Add(product);
            }

            await context.CallActivityAsync("ProductsActivity", productsProcessed);
        }
        catch (Exception ex)
        {
            await context.CallActivityAsync("FailureActivity", ex);
        }
        finally
        {
            await context.CallActivityAsync("FinalizingActivity", context.GetInput<string>());
        }
    }

The first time this orchestrator is run, it gets into the foreach loop and activates the first activity. After activating it, the orchestrator quits and is restarted after a short while. The second time the orchestrator reaches the activity, it checks whether the activity has run already. If so, it uses the result from that activity and continues. If that activity is still running, the orchestrator quits and retries after a short while. Azure Table Storage is used as the backing storage for saving the requests to and responses from each of the activities.

The orchestrator replays all the steps it has done in previous invocations and continues where it left off. The entire complex state is replayed and thus restored every invocation, making it very resilient towards failures. If an orchestration gets interrupted, it just replays everything the next invocation.
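The replay behaviour described above can be illustrated with a toy model. The sketch below is not how Durable Functions is implemented; ReplayContext, SuspendException and ToyRuntime are hypothetical names made up for this illustration, and the real runtime suspends an orchestrator without throwing exceptions. It only shows the idea: the orchestrator body is started from the top on every invocation, results of completed activities come from a stored history, and the first activity that is not in the history yet is executed and recorded before the run stops.

```csharp
using System;
using System.Collections.Generic;

// Toy model of replay, for illustration only. All names are hypothetical.
public sealed class SuspendException : Exception { }

public sealed class ReplayContext
{
    private readonly List<string> _history; // results of completed activities, in call order
    private int _position;                  // index of the next activity call in this run

    public ReplayContext(List<string> history) => _history = history;

    public string CallActivity(Func<string> activity)
    {
        if (_position < _history.Count)
        {
            // Completed in an earlier run: reuse the stored result instead of running it again.
            return _history[_position++];
        }

        // First time this call is reached: run it, store the result, and end this run.
        _history.Add(activity());
        throw new SuspendException();
    }
}

public static class ToyRuntime
{
    // Restarts the orchestrator from the top until it finishes without suspending.
    // Returns how many times the orchestrator body was started.
    public static int RunToCompletion(Action<ReplayContext> orchestrator, List<string> history)
    {
        var runs = 0;
        while (true)
        {
            runs++;
            try
            {
                orchestrator(new ReplayContext(history));
                return runs;
            }
            catch (SuspendException)
            {
                // One new activity was recorded; replay from the start with the longer history.
            }
        }
    }
}
```

In this model an orchestrator that calls three activities is started four times, while each activity body runs exactly once. That is also why orchestrator code has to produce the same sequence of calls on every replay.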

Activities are functions that are triggered by the orchestrator. They only run when triggered, and work the same way as HTTP triggers.

Orchestrators can retry activities where they see fit by just calling them again, or you can indicate how often an activity needs to be retried before it is considered failed. And when an activity fails, the exception from the activity is thrown in the orchestrator, making exception handling in the orchestrator feel natural.

The downside of Durable Functions is the fact that you need to invoke activities by calling functions and passing in strings. Critical business logic tends to get filled with activity invocations based on strings, with dynamic parameters. Unit testing those is a pain. The correlation between activities and orchestrators is somewhat hard to see as they are just name-based, and Visual Studio can’t really help with its refactor tools when you are updating the signature of an activity.

Orchestrators and activities are not the only things Durable Functions provide though. There is also a feature called Durable Entities, and they provide some interesting features. A Durable Entity is a class that exposes a certain interface, which can be invoked from an orchestrator. Consider this interface as an example:

    public interface IBankAccount
    {
        Task AddAsync(decimal amount);
        Task RemoveAsync(decimal amount);
    }

To talk to this entity, an orchestrator needs to create a proxy to this entity:

    [FunctionName("Example5")]
    public async Task OrchestrateAsync([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        var transaction = context.GetInput<Data>();

        var account1 = new EntityId("BankAccount", transaction.From);
        var account2 = new EntityId("BankAccount", transaction.To);

        var account1Entity = context.CreateEntityProxy<IBankAccount>(account1);
        var account2Entity = context.CreateEntityProxy<IBankAccount>(account2);

        using (await context.LockAsync(account1, account2))
        {
            await account1Entity.RemoveAsync(transaction.Amount);
            await account2Entity.AddAsync(transaction.Amount);
        }
    }

This proxy emits some IL code that wires up the plumbing that gets the invocation arguments from the orchestrator to the entity trigger, which dispatches those to the correct entity.

    public class BankAccount : IBankAccount
    {
        public decimal CurrentAmount { get; set; }

        public Task AddAsync(decimal amount)
        {
            CurrentAmount += amount;
            return Task.CompletedTask;
        }

        public Task RemoveAsync(decimal amount)
        {
            CurrentAmount -= amount;
            return Task.CompletedTask;
        }

        [FunctionName("BankAccount")]
        public static Task DispatchAsync([EntityTrigger] IDurableEntityContext context)
            => context.DispatchAsync<BankAccount>();
    }

If the orchestrator calls AddAsync on the proxy, the entity trigger gets activated. The entity trigger dispatches the request to the entity and calls its AddAsync method. The Durable Entity abstraction hides many of the complexities of Durable Functions. The orchestrator just performs calls to an interface, and from the entity’s perspective, it just gets its AddAsync method called from some context. The entity is not required to implement the interface, as everything is still string based. I consider it good practice to do it anyway, as it keeps contract, implementation and use in sync automatically.

Orchestrate steps as workflow

If we look back at the implementation of our StartPurchase and CompletePurchase, these methods contain multiple calls to the IMediator interface. Now, what would happen if we replace that interface with an IDurableMediator interface that is a Durable Entity which dispatches any triggers to the real IMediator? This way, we can implement the orchestrator as a series of mediator calls. We’ve dubbed those orchestrators ‘workflows’ and made the durable mediator a transparent pass-through to any request handler.

    public class PurchaseWorkflow
    {
        public async Task OrchestrateAsync(IDurableOrchestrationContext context)
        {
            var orderId = context.GetInput<string>();

            var mediator = context.CreateEntityProxy<IDurableMediator>(
                new EntityId(nameof(DurableMediator), context.InstanceId));

            var order = await mediator.SendAsync(new GetOrderQuery(orderId));
            var createdInvoice = await mediator.SendAsync(new CreateInvoiceCommand(order.Id));

            // wait until the invoicing system has finished its background processing
            do
            {
                var invoice = await mediator.SendAsync(new GetInvoiceQuery(createdInvoice.Id));
                if (!invoice.StillProcessing)
                    break;

                await context.CreateTimer(context.CurrentUtcDateTime.AddSeconds(1), CancellationToken.None);
            }
            while (true);

            foreach (var product in order.Products)
            {
                await mediator.SendAsync(new MarkProductAsSoldCommand(product));
            }
        }
    }

There are a few very important details that are imposed by the Durable Functions library. Generics in activities and entities are forbidden, and activities can only have one parameter. Our requests are single objects already, but as the commands get serialised and deserialised when being passed from orchestrator to entity, they need to be serialised with their type name included. This resulted in the requests and responses being wrapped in models with the correct JsonObject configuration, which looked like this:

    public class DurableMediator : IDurableMediator
    {
        private readonly IMediator _mediator;

        public DurableMediator(IMediator mediator)
        {
            _mediator = mediator;
        }

        public async Task SendObjectAsync(WorkflowRequest request)
        {
            await _mediator.Send(request.Request);
        }

        public async Task<WorkflowResponse> SendObjectWithResponseAsync(WorkflowRequestWithResponse request)
        {
            // the dynamic is needed for the dynamic dispatch of Send()
            var result = await _mediator.Send(request.Request);

            return new WorkflowResponse(result);
        }
    }

    [JsonObject(ItemTypeNameHandling = TypeNameHandling.All)]
    public record WorkflowRequest(IRequest<Unit> Request);

    [JsonObject(ItemTypeNameHandling = TypeNameHandling.All)]
    public record WorkflowRequestWithResponse(dynamic Request);

    [JsonObject(ItemTypeNameHandling = TypeNameHandling.All)]
    public record WorkflowResponse(object Response);

    public static class DurableMediatorExtensions
    {
        public static async Task<TResponse> SendAsync<TResponse>(this IDurableMediator mediator, IRequest<TResponse> request)
        {
            if (typeof(TResponse) == typeof(Unit))
            {
                await mediator.SendObjectAsync(new WorkflowRequest((IRequest<Unit>)request));
                return default!;
            }

            var response = await mediator.SendObjectWithResponseAsync(
                new WorkflowRequestWithResponse((IRequest<object>)request));

            if (response == null)
                throw new InvalidOperationException("Received an empty response");

            return (TResponse)response.Response;
        }
    }

As the purchase workflow is now captured in a separate class that only depends on a few interfaces, we could write unit tests that validate the workflow while simulating the actual behaviour of the other systems. This allowed us to fully test some of the exceptional behaviour we encountered while working with the real systems. Note that the wait step, which in the initial implementation forced us to split up methods and add more activity triggers, is now reduced to a small implementation detail in the workflow.
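To give an impression of what such a test can look like, here is a minimal sketch that uses only plain C#. It does not use the real IDurableOrchestrationContext or IDurableMediator; IWorkflowSteps, PurchaseCompletion and FakeSteps are hypothetical stand-ins, assumed here to keep the example self-contained. The loop has the same shape as the wait step in the workflow, and the fake simulates an invoicing system that stays busy for a configurable number of polls.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the mediator calls and the timer used by the workflow.
public interface IWorkflowSteps
{
    Task<bool> IsInvoiceStillProcessingAsync(Guid invoiceId);
    Task MarkProductAsSoldAsync(string product);
    Task WaitAsync(TimeSpan delay);
}

public static class PurchaseCompletion
{
    // Same shape as the wait loop in the workflow: poll, wait, and continue once the invoice is ready.
    public static async Task CompleteAsync(IWorkflowSteps steps, Guid invoiceId, IEnumerable<string> products)
    {
        while (await steps.IsInvoiceStillProcessingAsync(invoiceId))
        {
            await steps.WaitAsync(TimeSpan.FromSeconds(1));
        }

        foreach (var product in products)
        {
            await steps.MarkProductAsSoldAsync(product);
        }
    }
}

// Test double: reports "still processing" for the first busyPolls polls and records every call.
public sealed class FakeSteps : IWorkflowSteps
{
    private int _remainingBusyPolls;

    public FakeSteps(int busyPolls) => _remainingBusyPolls = busyPolls;

    public List<string> Calls { get; } = new List<string>();

    public Task<bool> IsInvoiceStillProcessingAsync(Guid invoiceId)
    {
        Calls.Add("poll");
        return Task.FromResult(_remainingBusyPolls-- > 0);
    }

    public Task MarkProductAsSoldAsync(string product)
    {
        Calls.Add("sold:" + product);
        return Task.CompletedTask;
    }

    public Task WaitAsync(TimeSpan delay)
    {
        // No real waiting in a unit test; only record that the workflow asked for a delay.
        Calls.Add("wait");
        return Task.CompletedTask;
    }
}
```

A test can run CompleteAsync against a FakeSteps that is busy for two polls and then assert on the recorded call order, which proves that no product is marked as sold before the invoice is ready.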

The only thing missing is having an orchestrator that runs the workflow. The service can be updated to request an invocation of the purchase workflow.

    public class WorkflowPurchaseService
    {
        private readonly IDurableClient _durableClient;

        public WorkflowPurchaseService(IDurableClient durableClient)
        {
            _durableClient = durableClient;
        }

        public async Task<string> PurchaseAsync(string orderId)
            => await _durableClient.StartNewAsync("PurchaseWorkflowOrchestrator", input: orderId);
    }

To keep this example simple, I’ve omitted the details on how to prevent having to hardcode the orchestrator name there; we solved this in a similar fashion to how MediatR resolves handlers for each of the requests.

To reflect, this approach has given us some benefits that greatly increased the observability of these workflows, and allowed for some nice extras. The workflow and mediator requests are not affected by the structure Durable Functions usually imposes. The mediator requests can easily be reused, while the workflows can be tested completely. It is easy to see which requests are used by which workflows.

Implementing retries or delays is free. It is easy to implement a 10-minute delay in a workflow and know that the workflow will resume after that time. Waiting for a specific state is easy, as you can continue the same API call and keep waiting when it is not there yet.

We also added some important extras to help with keeping track of running workflows. First, we added some trace identifiers to the workflow and activities. All the logs from workflows and activities have the same trace identifier, making it easy to correlate multiple logs to one workflow run. Next, the state of the orchestrations can be observed by other APIs. When a purchase workflow is running, we can display that the corresponding order is in the process of being purchased or block a second purchase request when the first one is still being processed.
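Blocking a second purchase can be sketched as follows. Durable Functions lets the caller choose the instance id when starting an orchestration and query the status of an instance by that id; the sketch hides those two operations behind a hypothetical IWorkflowClient interface so that it stays self-contained. The "purchase-{orderId}" id format, WorkflowState, GuardedPurchaseStarter and InMemoryWorkflowClient are assumptions made for this example, not our production code.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified view of a workflow's state.
public enum WorkflowState { NotFound, Running, Completed, Failed }

// Hypothetical abstraction over "start with a chosen instance id" and "get status by instance id".
public interface IWorkflowClient
{
    Task<WorkflowState> GetStateAsync(string instanceId);
    Task StartAsync(string instanceId, string orderId);
}

public sealed class GuardedPurchaseStarter
{
    private readonly IWorkflowClient _client;

    public GuardedPurchaseStarter(IWorkflowClient client) => _client = client;

    // Returns false when a purchase workflow for this order is still running.
    public async Task<bool> TryStartAsync(string orderId)
    {
        // A deterministic instance id ties every purchase attempt for an order to the same workflow.
        var instanceId = "purchase-" + orderId;

        if (await _client.GetStateAsync(instanceId) == WorkflowState.Running)
        {
            return false;
        }

        await _client.StartAsync(instanceId, orderId);
        return true;
    }
}

// In-memory client that makes the guard testable without any Azure dependency.
public sealed class InMemoryWorkflowClient : IWorkflowClient
{
    public Dictionary<string, WorkflowState> States { get; } = new Dictionary<string, WorkflowState>();

    public Task<WorkflowState> GetStateAsync(string instanceId)
        => Task.FromResult(States.TryGetValue(instanceId, out var state) ? state : WorkflowState.NotFound);

    public Task StartAsync(string instanceId, string orderId)
    {
        States[instanceId] = WorkflowState.Running;
        return Task.CompletedTask;
    }
}
```

Note that the check and the start are two separate calls, so this guard alone does not exclude two requests racing each other at exactly the same moment.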

In our monitoring we include the state of each of the workflows that have run and display a health degradation when workflows are failing to complete. As the exact contents of the requests and responses to and from the activities are stored in the history storage, we can display detailed debug information for workflows that have failed.

Drawbacks and solutions

This approach has a few drawbacks that we need to address. First, these Durable Entities are not really meant to be used this way. They have persistent state, and each invocation towards these entities results in some state being stored in the table storage, while the durable mediator does not contain any state. On the other hand, they store the exact content of the request and responses to and from the durable mediator, making debugging after a failed workflow easier. If you implement this with activities, you will lose the input data.

Furthermore, some retry features are not available when dealing with entities. Next to CallActivityAsync, you have CallActivityWithRetryAsync, which allows for automatic retries of activities that can potentially fail. Since you communicate with the (proxied) interface of an entity directly, the retry logic is something you must implement yourself. This is usually not a problem, unless you also want to use the rewind feature of Durable Functions. Although this feature is still in preview and has some issues with the automatic retries (https://github.com/Azure/durabletask/issues/811), it is good to explore solutions that do not depend on Durable Entities.
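The manual retry logic mentioned above does not have to be complicated. The helper below is a minimal sketch, not the code we run in production; WorkflowRetry and its parameters are hypothetical names. The wait between attempts is passed in as a delegate on purpose: inside an orchestrator it would be a durable timer such as the context.CreateTimer call used in the purchase workflow, never a Task.Delay, while a unit test can pass a delegate that returns immediately.

```csharp
using System;
using System.Threading.Tasks;

public static class WorkflowRetry
{
    // Runs the action until it succeeds or maxAttempts is reached.
    // waitBeforeRetry receives the number of the attempt that just failed.
    public static async Task<T> RetryAsync<T>(
        Func<Task<T>> action,
        int maxAttempts,
        Func<int, Task> waitBeforeRetry)
    {
        if (maxAttempts < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        }

        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return await action();
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Not the last attempt yet: wait and try again.
                // On the last attempt the filter is false, so the exception reaches the caller.
                await waitBeforeRetry(attempt);
            }
        }
    }
}
```

Because the last failure is rethrown as-is, a workflow that wraps a mediator call in this helper still ends up in the failed state when the external system stays unavailable.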

One way of solving this is using string-based activities again but hiding the triggering of those activities behind an interface. Instead of having IDurableMediator as the actual entity proxy, it can also be a simple class that invokes the correct activity when receiving a Send call. This removes the need for a Durable Entity and simplifies the entire setup of the workflow system. I’ve been exploring this approach, including the possibility of rewinding failed workflows, and published it as a NuGet package (https://github.com/ThomasBleijendaal/DurableMediator). An example showing how to unit test a workflow can be found there as well.

Conclusion

The workflow approach has given us a solution for multiple problems we were having. First, it gave us a clear pattern for running slow or error prone processes that cannot be handled within the lifespan of a single http API call. As every developer on the team understands what a workflow entails, when estimating and refining tickets, the notion of ‘put that process in a workflow’ makes it clear how to build something.

The good observability of these workflows allows for it to be used for many different processes and we employ this pattern quite quickly. Our health probes automatically pick up any workflow that failed, making them very visible. We’ve developed a portal that allows us to diagnose an entire workflow run, including the request and responses from each activity.

As the implementation of the activities involves just simple mediator request handlers, the step up to (or down from) workflows is small, and it is easy to convert something into a workflow when something has been developed using request handlers.

As far as the original purchase workflow is concerned, it has evolved substantially. The creation of the invoice proved to be even more convoluted, which warranted the creation of a dedicated create-invoice workflow. This workflow is started by the purchase workflow as a sub-orchestration but is not awaited and runs longer than the purchase workflow. All the API calls to the external service are wrapped in retry logic that retries the call after suspending the workflow for a minute. The purchase workflow itself is quite simple now and finishes rather swiftly; most of the complexities are diverted to the create-invoice workflow. Besides these two workflows, we have built workflows for purchase cancellation, other types of purchases and subscription rollovers. We also built unit tests that subject the workflows to very unreliable API mocks, testing their resilience to failure at every step in the process.

All code examples are available as working examples in this repository. All in all, it was an interesting exercise that taught us a lot. We look forward to applying our experience to even more complicated projects!
