Long-Running Workflows as Serverless Functions in Azure

Persistence
15 Dec

Azure Functions have many features that make your work easier. However, they are less suitable for long-running processes. This is where Durable and Entity Functions can help.

In my opinion, Serverless Functions [1] are a great addition to Microsoft Azure, and they are becoming more and more popular for good reason: you don't have to worry about choosing the right number and size of servers or configuring autoscaling, and you certainly don't have to worry about keeping virtual machines up to date. With serverless technology, APIs in the cloud are available like the proverbial power from the socket. In both cases, a tremendous engineering effort goes on behind the scenes to provide the right amount of power at the right time. You package your code, hand it over to Microsoft, and let it be their problem to provide the necessary infrastructure for the load at hand. Under the Consumption Plan, users pay only for what they actually use, and the costs even drop to zero if no one is using the cloud software [2].

The second special feature of Azure Functions is their programming model: it is event-driven. Events can be the usual HTTP requests if the API to be developed is a Web API. But there are also many other events to which you can react [3]. Here are some examples:

  • A file is uploaded to Blob Storage.
  • Data changes in Cosmos DB.
  • A message arrives from an IoT device.
  • A message arrives from another microservice via Service Bus.
  • A timer signals that a set time has been reached.

The concept of Azure Functions therefore fits perfectly if you want to build software in the form of loosely coupled microservices.

Why Durable Functions?

Classic Azure Functions have two characteristics that must be taken into account in the design. First, they have to do their job in a relatively short time. On the Consumption Plan, the default timeout is five minutes (functions with HTTP triggers even have to respond in less than four minutes), but it can be increased to up to ten minutes if required [4]. Second, serverless Azure Functions are stateless. Developers have to take care of storing state themselves, for example in other Azure PaaS or serverless services like Azure SQL Database or Cosmos DB.
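
To make the timeout limit a bit more concrete: it is configured per function app in host.json via the functionTimeout setting. The following fragment is only a minimal sketch, assuming a function app on the Consumption Plan that uses runtime version 2.x or later; it raises the limit to the ten-minute maximum mentioned above.

```json
{
  "version": "2.0",
  "functionTimeout": "00:10:00"
}
```

Even with this setting, a function that waits for a human will eventually hit the limit, which is exactly the problem discussed next.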

Due to these two limitations, Azure Functions are not well suited for long-running processes. Imagine your Serverless Function is supposed to communicate with a user via a Slack bot during execution. It is impossible to predict how quickly the user will react; it can take minutes or even hours. A function would most likely run into a timeout.

In such situations, Durable Functions and Entity Functions help. They are designed to run for a long time and take care of state management themselves. We will now focus on these variants of Azure Functions and assume that you as a reader have basic knowledge of classic Azure Functions. If you lack this experience, I recommend that you work through a corresponding tutorial, such as the one at [5].

Programming with Durable Functions

With Durable Functions, you implement long-running workflows. In contrast to other workflow tools, however, no declarative language (e.g. a domain-specific language (DSL), XML, or JSON) is used for this purpose, but ordinary C# (Azure Functions also supports other programming languages, but here we limit ourselves to C#). The flow of the workflow can be clearly seen from the code structure. The individual activities of the workflow, which may take longer, are hidden behind await calls.

However, this alone does not yet make it possible to program long-running workflows in C#. Azure Functions are serverless, so the server landscape on which your C# code runs can change constantly. Servers are added, or the server on which a workflow instance is currently running is removed. How do Durable Functions handle this? The answer seems absurd at first glance: whenever a workflow instance resumes, its function is executed again from the beginning.

For this reason, the workflow code must be deterministic. This means that, given the same input parameters, it must call the same activities in the same order and with the same arguments every time a workflow instance is run. In accordance with the event sourcing pattern, the Durable Functions runtime automatically stores each action of a workflow instance and its result in Azure Storage. When the function of a workflow instance later starts from the beginning again, the runtime checks before calling each action whether it has already been executed in an earlier run of the instance. If so, the action is not executed again; instead, its previously recorded result is read and returned. Thus, it does no harm to always start over from the beginning. Actions that have already been executed are effectively skipped.
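
The replay idea is easier to grasp with a toy model. The following console program is a deliberately simplified sketch and not the real Durable Functions runtime: ReplayContext, CallActivity and the in-memory history list are hypothetical names that stand in for the runtime and its event sourcing tables in Azure Storage. It only shows that recorded results are returned instead of executing an action a second time, and that a changed call order is detected.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the runtime; it keeps the history in memory,
// whereas Durable Functions persists it in Azure Storage.
public class ReplayContext
{
    private readonly List<(string Name, string Result)> history;
    private int position; // index of the next history entry to replay

    public ReplayContext(List<(string Name, string Result)> history) => this.history = history;

    // Number of actions that were really executed in this run
    public int ExecutedActions { get; private set; }

    public string CallActivity(string name, Func<string> activity)
    {
        if (position < history.Count)
        {
            // The action already ran in an earlier run: return the recorded result
            var entry = history[position++];
            if (entry.Name != name)
            {
                throw new InvalidOperationException("Non-deterministic workflow code detected");
            }

            return entry.Result;
        }

        // First execution: run the action and record its result
        var result = activity();
        ExecutedActions++;
        history.Add((name, result));
        position++;
        return result;
    }
}

public static class ReplayDemo
{
    // The "workflow": always written as if it ran from the very beginning
    public static string Workflow(ReplayContext ctx)
    {
        var check = ctx.CallActivity("CheckPlate", () => "plate checked");
        var store = ctx.CallActivity("StoreViolation", () => "violation stored");
        return $"{check}, {store}";
    }

    public static void Main()
    {
        var history = new List<(string Name, string Result)>();

        var firstRun = new ReplayContext(history);
        Console.WriteLine(Workflow(firstRun));           // plate checked, violation stored
        Console.WriteLine(firstRun.ExecutedActions);     // 2

        var secondRun = new ReplayContext(history);
        Console.WriteLine(Workflow(secondRun));          // plate checked, violation stored
        Console.WriteLine(secondRun.ExecutedActions);    // 0
    }
}
```

The second run produces the same result without executing any action. If the workflow called its actions in a different order on the second run, the name check would fail, which is why the workflow code itself has to be deterministic.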

Many C# APIs are not inherently deterministic. Think of the current time, the generation of a new GUID, calls to external web APIs, random numbers, etc. Such APIs must not be called directly in the workflow code of a Durable Function. The Durable Functions API offers alternatives with similar functionality that are compatible with the Durable Functions runtime [6].
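
The following sketch shows what some of these alternatives look like, assuming version 2.x of the Microsoft.Azure.WebJobs.Extensions.DurableTask package. The function names DeterministicOrchestrator and GetRandomNumber are made up for this illustration. It needs the Azure Functions host to run and is not meant as a complete sample.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

public static class DeterministicApisSketch
{
    [FunctionName(nameof(DeterministicOrchestrator))]
    public static async Task<string> DeterministicOrchestrator(
        [OrchestrationTrigger] IDurableOrchestrationContext context)
    {
        // Instead of DateTime.UtcNow: a time value that stays the same on replay
        DateTime now = context.CurrentUtcDateTime;

        // Instead of Guid.NewGuid(): a GUID that stays the same on replay
        Guid id = context.NewGuid();

        // Instead of Task.Delay or Thread.Sleep: a durable timer
        await context.CreateTimer(now.AddSeconds(30), CancellationToken.None);

        // Non-deterministic work (random numbers, web API calls, ...) belongs
        // in an activity; its result is recorded and reused on replay.
        int number = await context.CallActivityAsync<int>(nameof(GetRandomNumber), null);

        return $"{id}: {number} at {now:O}";
    }

    [FunctionName(nameof(GetRandomNumber))]
    public static int GetRandomNumber([ActivityTrigger] IDurableActivityContext activityContext)
        => new Random().Next(1, 100);
}
```

The pattern is always the same: anything that could yield a different value on a later replay either comes from the orchestration context or is moved into an activity.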

Relaxed waiting

Listing 1 contains an example of a Durable Function implementing the Human Interaction Application Pattern [7]. Figure 1 shows the sequence as a sequence diagram.



Fig. 1: Sequence diagram for Listing 1

In our scenario, a traffic surveillance camera sends speed violations to a normal Azure Function (SpeedViolationRecognition) via an HTTP web API. This function checks how accurately the vehicle license plate was recognized. If the accuracy is not high enough, we need the help of a person who looks at the captured image and checks the recognized license plate. The interaction with this person could be done via a messaging system like Slack (only hinted at in the example code of this article). Since it is not foreseeable how quickly the person will react, the interaction logic lives in a Durable Function (ManuallyApproveRecognition). It sends the request for a manual license plate check to Slack and waits for Slack to return the response via a Web API function (ProcessSlackApproval). The Durable Function is informed about the arrival of the response via an event (ReceiveApprovalResponseEvent), and the speed violation can then be processed (StoreSpeedViolation).

When looking through the code, pay particular attention to the orchestration ID, which is used in various places. It uniquely identifies the workflow instance and is used to filter the event sourcing tables in Azure Storage. Figure 2 shows the relationship between the HTTP Web API, the orchestration ID and the tables in Azure Storage.



Fig. 2: Event sourcing table in Azure Storage

using System.Net;
using System.Net.Http;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

namespace DurableFunctions
{
  #region Data Transfer Objects
  /// <summary>
  /// Represents a speed violation recognized by a traffic camera
  /// </summary>
  public class SpeedViolation
  {
    /// <summary>
    /// ID of the camera that has recognized the vehicle
    /// </summary>
    public int CameraID { get; set; }

    /// <summary>
    /// License plate number as recognized by the camera
    /// </summary>
    public string LicensePlateNumber { get; set; }
 
    /// <summary>
    /// Accuracy of license plate recognition (value between 0 and 1)
    /// </summary>
    public double RecognitionAccuracy { get; set; }

    /// <summary>
    /// Measured speed of the vehicle
    /// </summary>
    public decimal SpeedKmh { get; set; }
  }

  /// <summary>
  /// Represents a request for manual approval of license plate read
  /// </summary>
  public class ApprovalRequest
  {
    /// <summary>
    /// ID of the long-running orchestration handling the approval process
    /// </summary>
    public string OrchestrationInstanceID { get; set; }
 
 
    /// <summary>
    /// Data about the speed violation to approve
    /// </summary>
    public SpeedViolation SpeedViolation { get; set; }
  }

  /// <summary>
  /// Represents a response of a user concerning a license plate read
  /// </summary>
  public class ApprovalResponse
  {
    /// <summary>
    /// ID of the long-running orchestration handling the approval process
    /// </summary>
    public string OrchestrationInstanceID { get; set; }
 
    /// <summary>
    /// True if license plate read has been confirmed, otherwise false
    /// </summary>
    public bool Approved { get; set; }
  }
  #endregion
 
  public class TrafficSpeedViolation
  {
    /// <summary>
    /// Web API handling incoming speed violations recognized by traffic cameras
    /// </summary>
    /// <returns>
    /// OK if license plate read accuracy was ok, otherwise tracking data for
    /// long-running orchestration handling manual approval of license plate read.
    /// </returns>
    [FunctionName(nameof(SpeedViolationRecognition))]
    public async Task<HttpResponseMessage> SpeedViolationRecognition(
      [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
      [DurableClient] IDurableOrchestrationClient starter,
      ILogger log)
    {
      // Get speed violation data from HTTP body
      var sv = JsonSerializer.Deserialize<SpeedViolation>(await req.Content.ReadAsStringAsync());
 
      // Check if read accuracy was not good enough
      if (sv.RecognitionAccuracy < 0.75d)
      {
        log.LogInformation($"Recognition not accurate enough, starting orchestration to ask human for help");
 
        // Start durable function for manual approval process
        string instanceId = await starter.StartNewAsync(nameof(ManuallyApproveRecognition), sv);
 
        // Return status object with instance ID and URLs for status monitoring
        return starter.CreateCheckStatusResponse(req, instanceId);
      }
 
      // Read accuracy was ok, so store the violation right away (assumption:
      // storing a speed violation is pretty fast, i.e. a matter of seconds).
      await StoreSpeedViolation(sv, log);
      return new HttpResponseMessage(HttpStatusCode.OK);
    }
 
    private const string ReceiveApprovalResponseEvent = "ReceiveApprovalResponse";
 
    /// <summary>
    /// Web API receiving responses from Slack API
    /// </summary>
    /// <returns>
    /// OK if approval was ok, BadRequest if approval is unknown or no longer running
    /// </returns>
    [FunctionName(nameof(ProcessSlackApproval))]
    public async Task<HttpResponseMessage> ProcessSlackApproval(
      [HttpTrigger(AuthorizationLevel.Anonymous, "post")]HttpRequestMessage req,
      [DurableClient] IDurableOrchestrationClient orchestrationClient,
      ILogger log)
    {
      // Get approval response from HTTP body
      var slackResponse = JsonSerializer.Deserialize<ApprovalResponse>(await req.Content.ReadAsStringAsync());
 
      // Get status based on orchestration Id
      var status = await orchestrationClient.GetStatusAsync(slackResponse.OrchestrationInstanceID);
      // status is null if no orchestration with the given ID is known
      if (status != null && (status.RuntimeStatus == OrchestrationRuntimeStatus.Running || status.RuntimeStatus == OrchestrationRuntimeStatus.Pending))
      {
        log.LogInformation("Received Slack response in time, raising event");
 
        // Raise an event for the given orchestration
        await orchestrationClient.RaiseEventAsync(slackResponse.OrchestrationInstanceID,
          ReceiveApprovalResponseEvent, slackResponse.Approved);
        return new HttpResponseMessage(HttpStatusCode.OK);
      }

      return new HttpResponseMessage(HttpStatusCode.BadRequest);
    }

    /// <summary>
    /// Durable function handling long-running approval process
    /// </summary>
    [FunctionName(nameof(ManuallyApproveRecognition))]
    public async Task<bool> ManuallyApproveRecognition([OrchestrationTrigger] IDurableOrchestrationContext context)
    {
      // Get speed violation data from orchestration context
      var sv = context.GetInput<SpeedViolation>();

      // Call activity that sends approval request to Slack. Note that this
      // activity will not await the human's response. It will only wait until
      // the message has been sent to Slack.
      await context.CallActivityAsync(nameof(SendApprovalRequestViaSlack), new ApprovalRequest
      {
        OrchestrationInstanceID = context.InstanceId,
        SpeedViolation = sv
      });

      // We want the human operator to respond within 60 minutes. We set up a
      // timer for that. Note that this is NOT a regular .NET timer. It is a
      // special timer from the Durable Functions runtime!
      using var timeoutCts = new CancellationTokenSource();
      var expiration = context.CurrentUtcDateTime.AddMinutes(60);
      var timeoutTask = context.CreateTimer(expiration, timeoutCts.Token);

      // Wait for the event that will be raised once we have received the
      // response from Slack.
      var approvalResponse = context.WaitForExternalEvent<bool>(ReceiveApprovalResponseEvent);

      // Wait for Slack response or timer, whichever comes first
      var winner = await Task.WhenAny(approvalResponse, timeoutTask);

      // Was the Slack task the first task to complete?
      if (winner == approvalResponse && approvalResponse.Result)
      {
        // License plate read approved -> Store speed violation
        await context.CallActivityAsync(nameof(StoreSpeedViolation), sv);
      }

      if (!timeoutTask.IsCompleted)
      {
        // All pending timers must be completed or cancelled before the
        // function exits.
        timeoutCts.Cancel();
      }

      return winner == approvalResponse && approvalResponse.Result;
    }
 
    [FunctionName(nameof(SendApprovalRequestViaSlack))]
    public Task SendApprovalRequestViaSlack([ActivityTrigger] ApprovalRequest req, ILogger log)
    {
      log.LogInformation($"Message regarding {req.SpeedViolation.LicensePlateNumber} sent to Slack " +
        $"(instance ID {req.OrchestrationInstanceID})!");
 
      // Todo: Send data about speed violation to Slack via Slack REST API.
      // Not implemented here, just a demo.
 
      return Task.CompletedTask;
    }
 
    [FunctionName(nameof(StoreSpeedViolation))]
    public Task StoreSpeedViolation([ActivityTrigger] SpeedViolation sv, ILogger log)
    {
      log.LogInformation($"Processing speed violation from camera {sv.CameraID} " +
        $"for LP {sv.LicensePlateNumber} ({sv.SpeedKmh} km/h)");
 
      // Todo: Add code for processing speed violation
      // Not implemented here, just a demo.
 
      return Task.CompletedTask;
    }
  }
}

Entity Functions

Just a few months ago, Azure Durable Functions was extended by a new type of function: Entity Functions. While the state of Durable Functions (aka Orchestrator Functions) is implicitly determined by the control flow and the local variables in C# code, Entity Functions explicitly store it.

The principle is easiest to explain with an example. Listing 2 contains a Durable Entity that represents a lawsuit for a speed violation (SpeedViolationLawsuit). It uses the class-based syntax. A function-based syntax is also available, but Microsoft recommends the class-based variant for C# [8]. You do not have to write code to persist instances of this class. The Durable Functions runtime stores the state of the instances as JSON blobs in Azure Storage. The shape of the JSON can be influenced with the usual C# attributes for JSON serialization.

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System;
using System.Threading.Tasks;
 
namespace DurableFunctions
{
  /// <summary>
  /// Represents a speed violation lawsuit
  /// </summary>
  public class SpeedViolationLawsuit
  {
    public SpeedViolation SpeedViolation { get; set; }
 
    public string Driver { get; set; }
 
    public decimal? Fine { get; set; }
 
    public bool Paid { get; set; }
 
    public void SetSpeedViolation(SpeedViolation sv) => SpeedViolation = sv;
 
    public void StoreDriver(string driver) => Driver = driver;
 
    public async Task SetFine(decimal fine)
    {
      if (string.IsNullOrEmpty(Driver))
      {
        throw new InvalidOperationException();
      }
 
      // Simulate an async operation (e. g. for I/O)
      await Task.Delay(1);
 
      Fine = fine;
    }
 
    public void MarkAsPaid()
    {
      if (!Fine.HasValue)
      {
        throw new InvalidOperationException();
      }
 
      Paid = true;
    }
 
    public void Delete()
    {
      // Note how we access the current entity
      Entity.Current.DeleteState();
    }
 
    [FunctionName(nameof(SpeedViolationLawsuit))]
    public static Task Run([EntityTrigger] IDurableEntityContext ctx)
    {
      // When creating a new entity, make sure it is marked as not paid
      if (!ctx.HasState)
      {
        ctx.SetState(new SpeedViolationLawsuit
        {
          Paid = false
        });
      }
 
      return ctx.DispatchAsync<SpeedViolationLawsuit>();
    }
  }
}
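
As mentioned before Listing 2, the stored JSON can be shaped with serialization attributes. The following fragment is a rough sketch of how that could look, assuming that entity state is serialized with Json.NET (Newtonsoft.Json), as is the case in version 2.x of the Durable Functions extension. The class LawsuitStateSketch and the property IsOpen are invented for this illustration and are not part of the listings.

```csharp
using Newtonsoft.Json;

// OptIn: only members marked with [JsonProperty] become part of the state
[JsonObject(MemberSerialization.OptIn)]
public class LawsuitStateSketch
{
    [JsonProperty("driver")]
    public string Driver { get; set; }

    [JsonProperty("fine")]
    public decimal? Fine { get; set; }

    [JsonProperty("paid")]
    public bool Paid { get; set; }

    // Computed helper; not persisted because it has no [JsonProperty]
    public bool IsOpen => !Paid;
}
```

With opt-in serialization, helper properties like IsOpen do not end up in Azure Storage, and the persisted property names stay stable even if the C# properties are renamed later.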

Listing 3 shows how to work with Durable Entities from within normal and Durable Functions. ManuallyApproveRecognition shows how a new entity is created. In our scenario, this would happen, for example, once the user has confirmed the license plate recognition in the Durable Function. GetLawsuite shows how to read the state of the Durable Entity using its ID (in our case a GUID). Finally, SetDriver is a simplified example of calling a method that changes the state of an entity. I recommend that anyone experimenting with Durable Entities take a look at Azure Storage, as shown in Figure 2, to get a deeper understanding of how Azure Functions actually store the state.

public async Task<bool> ManuallyApproveRecognition(
  [OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
  // ...
 
  // Create a new instance of a speed violation lawsuit
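  // Note: Guid.NewGuid() is not deterministic and must not be used in an
  // orchestrator. context.NewGuid() returns a replay-safe GUID instead.
  var entityId = context.NewGuid();
  var lawsuitId = new EntityId(nameof(SpeedViolationLawsuit), entityId.ToString());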
 
  // Store data of speed violation in new entity
  await context.CallEntityAsync(lawsuitId, nameof(SpeedViolationLawsuit.SetSpeedViolation), sv);
 
  // ...
}
 
[FunctionName(nameof(GetLawsuite))]
public async Task<IActionResult> GetLawsuite(
  [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "lawsuite/{entityKey}")] HttpRequestMessage req,
  string entityKey,
  [DurableClient] IDurableEntityClient client,
  ILogger log)
{
  // Get state of speed violation lawsuit using its Id (GUID)
  var svl = await client.ReadEntityStateAsync<SpeedViolationLawsuit>(
    new EntityId(nameof(SpeedViolationLawsuit), entityKey));
  if (!svl.EntityExists)
  {
    return new NotFoundResult();
  }

  // Return current state of lawsuit
  return new OkObjectResult(svl.EntityState);
}
 
[FunctionName(nameof(SetDriver))]
public async Task<IActionResult> SetDriver(
  [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "lawsuite/{entityKey}/setDriver")] HttpRequestMessage req,
  string entityKey,
  [DurableClient] IDurableEntityClient client,
  ILogger log)
{
  var driverName = JsonSerializer.Deserialize<string>(await req.Content.ReadAsStringAsync());
 
  // Send one-way message to lawsuit entity (not processing result)
  await client.SignalEntityAsync(
    new EntityId(nameof(SpeedViolationLawsuit), entityKey),
    nameof(SpeedViolationLawsuit.StoreDriver),
    driverName);
 
  return new StatusCodeResult((int)HttpStatusCode.Accepted);
}
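
Listing 3 addresses entity operations by name. As a possible alternative, entities can also be accessed through a C# interface, which gives you compile-time checking of operation names and arguments. The following is only a sketch, assuming version 2.x of the Durable Functions extension. The interface ISpeedViolationLawsuit and the helper class are hypothetical; the entity class from Listing 2 would additionally have to implement the interface.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;

// Hypothetical interface. Entity interfaces may only contain methods with at
// most one parameter that return void, Task or Task<T>.
public interface ISpeedViolationLawsuit
{
    void StoreDriver(string driver);
    Task SetFine(decimal fine);
    void MarkAsPaid();
}

public static class TypedEntityAccessSketch
{
    // Entity name as used in Listing 2
    private const string EntityName = "SpeedViolationLawsuit";

    // From a client function: send a one-way signal to the entity
    public static Task StoreDriverAsync(
        IDurableEntityClient client, string entityKey, string driverName)
    {
        var id = new EntityId(EntityName, entityKey);
        return client.SignalEntityAsync<ISpeedViolationLawsuit>(
            id, lawsuit => lawsuit.StoreDriver(driverName));
    }

    // From an orchestrator: call the entity and wait until the operation is done
    public static Task SetFineAsync(
        IDurableOrchestrationContext context, string entityKey, decimal fine)
    {
        var id = new EntityId(EntityName, entityKey);
        var lawsuit = context.CreateEntityProxy<ISpeedViolationLawsuit>(id);
        return lawsuit.SetFine(fine);
    }
}
```

Whether the string-based or the interface-based style fits better is largely a matter of taste; the interface mainly protects against typos in operation names.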

Conclusion

Durable Functions have significantly expanded the scope of Azure Functions. The previous restrictions on execution times no longer apply. The task-based programming model and async/await are well suited for mapping workflows. The C# code is easy to read and reflects the content of the workflows well. Even complex use cases such as parallel execution of several sub-processes (aka fan out/fan in) or the integration of people in long-running workflows are no problem.

Until a few months ago, however, it was difficult to implement the aggregator pattern with Durable Functions. In this pattern, data concerning a logical, addressable entity arrives from different sources over a long period of time and has to be collected. The new Durable Entities are suitable for exactly such cases.

Alongside the functional enhancements of Azure Functions, Microsoft is not neglecting the technical foundation: the current Azure Functions runtime already supports .NET Core 3.1, and running functions in Kubernetes is supported as well [9].

It can be seen that Microsoft is pushing the development of Azure Functions at a fast pace. In my opinion, the platform is an interesting option for all those who rely on loosely coupled microservices and event-driven cloud solutions.


Links & Literature

[1] https://azure.microsoft.com/en-us/services/functions/

[2] https://azure.microsoft.com/en-us/pricing/details/functions/

[3] https://docs.microsoft.com/en-us/azure/azure-functions/functions-triggers-bindings#supported-bindings

[4] https://docs.microsoft.com/en-us/azure/azure-functions/functions-scale#timeout

[5] https://docs.microsoft.com/en-us/azure/azure-functions/functions-create-your-first-function-visual-studio

[6] https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-code-constraints#using-deterministic-apis

[7] https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-overview?tabs=csharp#human

[8] https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-entities?tabs=csharp#define-entities

[9] https://docs.microsoft.com/en-us/azure/azure-functions/functions-kubernetes-keda
