Deserialize an Avro file with C#
I was able to get full data access working using `dynamic`. Here's the code for accessing the raw body data, which is stored as an array of bytes. In my case, those bytes contain UTF-8-encoded JSON, but of course it depends on how you initially created your `EventData` instances that you published to the Event Hub:
```csharp
using (var reader = AvroContainer.CreateGenericReader(stream))
{
    while (reader.MoveNext())
    {
        foreach (dynamic record in reader.Current.Objects)
        {
            var sequenceNumber = record.SequenceNumber;
            var bodyText = Encoding.UTF8.GetString(record.Body);
            Console.WriteLine($"{sequenceNumber}: {bodyText}");
        }
    }
}
```
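If your body bytes are JSON, as mine were, you can go one step further and parse them. A minimal sketch using `System.Text.Json`; the `deviceId` and `temperature` field names are just a hypothetical payload shape, not anything mandated by Event Hubs:

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical payload -- substitute whatever your publisher actually sent.
var bodyBytes = Encoding.UTF8.GetBytes("{\"deviceId\":\"sensor-1\",\"temperature\":21.5}");

// Parse the UTF-8 bytes directly; no intermediate string needed.
using var doc = JsonDocument.Parse(bodyBytes);
var deviceId = doc.RootElement.GetProperty("deviceId").GetString();
var temperature = doc.RootElement.GetProperty("temperature").GetDouble();
Console.WriteLine($"{deviceId}: {temperature}");
```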
If someone can post a statically-typed solution, I'll upvote it, but given that the biggest latency in any system will almost certainly be the connection to the Event Hub Archive blobs, I wouldn't worry about parsing performance. :)
This Gist shows how to deserialize an Event Hubs capture with C# using Microsoft.Hadoop.Avro2, which has the advantage of being compatible with both .NET Framework 4.5 and .NET Standard 1.6:
```csharp
var connectionString = "<Azure event hub capture storage account connection string>";
var containerName = "<Azure event hub capture container name>";
var blobName = "<Azure event hub capture BLOB name (ends in .avro)>";

var storageAccount = CloudStorageAccount.Parse(connectionString);
var blobClient = storageAccount.CreateCloudBlobClient();
var container = blobClient.GetContainerReference(containerName);
var blob = container.GetBlockBlobReference(blobName);

using (var stream = blob.OpenRead())
using (var reader = AvroContainer.CreateGenericReader(stream))
    while (reader.MoveNext())
        foreach (dynamic result in reader.Current.Objects)
        {
            var record = new AvroEventData(result);
            record.Dump();
        }

public struct AvroEventData
{
    public AvroEventData(dynamic record)
    {
        SequenceNumber = (long) record.SequenceNumber;
        Offset = (string) record.Offset;
        DateTime.TryParse((string) record.EnqueuedTimeUtc, out var enqueuedTimeUtc);
        EnqueuedTimeUtc = enqueuedTimeUtc;
        SystemProperties = (Dictionary<string, object>) record.SystemProperties;
        Properties = (Dictionary<string, object>) record.Properties;
        Body = (byte[]) record.Body;
    }

    public long SequenceNumber { get; set; }
    public string Offset { get; set; }
    public DateTime EnqueuedTimeUtc { get; set; }
    public Dictionary<string, object> SystemProperties { get; set; }
    public Dictionary<string, object> Properties { get; set; }
    public byte[] Body { get; set; }
}
```
NuGet references:
- Microsoft.Hadoop.Avro2 (1.2.1 works)
- WindowsAzure.Storage (8.3.0 works)
Namespaces:
- Microsoft.Hadoop.Avro.Container
- Microsoft.WindowsAzure.Storage
I was finally able to get this to work with the Apache Avro C# library.
I was stuck for a while because the Capture feature of Azure Event Hubs sometimes outputs a file without any message content. I may also have had an issue with how the messages were originally serialized into the EventData object.
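For context, the publishing side matters here: the capture file's `Body` field contains whatever bytes you wrapped in `EventData`. A sketch of how that might look, assuming the older `Microsoft.Azure.EventHubs` SDK (where `EventData` takes a byte array); the JSON payload is a hypothetical example:

```csharp
using System.Text;
using Microsoft.Azure.EventHubs;

// Serialize the payload to UTF-8 JSON bytes before wrapping it in EventData,
// so the Body field in the capture file round-trips cleanly back to text.
var json = "{\"deviceId\":\"sensor-1\",\"temperature\":21.5}";
var eventData = new EventData(Encoding.UTF8.GetBytes(json));

// 'client' would be an EventHubClient created elsewhere, e.g. from a connection string:
// await client.SendAsync(eventData);
```

If you send something other than text (for example, a binary serializer's output), you'll need the matching decoder when you read the capture file back.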
The code below reads a file saved to disk from a capture blob container.
```csharp
var dataFileReader = DataFileReader<EventData>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
    // Do work on the EventData object
}
```
This also works using the GenericRecord object.
```csharp
var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
```
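With `GenericRecord` you access fields by name via the indexer rather than through typed properties. A sketch, assuming the Apache.Avro package and the field names that Event Hubs Capture writes into the embedded schema (`SequenceNumber`, `Body`, etc.):

```csharp
using System;
using System.Text;
using Avro.File;
using Avro.Generic;

var dataFileReader = DataFileReader<GenericRecord>.OpenReader(file);
foreach (var record in dataFileReader.NextEntries)
{
    // Field names come from the Avro schema embedded in the capture file.
    var sequenceNumber = (long) record["SequenceNumber"];
    var body = (byte[]) record["Body"];
    Console.WriteLine($"{sequenceNumber}: {Encoding.UTF8.GetString(body)}");
}
```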
This took some effort to figure out. However, I now agree that the Azure Event Hubs Capture feature is a great way to back up all events. I still feel the output format should be configurable, as it is for Stream Analytics job output, but maybe I will get used to Avro.