HardLuck

Reputation: 1577

C# garbage collection of many relatively big objects

I have a couple of processes that poll different data sources for a specific kind of information. They poll quite often and run in the background, so when I need the information it is readily available and doesn't require a time-wasting round trip.
Sample code looks like this:

public class JournalBackgroundPoller
{
    private readonly int _clusterSize;

    private readonly IConfiguration _configuration;

    Dictionary<int, string> _journalAddresses;
    private readonly Random _localRandom;
    private readonly Task _runHolder;

    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    public JournalBackgroundPoller(IConfiguration configuration)
    {
        _localRandom = new Random();

        _configuration = configuration;
        _clusterSize = 20;//for the sake of demo

        _journalAddresses = new Dictionary<int, string> { { 1, "SOME ADDR1" }, { 2, "SOME ADDR 2" } };//sample addresses for the demo

        _runHolder = BuildAndRun();
    }

    private Task BuildAndRun()
    {
        var pollingTasks = new List<Task>();
        var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

        PopulateShardsRegistry();

        foreach (var js in _journalAddresses)
        {
            var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                new ExecutionDataflowBlockOptions
                { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

            var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

            buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

            dataProcessor.LinkTo(dataStorer);
            dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

            pollingTasks.Add(PollInfinitely(js, buffer));
        }

        var r = Task.WhenAll(pollingTasks);
        return r;
    }

    private void PopulateShardsRegistry()
    {
        try
        {
            for (int i = 0; i < _clusterSize; i++)
            {
                var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Could `t initialize shards registry");
        }
    }

    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
        while (true)
        {
            try
            {
                //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                var journalEntries = new List<JournalEntryResponseItem>(200000);

                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = journalEntries });
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
            }

            await Task.Delay(_localRandom.Next(400, 601));
        }
    }

    private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
    {
        try
        {
            if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any())
            {
                return input;
            }

            foreach (var journalEntry in input.JournalEntryResponseItems)
            {
                //do some transformations here
            }

            return input;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
            return null;
        }
    }

    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
        try
        {
            ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Could not write content to dictionary");
        }
    }
}

For simplicity journal related entities will look like this:

class JournalEntryResponseItem
{
    public string SomeProperty1 { get; set; }

    public string SomeProperty2 { get; set; }
}

class JournalResponsesWrapper
{
    public KeyValuePair<int, string> JournalDataSource { get; set; }

    public List<JournalEntryResponseItem> JournalEntryResponseItems { get; set; }
}

The obvious global problem with the code above is that I'm creating a relatively large number of objects that might end up in the LOH within a short period of time. The data sources always provide up-to-date entries, so I don't need to keep the older ones (nor could I, as they are not distinguishable). My question is whether it is possible to optimize the memory usage and the object creation/replacement round trips so that I can reduce the frequency of garbage collection. Right now, by the looks of it, garbage collection happens every ~5-10 seconds.

UPD 1: I access data via ResultsBuffer and can read the same set multiple times before it's refreshed. It's not guaranteed that one particular data set will be read only once (or read at all). My big objects are List<JournalEntryResponseItem> instances, initially coming from the data source and then saved to ResultsBuffer.

UPD 2: Data sources have only one endpoint that returns all entities in the "shard" at once; I can't apply filtering during the request. Response entities do not have unique keys/identifiers.

UPD 3: Some answers suggest measuring/profiling the app first. While this is a perfectly valid suggestion, in this particular case it's clearly memory/GC related because of the following observations (a minimal sketch for confirming this follows the list):

  1. Visible throttling happens exactly at the moment when the app's RAM consumption drops sharply after growing steadily for some time.
  2. If I add X more journal sources, the app's memory grows until it takes all free memory on the server; then there is an even longer freeze (1-3 seconds), after which memory drops sharply and the app keeps working until it hits the memory limit again.
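
As a hypothetical illustration (not from the original post), the correlation can be confirmed by logging GC collection counts and the working set once per second and comparing the timestamps with the freezes:

static class GcMonitor
{
    public static async Task RunAsync()
    {
        // Print GC counts per generation and the current working set every second;
        // a freeze that lines up with a jump in the gen2 count supports the GC hypothesis.
        while (true)
        {
            using var process = Process.GetCurrentProcess();
            Console.WriteLine(
                $"{DateTime.UtcNow:HH:mm:ss} gen0={GC.CollectionCount(0)} " +
                $"gen1={GC.CollectionCount(1)} gen2={GC.CollectionCount(2)} " +
                $"workingSet={process.WorkingSet64 / (1024 * 1024)} MB");
            await Task.Delay(1000);
        }
    }
}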

Upvotes: 4

Views: 1020

Answers (2)

Mark Rabjohn

Reputation: 1713

I'm sure that PollInfinitely could be adjusted to handle how much data is downloaded to the client, but breaking up a large list, if that's what's downloaded, is quite tricky and requires some deeper work.

Starting from the beginning, if you download 200,000 records of anything with a non-streaming client/consumer, you're always going to end up with some kind of large array - that is unavoidable. You'll need to find (or write) a library that can parse JSON (or XML, or whatever) as it streams in. You can then choose how big your individual lists are, so instead of one list of 200,000 records you have 200 lists of 1,000 records. And if you can control the number of records from the client side, you can simply request 1,000 records at a time instead of 200,000.
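
As an illustration only (none of this is in the original answer), a streaming parse with Newtonsoft.Json's JsonTextReader could yield fixed-size batches instead of one huge list. The method name, the batch size, and the assumption that the endpoint returns a plain JSON array are all hypothetical:

// Hypothetical sketch: read a JSON array item by item and hand out lists of at most
// 1,000 entries, so no single backing array is large enough to land in the LOH.
private static async IAsyncEnumerable<List<JournalEntryResponseItem>> ReadBatchesAsync(
    Stream responseStream, int batchSize = 1000)
{
    using var streamReader = new StreamReader(responseStream);
    using var jsonReader = new JsonTextReader(streamReader);
    var serializer = new JsonSerializer();
    var batch = new List<JournalEntryResponseItem>(batchSize);

    while (await jsonReader.ReadAsync())
    {
        if (jsonReader.TokenType != JsonToken.StartObject)
            continue; // skip the array's own start/end tokens

        batch.Add(serializer.Deserialize<JournalEntryResponseItem>(jsonReader));
        if (batch.Count == batchSize)
        {
            yield return batch;
            batch = new List<JournalEntryResponseItem>(batchSize);
        }
    }

    if (batch.Count > 0)
        yield return batch; // last, possibly smaller, batch
}

With 1,000 references per batch, each backing array is roughly 8 KB on a 64-bit runtime, far below the ~85,000-byte LOH threshold.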

I don't know whether you're writing a cache that saves a lot of data, or a service in a streaming chain with a consumer at the other end. Presuming a consumer, you should probably use a semaphore alongside your delay in PollInfinitely - by maintaining the semaphore count, you can easily stop downloading once a maximum record count is reached (SemaphoreSlim is awaitable too).
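
A minimal sketch of that idea, assuming an arbitrary capacity of 8 pending batches and placeholder DownloadBatchAsync/ConsumeAsync methods (none of these names come from the question):

class ThrottledPoller
{
    // Each slot represents one downloaded batch the consumer has not finished yet.
    private readonly SemaphoreSlim _pendingBatches = new SemaphoreSlim(8, 8);

    public async Task PollLoopAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            await _pendingBatches.WaitAsync(ct); // pause downloading when 8 batches are pending
            var batch = await DownloadBatchAsync(ct);
            _ = ConsumeAsync(batch, ct);         // consumer runs independently
            await Task.Delay(500, ct);
        }
    }

    private async Task ConsumeAsync(int[] batch, CancellationToken ct)
    {
        try
        {
            await Task.Delay(1000, ct);          // stand-in for real processing
        }
        finally
        {
            _pendingBatches.Release();           // free the slot only after the batch is done
        }
    }

    private Task<int[]> DownloadBatchAsync(CancellationToken ct)
        => Task.FromResult(new int[1000]);       // stand-in for a real download
}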

Thirdly, if you're really having problems with memory fluctuation and garbage collection, you could fashion your storage as a single large allocation that never gets freed. Use struct instead of class, and fixed-size byte arrays instead of strings. Write enough code to simulate a ring buffer of a maximum size, and blit your data from the incoming classes into that ring buffer. This would be slower than the reference assignments you have now, but you'd never see garbage collection releasing any of your memory - use the maximum ring buffer capacity in your semaphore.
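
For example, a preallocated ring buffer of structs could look like the hypothetical sketch below; the slot layout is invented, and real string data would have to be encoded into fixed-size fields, which is exactly the blitting work mentioned above:

// One large allocation made up front and reused forever, so the GC never has to
// release it; entries are copied in by value instead of being newly allocated.
struct JournalEntrySlot
{
    public long EncodedProperty1; // stand-ins for fixed-size encodings of the strings
    public long EncodedProperty2;
}

class JournalRingBuffer
{
    private readonly JournalEntrySlot[] _slots;
    private int _head;

    public JournalRingBuffer(int capacity) => _slots = new JournalEntrySlot[capacity];

    public void Write(in JournalEntrySlot entry)
    {
        _slots[_head] = entry;                   // value copy, no heap allocation
        _head = (_head + 1) % _slots.Length;     // wrap around when the buffer is full
    }
}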

If you're streaming data in, you shouldn't get any advantage from reading too far ahead; you'd only do that if your consumer jumped in sporadically.

I hope I'm on the right track here.

Mark

Upvotes: 1

Lucky Brain

Reputation: 1741

Since a List<T> is always backed by a contiguous T[] of items, dimensioning it at 200,000 entries will definitely put that array straight into the LOH. To avoid that, I suggest using simple logical partitioning instead of one physical allocation and posting the list in batches. This way, during each poll the huge list still goes to the LOH but is collected in the next GC generation 2 collection (please make sure there are no more references to it). The LOH becomes almost empty; however, there will be more GC generation 2 collections than before due to the added copy operations happening in the managed heap. It is a small change, and here is the new JournalBackgroundPoller class:

public class JournalBackgroundPoller
{
    private readonly int _clusterSize;

    private readonly IConfiguration _configuration;

    Dictionary<int, string> _journalAddresses;
    private readonly Random _localRandom;
    private readonly Task _runHolder;

    internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();

    public JournalBackgroundPoller(IConfiguration configuration)
    {
        _localRandom = new Random();

        _configuration = configuration;
        _clusterSize = 20;//for the sake of demo

        // _journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};
        _journalAddresses = new Dictionary<int, string>
        {
            { 1, "SOME ADDR1" },
            { 2, "SOME ADDR 2" }
        };

        _runHolder = BuildAndRun();
    }

    private Task BuildAndRun()
    {
        var pollingTasks = new List<Task>();
        var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);

        PopulateShardsRegistry();

        foreach (var js in _journalAddresses)
        {
            var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
                new ExecutionDataflowBlockOptions
                { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });

            var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });

            buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);

            dataProcessor.LinkTo(dataStorer);
            dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());

            pollingTasks.Add(PollInfinitely(js, buffer));
        }

        var r = Task.WhenAll(pollingTasks);
        return r;
    }

    private void PopulateShardsRegistry()
    {
        try
        {
            for (int i = 0; i < _clusterSize; i++)
            {
                var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("Could `t initialize shards registry");
        }
    }

    private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
    {
        while (true)
        {
            try
            {
                //here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
                var journalEntries = new List<JournalEntryResponseItem>(200000);

                // NOTE:
                // We need to avoid references to the huge list so GC collects it ASAP in the next
                // generation 2 collection: after that, nothing else goes to the LOH.
                const int PartitionSize = 1000;
                for (var index = 0; index < journalEntries.Count; index += PartitionSize)
                {
                    // the last partition may be smaller than PartitionSize
                    var journalEntryResponseItems = journalEntries.GetRange(index, Math.Min(PartitionSize, journalEntries.Count - index));
                    buffer.Post(
                        new JournalResponsesWrapper
                        {
                            JournalDataSource = dataSourceInfo,
                            JournalEntryResponseItems = journalEntryResponseItems
                        });
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
                buffer.Post(
                    new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
            }

            await Task.Delay(_localRandom.Next(400, 601));
        }
    }

    private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
    {
        try
        {
            if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any())
            {
                return input;
            }

            foreach (var journalEntry in input.JournalEntryResponseItems)
            {
                //do some transformations here
            }

            return input;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
            return null;
        }
    }

    private void StoreValuesInBuffer(JournalResponsesWrapper input)
    {
        try
        {
            ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Could not write content to dictionary");
        }
    }
}

Please take a look at a snapshot of the original memory usage after 30 seconds:

Before the optimization

This is the snapshot of the optimized memory usage after 30 seconds:

After the optimization

Note the differences:

  • Sparse arrays: a JournalEntryResponseItem[] of length 200,000 wasting 1,600,000 bytes before, none after.
  • LOH usage: 3.05 MB used before, none after.

Upvotes: 2
