LCaraway

Reputation: 1357

C# - Parse JSON Asynchronously into Objects and add to queue for further processing

I need to perform ETL-like operations on a JSON file that I receive inside a .tar.gz. I have been able to successfully unzip and untar the file into a memory stream.

My issue is that these files are so large that I run into memory problems when I try to parse them into objects with various libraries. I have followed suggestions from a few places to avoid reading the whole file, but they all still seem to load the entire file into memory.

https://www.newtonsoft.com/json/help/html/Performance.htm
https://www.newtonsoft.com/json/help/html/SerializingJSONFragments.htm

What I would like to do is asynchronously read a little bit of the file, parse that chunk into objects, and then add them to the appropriate queues for further processing. My hope is that I can release these objects from memory as quickly as possible.

Example JSON

{
  "header" : {
    "id" : 12345,
    "datetime" : 1640423287060050040,
    "version" : 1.0
  },
  "Reading" : [
    {
      "id" : 54321,
      "units" : "fps",
      "data" : [
        {
          "value"  : 32,
          "time" : 1630000000000000400
        },
        {
          "value"  : 32,
          "time" : 1630000000000000400
        },
        {
          "value"  : 32,
          "time" : 1630000000000000400
        }
      ]
    },
    {
      "id" : 765432,
      "units" : "fps",
      "data" : [
        {
          "value"  : 21,
          "time" : 1630000000000000400
        },
        {
          "value"  : 21,
          "time" : 1630000000000000400
        },
        {
          "value"  : 21,
          "time" : 1630000000000000400
        }
      ]
    }
  ]
}

This is memory-inefficient because of the ReadToEndAsync() call:

using (var stream = _readFile(inFilePath)) // TODO: read asynchronously
{
    if (stream == null || stream == StreamReader.Null)
    {
        throw new Exception("streamReader is null");
    }

    // ReadToEndAsync buffers the entire file as one string before parsing.
    data = JObject.Parse(await stream.ReadToEndAsync());
}

Where I'm stuck

var memoryStream = UnTarGz.ExtractTarGzToStream(inFilePath);
memoryStream.Position = 0;
using (var streamReader = new StreamReader(memoryStream))
using (JsonReader reader = new JsonTextReader(streamReader))
{
    var header = new Header();
    while (await reader.ReadAsync(CancellationToken.None))
    {
        // If I can somehow detect the header, populate `header` above.
        // Otherwise, if the reader is on reading data, build an object such as
        // {
        //     header = header;
        //     readings = the reading data
        // }
        // and add it to the queue.
    }
}

Upvotes: 0

Views: 871

Answers (1)

evilmandarine

Reputation: 4553

Jeroen provided an overview of the solution in the comments. At the time of writing my reputation is about 5% of his, so it took me a while to figure out what that actually meant :) but as I finally got it working I'll post the result here in case this is what you're looking for:

using System;
using System.Collections.Generic;
using System.IO;
using Newtonsoft.Json;

public static class Json
{
    public static void Run()
    {
        using (Stream s = File.OpenRead(@"C:\Temp\input.json")) // This is your sample.
        using (var sr = new StreamReader(s))
        using (var reader = new JsonTextReader(sr))
        {
            var serializer = new JsonSerializer();
            var readingCounter = 0;

            // Read() advances to the next token and updates TokenType and Path.
            while (reader.Read())
            {
                // Path is case-sensitive, and the sample JSON uses a lowercase "header".
                if (reader.TokenType == JsonToken.StartObject && reader.Path == "header")
                {
                    var header = serializer.Deserialize<Header>(reader);
                    Console.WriteLine(header);
                }

                if (reader.TokenType == JsonToken.StartObject && reader.Path == $"Reading[{readingCounter}]")
                {
                    var currentReading = serializer.Deserialize<Reading>(reader);
                    readingCounter++;

                    // Process current reading object here (ex. "add to queue"):
                    Console.WriteLine(currentReading);
                }
            }
        }
    }
}

public class Header
{
    public long Id { get; set; }
    public long Datetime { get; set; }
    public string Version { get; set; }

    // For the example only:
    public override string ToString()
    {
        return Id + ", " + Datetime + ", " + Version;
    }
}

public class Reading
{
    public int Id { get; set; }
    public string Units { get; set; }
    public List<ValueTime> Data { get; set; }

    // For the example only:
    public override string ToString()
    {
        return Id + ", " + Units + ", data count = " + Data.Count;
    }
}

public class ValueTime
{
    public int Value { get; set; }
    public long Time { get; set; }
}
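
The code above reads synchronously and only prints each reading. Since the question asks for async reading and a queue, here is a rough sketch of how that could look. It is not tested against a real multi-gigabyte file. It assumes Newtonsoft.Json 10 or later (for ReadAsync and JObject.LoadAsync), .NET Core 3.0 or later with C# 8 (for System.Threading.Channels and await foreach), and that "header" appears before "Reading" in the file. The names JsonAsync, ProduceAsync and RunAsync and the channel capacity of 16 are made up for the example, and the model classes are repeated without ToString so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

public class Header
{
    public long Id { get; set; }
    public long Datetime { get; set; }
    public string Version { get; set; }
}

public class ValueTime
{
    public int Value { get; set; }
    public long Time { get; set; }
}

public class Reading
{
    public int Id { get; set; }
    public string Units { get; set; }
    public List<ValueTime> Data { get; set; }
}

public static class JsonAsync
{
    // Hypothetical producer: pushes (header, reading) pairs into the channel
    // one reading at a time, so only the current reading is materialized.
    public static async Task ProduceAsync(
        Stream input, ChannelWriter<(Header Header, Reading Reading)> writer)
    {
        try
        {
            using (var sr = new StreamReader(input))
            using (var reader = new JsonTextReader(sr))
            {
                Header header = null;
                while (await reader.ReadAsync())
                {
                    if (reader.TokenType != JsonToken.StartObject) continue;

                    if (reader.Path == "header")
                    {
                        // LoadAsync consumes just this object and stops on its EndObject.
                        header = (await JObject.LoadAsync(reader)).ToObject<Header>();
                    }
                    else if (reader.Path.StartsWith("Reading[", StringComparison.Ordinal))
                    {
                        var reading = (await JObject.LoadAsync(reader)).ToObject<Reading>();
                        // With a bounded channel this waits while the queue is full.
                        await writer.WriteAsync((header, reading));
                    }
                }
            }
            writer.Complete();
        }
        catch (Exception ex)
        {
            writer.Complete(ex); // surface the failure to the consumer
        }
    }

    // Hypothetical consumer: drains the queue while the producer is still reading.
    public static async Task RunAsync(Stream input)
    {
        var channel = Channel.CreateBounded<(Header Header, Reading Reading)>(16);
        Task producer = ProduceAsync(input, channel.Writer);

        await foreach (var item in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(item.Header.Id + ": " + item.Reading.Id
                + ", data count = " + item.Reading.Data.Count);
        }
        await producer;
    }
}
```

You would call it with something like await JsonAsync.RunAsync(memoryStream). The bounded channel is what keeps memory in check: if the consumer falls behind, WriteAsync waits instead of letting parsed readings pile up. Note that each reading's whole data array is still loaded at once, so if a single reading is huge you would have to go one level deeper and stream the data items the same way.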

Upvotes: 1
