Reputation: 837
I'm trying to configure an Azure Stream Analytics job, but consistently getting bad performance. I received data from a client system that pushes data into an Event Hub. And the ASA queries that into an Azure SQL database.
A few days ago I noticed that it was generating large amount of InputEventLateBeyondThreshold errors. Here an example out of the ASA. The Timestamp element is set by the client system.
{
"Tag": "MANG_POWER_5",
"Value": 1.08411181,
"ValueType": "Analogue",
"Timestamp": "2022-02-01T09:00:00.0000000Z",
"EventProcessedUtcTime": "2022-02-01T09:36:05.1482308Z",
"PartitionId": 0,
"EventEnqueuedUtcTime": "2022-02-01T09:00:00.8980000Z"
}
You can see that the event arrives pretty quickly, but takes more than 30 mins to process it. To try and avoid InputEventLateBeyondThreshold errors, I have increased the late event threshold. This may be contributing to the increased processing time, but having it too low also increases number of InputEventLateBeyondThreshold errors.
The Watermark Delay is consistently high, and yet SU usage is around 5%. I have increased the SU to as high as I can for this query.
I'm trying to figure out, why it takes so long to process the events once they have arrived.
This is the query I'm using:
WITH PIDataSet AS (SELECT * FROM [<event-hub>] TIMESTAMP BY timestamp)
--Write data to SQL joining with a lookup
SELECT
i.Timestamp as timestamp,
i.Value as value,
INTO [<sql-database>]
FROM PIDataSet as i
INNER JOIN [tagmapping-ref-alias] tm ON tm.sourcename = i.Tag
----Write data to AzureTable joining with a lookup
SELECT
DATEDIFF(second,CAST('1970-01-01' as DateTime), I1.Timestamp) As Rowkey,
I2.TagId as PartitionKey,
I1.Value as Value,
UDF.formatTime(I1.Timestamp) as DeviceTimeStamp
into [<azure-table>]
FROM PIDataSet as I1
JOIN [tagmapping-ref-alias] as I2 on I2.Sourcename = I1.Tag
--Get an hourly count into a SQL Table.
SELECT
I2.TagId,
System.Timestamp() as WindowEndTime, COUNT(I2.TagId) AS InputCount
into [tagmeta-ref-alias]
FROM PIDataSet as I1
JOIN [tagmapping-ref-alias] as I2 on I2.Sourcename = I1.Tag
GROUP BY I2.TagId, TumblingWindow(Duration(hour, 1))
Upvotes: 1
Views: 952
Reputation: 842
When you set up a 59 minutes out-of-order window, what you do is that you set up a 59 minutes buffer for that input. When records land in that buffer, they will wait 59 minutes until they get out. What you get in exchange, is that we have the opportunity to re-order these events so they will look in order to the job.
Using it at 1h is an extreme setting that will automatically give you 59minute of watermark, by definition. This is very surprising and I'm wondering why you need a value so high.
Edit
Now looking at the late arrival policy.
You are using an event time (TIMESTAMP BY timestamp
) which means that your events can now be late, see this doc and this one.
What this means is that when a record comes later than 1h (so timestamp is older than 1h from the wall clock on our servers - in UTC), then we adjust its timestamp to our wall clock minus 1h, and send it to the query. It also means that your tumbling window always has to wait an additional hour to be sure it's not missing those late records.
Here what I would do is restore the default settings (no out of order, 5 seconds late events, adjust events). Then when you get InputEventLateBeyondThreshold
, it means that that the job received a timestamp
that was later in the past than 5 seconds. You're not losing the data, we are adjusting its system.timestamp
to a more recent value (but not the timestamp field, we don't change it).
What we then need to understand is why does it take more than 5 seconds for a record in your pipeline to go from production to consumption. Is it because you have big delays in your ingestion pipeline, or because you have a time skew on your producer clock? Do you know?
Upvotes: 2