Reputation: 530
I am building a C# isolated worker process Azure Function, triggered by an Event Hub, that outputs a list of EventData with headers to another Event Hub.
The function works well with the string[] binding, but when I use the EventData binding from Azure.Messaging.EventHubs, the function throws the following messages in the console:
System.Private.CoreLib: Exception while executing function: Functions.MyFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'eventDatas'. Azure.Core.Amqp: Serialization failed due to an unsupported type, System.Byte[].
Executed 'Functions.MyFunction' (Failed, Id=4a2fcfa1-0042-4cb8-92d6-75289685b4dd, Duration=15ms)
System.Private.CoreLib: Exception while executing function: Functions.MyFunction. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'eventDatas'. Azure.Core.Amqp: Serialization failed due to an unsupported type, System.Byte[].
<TargetFramework>net7.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
...
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.9.2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.19.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.EventHubs" Version="5.5.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.14.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
using System.Collections.Generic;
using System.Linq;
using Azure.Messaging.EventHubs;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using my.Models;
using Newtonsoft.Json;
namespace my.function
{
    public class MyFunction
    {
        [EventHubOutput("EventHubOutput", Connection = "EventHubOutputConnectionString")]
        [FixedDelayRetry(5, "00:00:10")]
        [Function("MyFunction")]
        public EventData[] Run([EventHubTrigger("EventHubInput", Connection = "EventHubInputConnectionString", IsBatched = true)] EventData[] eventDatas)
        {
            List<EventData> eventDataOutputs = new List<EventData>();
            // Receive events
            foreach (EventData eventData in eventDatas)
            {
                // Deserialize the event body
                string eventJsonBody = eventData.EventBody.ToString();
                MyObject? myObject = JsonConvert.DeserializeObject<MyObject>(eventJsonBody);
                // Append one output event per nested field
                if (myObject?.NestedFields != null)
                {
                    eventDataOutputs.AddRange(myObject.NestedFields.Select(nestedField => new EventData(JsonConvert.SerializeObject(nestedField))));
                }
            }
            return eventDataOutputs.ToArray();
        }
    }
}
namespace my.Models
{
    using Newtonsoft.Json;
    public class MyObject
    {
        [JsonProperty("randomField")]
        public string RandomField { get; set; }
        [JsonProperty("nestedFields")]
        public NestedField[] NestedFields { get; set; }
    }
}
namespace my.Models
{
    using Newtonsoft.Json;
    public class NestedField
    {
        [JsonProperty("randomFieldNested")]
        public string RandomField { get; set; }
        [JsonProperty("time")]
        public DateTimeOffset Time { get; set; }
        [JsonProperty("longField")]
        public long LongField { get; set; }
    }
}
I tried with a newer Event Hub, and there I did receive events.
Upvotes: 2
Views: 626
Reputation: 6487
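I made a few modifications to your code, and it worked for me. The event body is decoded with Encoding.UTF8.GetString, deserialized as a MyObject[] array, and each nested field is written to a new EventData built from UTF-8 bytes.
Code: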
// Requires "using System.Text;" for Encoding.
// _logger is assumed to be an ILogger field set in the class constructor (not shown).
[Function(nameof(Function1))]
[FixedDelayRetry(5, "00:00:10")]
[EventHubOutput("eventHubOutput", Connection = "EventHubConnectionAppSetting")]
public EventData[] Run([EventHubTrigger("eventhubinput", Connection = "EventHubConnectionAppSetting", IsBatched = true)] EventData[] events)
{
    _logger.LogInformation($"Received {events.Length} events");
    List<EventData> eventDataOutputs = new List<EventData>();
    foreach (EventData eventData in events)
    {
        // Decode the body as UTF-8 and deserialize it as an array of MyObject
        string eventJsonBody = Encoding.UTF8.GetString(eventData.EventBody);
        MyObject[]? myObjects = JsonConvert.DeserializeObject<MyObject[]>(eventJsonBody);
        if (myObjects != null)
        {
            foreach (MyObject myObject in myObjects)
            {
                if (myObject.NestedFields != null)
                {
                    foreach (NestedField nestedField in myObject.NestedFields)
                    {
                        // Extract field values from the NestedField object
                        string randomFieldNested = nestedField.RandomField;
                        DateTimeOffset time = nestedField.Time;
                        long longField = nestedField.LongField;
                        // Create a new EventData instance with the extracted field values
                        string eventDataJson = JsonConvert.SerializeObject(new
                        {
                            RandomFieldNested = randomFieldNested,
                            Time = time,
                            LongField = longField
                        });
                        EventData newEventData = new EventData(Encoding.UTF8.GetBytes(eventDataJson));
                        // Add the new EventData to the output list
                        eventDataOutputs.Add(newEventData);
                    }
                }
            }
        }
    }
    _logger.LogInformation("Data Processed.....");
    return eventDataOutputs.ToArray();
}
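The flattening step in the function above can also be pulled out into a pure helper, so it can be checked without the Functions host or an Event Hub. What follows is only a minimal sketch under a few assumptions: the NestedFieldFlattener class and its Flatten method are hypothetical names, not part of any SDK. It uses System.Text.Json instead of Newtonsoft.Json to stay dependency-free. It also serializes each NestedField directly, so the output keeps the original JSON property names rather than the PascalCase names produced by the anonymous object above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;

// Same shape as the models in the question, re-declared with System.Text.Json attributes.
public sealed class NestedField
{
    [JsonPropertyName("randomFieldNested")]
    public string? RandomField { get; set; }

    [JsonPropertyName("time")]
    public DateTimeOffset Time { get; set; }

    [JsonPropertyName("longField")]
    public long LongField { get; set; }
}

public sealed class MyObject
{
    [JsonPropertyName("randomField")]
    public string? RandomField { get; set; }

    [JsonPropertyName("nestedFields")]
    public NestedField[]? NestedFields { get; set; }
}

// Hypothetical helper (not an SDK type): turns one incoming JSON body,
// assumed to be an array of MyObject, into one JSON string per nested field.
public static class NestedFieldFlattener
{
    public static List<string> Flatten(string eventJsonBody)
    {
        MyObject[]? myObjects = JsonSerializer.Deserialize<MyObject[]>(eventJsonBody);
        if (myObjects is null)
        {
            return new List<string>();
        }

        return myObjects
            .Where(o => o?.NestedFields is not null)   // skip null items and items without nested fields
            .SelectMany(o => o.NestedFields!)
            .Where(n => n is not null)                  // skip null entries inside the nested array
            .Select(n => JsonSerializer.Serialize(n))
            .ToList();
    }
}
```

Inside the function, each returned string could then be wrapped in a new EventData in the same way as the code above, which keeps the trigger method down to decoding, calling the helper, and building the output array.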
Upvotes: 1