Azure Functions — Event hub processing
When processing an Event Hub stream, your consumer has its own view of how far it has progressed through the stream, which it manages through sequence numbers, checkpoints and offsets.
Depending on how much new information is being added to the stream, your consumer may be falling behind: there is more data ingress than your compute can process. Understanding where in the stream your compute has reached, and how far behind the latest events it is, gives us a metric we can use to scale out when required and keep processing in near real time.
Due to the nature of streams, there is no hard number as there is with queues and topics in Service Bus. For example, we can measure a queue and say ‘there are 30 items to be processed’. Likewise with topics, we can check each subscription and find the number of items. In stream processing, different consumers can have different views of the data, so measuring is a relative process: it depends on the point in time the measurement is taken and on what is doing the measuring.
This article focuses on Azure Functions v2 with C#, using the EventHubTrigger binding to access the stream.
The metadata in an EventData object passed to a function binding contains some of the items we require to calculate a metric, but we also need some additional information from the particular partition to make the metric useful. We can access this by including a PartitionContext object as part of the function signature. NB: It must be named partitionContext in order for the function binding to resolve correctly.
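As a sketch, the trigger signature might look like the following; the hub name and connection setting name are placeholders, but the PartitionContext parameter must use the exact name partitionContext:

```csharp
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class ProcessEvents
{
    [FunctionName("ProcessEvents")]
    public static void Run(
        [EventHubTrigger("your-eventhub-name", Connection = "EventHubConnection")] EventData eventData,
        PartitionContext partitionContext, // must be named exactly this for the binding to resolve
        ILogger log)
    {
        log.LogInformation(
            "Partition {PartitionId}, consumer group {ConsumerGroup}",
            partitionContext.PartitionId,
            partitionContext.ConsumerGroupName);
    }
}
```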
The PartitionContext object will contain data about where the EventData object has been retrieved from — such as PartitionId, ConsumerGroupName and EventHubPath, but it has another object we’re interested in, RuntimeInformation, that will allow us to perform a calculation.
By default, this object is not populated; you must amend your host.json to specify that you want to retrieve the details, by setting enableReceiverRuntimeMetric in the eventProcessorOptions configuration, e.g.
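A minimal host.json sketch for a v2 function app; only the enableReceiverRuntimeMetric setting is relevant here, the rest is the standard file skeleton:

```json
{
  "version": "2.0",
  "extensions": {
    "eventHubs": {
      "eventProcessorOptions": {
        "enableReceiverRuntimeMetric": true
      }
    }
  }
}
```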
At this point, you have information about the partition, including the last sequence number generated, as well as data for the current message, so we’re ready to create a metric. At the point of processing a message, we want to determine where we are in the stream relative to the latest message for that particular partition.
This example uses an Application Insights custom metric, which you would record from your main function body, where you have access to the EventData and PartitionContext objects through the function binding.
NB: In a high-volume production system, you would manage the instances of TelemetryClient and Metric rather than calling GetMetric each time; this snippet just shows the technique. You’d also want to control how often the metric is recorded, rather than recording it for every event processed.
To determine how far behind the latest message in the partition you are, take the sequence number of the particular EventData object and the last sequence number for that partition. The metric value we want to track is the difference between these two numbers (lower is better), which we can then analyse and base scaling decisions on. To graph and split the metric correctly, we need to include a number of dimensions: the partition, consumer group and event hub name.
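Putting this together, a hedged sketch of the metric call; the helper name is illustrative, and as noted above, in production you would cache the TelemetryClient and Metric instances:

```csharp
using Microsoft.ApplicationInsights;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

public static class PartitionMetrics
{
    // Illustrative helper: call from the function body with the bound objects.
    public static void TrackSequenceDifference(
        TelemetryClient telemetryClient,
        EventData eventData,
        PartitionContext partitionContext)
    {
        // The dimensions let us split the metric by partition,
        // consumer group and event hub in later queries.
        var metric = telemetryClient.GetMetric(
            "PartitionSequenceDifference",
            "PartitionId", "ConsumerGroupName", "EventHubPath");

        // Last sequence number in the partition minus this event's sequence
        // number: how far behind the newest event we are (lower is better).
        long difference =
            partitionContext.RuntimeInformation.LastSequenceNumber
            - eventData.SystemProperties.SequenceNumber;

        metric.TrackValue(
            difference,
            partitionContext.PartitionId,
            partitionContext.ConsumerGroupName,
            partitionContext.EventHubPath);
    }
}
```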
Once you’ve started sending the metric to Application Insights, you can query the custom metrics section to see whether the sequence difference is increasing over time, set up alerts based on critical values, and so on.
Using the Azure Monitor plugin for Grafana, you can visualise the metric and plot a chart:
- Choose the metric name you created, e.g. PartitionSequenceDifference
- Group by ConsumerGroupName, if applicable.
- Filter by EventHubPath, by specifying customDimensions/EventHubPath eq 'your-eventhub-name'
This gives you a view of the metric on a dashboard…
Nice!