Felipe Correa
Felipe Correa

Reputation: 693

Input serialization in Durable Function with Orchestrator

Using .NET 8 and azure functions in isolated mode, I have an EventGridTrigger function that starts up an orchestrator while providing a typed input as parameter. The parameter's type is part of the following class hierarchy:

[JsonDerivedType(typeof(IntegrationEventBase), typeDiscriminator: "IntegrationEventBase")]
[JsonDerivedType(typeof(WorkItemIntegrationEventBase), typeDiscriminator: "WorkItemIntegrationEventBase")]
public class IntegrationEventBase
{
    public string? Traceparent { get; set; }
}

[JsonDerivedType(typeof(WorkItemCreatedIntegrationEvent), typeDiscriminator: "WorkItemCreatedIntegrationEvent")]
[JsonDerivedType(typeof(WorkItemProgressedIntegrationEvent), typeDiscriminator: "WorkItemProgressedIntegrationEvent")]
public class WorkItemIntegrationEventBase : IntegrationEventBase
{
    public string? WorkItemId { get; set; }

    public ushort? Quantity { get; set; }
}

public class WorkItemCreatedIntegrationEvent : WorkItemIntegrationEventBase
{
}

public class WorkItemProgressedIntegrationEvent : WorkItemIntegrationEventBase
{
    public WorkItemStatus Status { get; set; }
}

The EventGridTrigger function

This is simply deserializing the event received from Event Grid and passing it as the input of the orchestrator

[Function("WorkItemProgressedIntegrationEventHandler")]
public async Task WorkItemProgressedIntegrationEventHandler([EventGridTrigger] EventGridEvent eventGridEvent,
    [DurableClient] DurableTaskClient client,
    FunctionContext executionContex)
{
    var @event = _serializer.Deserialize<WorkItemProgressedIntegrationEvent>(eventGridEvent.Data.ToString());

    await client.ScheduleNewOrchestrationInstanceAsync(
        Constants.WORKFLOW_ORCHESTRATOR, @event);
}

The Orchestrator

The orchestrator receives a WorkItemIntegrationEventBase and then it will call the entity in one of two different ways based on the actual inner type of the event (polymorphism).

[Function("Orchestrator")]
public static async Task RunOrchestrator(
    [OrchestrationTrigger] TaskOrchestrationContext context, WorkItemIntegrationEventBase @event)
{
    var entityId = BuildEntityIds(@event);

    if (@event is WorkItemCreatedIntegrationEvent createdEvent)
        await context.Entities.SignalEntityAsync(workflowEntityId, nameof(IDurableWorkflow.EnqueueWork),
            WorkItemInput.FromWorkItemIntegrationEvent(createdEvent));
    else if (@event is WorkItemProgressedIntegrationEvent progressedEvent)
        await context.Entities.SignalEntityAsync(workflowEntityId, nameof(IDurableWorkflow.ProgressWork),
            WorkItemInput.FromWorkItemIntegrationEvent(progressedEvent));
}

Issue!: I can't get the @event in the orchestrator to deserialize using the type information of the type that was used in the event trigger meaning that both of the checks above are always false.

I have tried almost everything stated here but no luck, I must be missing something still.

Upvotes: 0

Views: 315

Answers (1)

Vivek Vaibhav Shandilya
Vivek Vaibhav Shandilya

Reputation: 2646

This worked for me.

I used a simple data and I added logger to print the values of if else.

If you are passing the event value to orchestration you need to use in this syntax

input:@event
string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(Function1),input:@event);

My code:

Function1.cs:

using Azure.Messaging;
using Azure.Messaging.EventGrid;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace FunctionApp3
{
    public static class Function1
    {

        [Function(nameof(Function1))]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context, WorkItemIntegrationEventBase @event)
        {
            var entity = @event;

            ILogger logger = context.CreateReplaySafeLogger(nameof(Function1));
            logger.LogInformation($"Saying hello. \nValues are: \n \t{entity.WorkItemId} \n \t{entity.Quantity}");

            if (@event is WorkItemCreatedIntegrationEvent cloudEvent)
            {
                logger.LogInformation("it is a WorkItemCreatedIntegrationEvent");
            }
            else
            {
                logger.LogInformation("it is a WorkItemProgressedIntegrationEvent");
            }

        [Function("EventGrid")]
        public static async Task Run([EventGridTrigger] EventGridEvent cloudEvent,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            ILogger logger = executionContext.GetLogger("Function1_HttpStart");

            var @event = JsonSerializer.Deserialize<WorkItemProgressedIntegrationEvent>(cloudEvent.Data.ToString());

            
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(Function1),input:@event);

            logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
        }
    }

    [JsonDerivedType(typeof(IntegrationEventBase), typeDiscriminator: "IntegrationEventBase")]
    [JsonDerivedType(typeof(WorkItemIntegrationEventBase), typeDiscriminator: "WorkItemIntegrationEventBase")]
    public class IntegrationEventBase
    {
        public string? Traceparent { get; set; }
    }

    [JsonDerivedType(typeof(WorkItemCreatedIntegrationEvent), typeDiscriminator: "WorkItemCreatedIntegrationEvent")]
    [JsonDerivedType(typeof(WorkItemProgressedIntegrationEvent), typeDiscriminator: "WorkItemProgressedIntegrationEvent")]
    public class WorkItemIntegrationEventBase : IntegrationEventBase
    {
        public string? WorkItemId { get; set; }

        public string? Quantity { get; set; }
    }

    public class WorkItemCreatedIntegrationEvent : WorkItemIntegrationEventBase
    {
    }

    public class WorkItemProgressedIntegrationEvent : WorkItemIntegrationEventBase
    {
        public WorkItemStatus Status { get; set; }
    }

    public enum WorkItemStatus
    {
        InProgress,
        Completed
    }
}

Program.cs:

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

var host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices(services =>
    {
        services.AddApplicationInsightsTelemetryWorkerService();
        services.ConfigureFunctionsApplicationInsights();
    })
    .ConfigureLogging(logging =>
    {
        logging.Services.Configure<LoggerFilterOptions>(options =>
        {
            LoggerFilterRule defaultRule = options.Rules.FirstOrDefault(rule => rule.ProviderName
                == "Microsoft.Extensions.Logging.ApplicationInsights.ApplicationInsightsLoggerProvider");
            if (defaultRule is not null)
            {
                options.Rules.Remove(defaultRule);
            }
        });
    })
    .Build();

host.Run();

INPUT:

EventGrid Schema sample

[
    {
        "id":"test-id",
        "data":{"WorkItemId": "value1", "Quantity": "value2"},
        "subject":"test-subject",
        "eventType":"test-event-1",
        "eventTime":"2024-11-3",
        "dataVersion":"1.0"
    }
]

OUTPUT:

EDIT:

It is not triggering none is because the type of @event not belongs to any of them.

I tried getting type of @event using @event.GetType(). The value is Type of @event is: FunctionApp3.WorkItemIntegrationEventBase

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using System.Text.Json.Serialization;

namespace FunctionApp3
{
    public static class Function1

    {

        [Function(nameof(Function1))]
        public static async Task RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context, WorkItemIntegrationEventBase @event)
        {
            var entity = @event;

            ILogger logger = context.CreateReplaySafeLogger(nameof(Function1));
            logger.LogInformation($"Saying hello. \nValues are: \n \t{entity.WorkItemId} \n \t{entity.Quantity}");

            logger.LogInformation($"Type of @event is: {@event.GetType()}");

            if (@event is WorkItemCreatedIntegrationEvent)
            {
                logger.LogInformation("it is a WorkItemCreatedIntegrationEvent");
            }
            else if(@event is WorkItemProgressedIntegrationEvent)
            {
                logger.LogInformation("it is a WorkItemProgressedIntegrationEvent");

            }
            else
            {
                logger.LogInformation("it was Neither");
            }

        }


        [Function("EventGrid")]
        public static async Task Run([EventGridTrigger] Azure.Messaging.EventGrid.EventGridEvent cloudEvent,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            ILogger logger = executionContext.GetLogger("EventGrid");

            var @event = JsonSerializer.Deserialize<WorkItemIntegrationEventBase>(cloudEvent.Data.ToString());


            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(Function1), input: @event);

            logger.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
        }
    }

        [JsonDerivedType(typeof(IntegrationEventBase), typeDiscriminator: "IntegrationEventBase")]
        [JsonDerivedType(typeof(WorkItemIntegrationEventBase), typeDiscriminator: "WorkItemIntegrationEventBase")]
        public class IntegrationEventBase
        {
            public string? Traceparent { get; set; }
        }

        [JsonDerivedType(typeof(WorkItemCreatedIntegrationEvent), typeDiscriminator: "WorkItemCreatedIntegrationEvent")]
        [JsonDerivedType(typeof(WorkItemProgressedIntegrationEvent), typeDiscriminator: "WorkItemProgressedIntegrationEvent")]
        public class WorkItemIntegrationEventBase : IntegrationEventBase
        {
            [JsonPropertyName("Type")]
            public string? Type { get; set; }
            public string? WorkItemId { get; set; }

            public string? Quantity { get; set; }
        }

        public class WorkItemCreatedIntegrationEvent : WorkItemIntegrationEventBase
        {
        }

        public class WorkItemProgressedIntegrationEvent : WorkItemIntegrationEventBase
        {
            public WorkItemStatus Status { get; set; }
        }

        public enum WorkItemStatus
        {
            InProgress,
            Completed
        }
    }

Upvotes: 0

Related Questions