Reputation: 1577
I have a couple of processes that poll different data sources for some specific kind of information. They poll it quite often and do it in the background so when I need this information it is readily available and doesn't require a roundtrip that will waste time.
Sample code will look like this:
public class JournalBackgroundPoller
{
private readonly int _clusterSize;
private readonly IConfiguration _configuration;
Dictionary<int, string> _journalAddresses;
private readonly Random _localRandom;
private readonly Task _runHolder;
internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();
public JournalBackgroundPoller(IConfiguration configuration)
{
_localRandom = new Random();
_configuration = configuration;
_clusterSize = 20;//for the sake of demo
_journalAddresses = new Dictionary<int, string> { { 1, "SOME ADDR1" }, { 2, "SOME ADDR 2" } };
_runHolder = BuildAndRun();
}
private Task BuildAndRun()
{
var pollingTasks = new List<Task>();
var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);
PopulateShardsRegistry();
foreach (var js in _journalAddresses)
{
var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });
var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });
buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);
dataProcessor.LinkTo(dataStorer);
dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());
pollingTasks.Add(PollInfinitely(js, buffer));
}
var r = Task.WhenAll(pollingTasks);
return r;
}
private void PopulateShardsRegistry()
{
try
{
for (int i = 0; i < _clusterSize; i++)
{
var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
}
}
catch (Exception e)
{
Console.WriteLine("Couldn't initialize shards registry");
}
}
private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
{
while (true)
{
try
{
//here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
var journalEntries = new List<JournalEntryResponseItem>(200000);
buffer.Post(
new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = journalEntries });
}
catch (Exception ex)
{
Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
buffer.Post(
new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
}
await Task.Delay(_localRandom.Next(400, 601));
}
}
private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
{
try
{
if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any())
{
return input;
}
foreach (var journalEntry in input.JournalEntryResponseItems)
{
//do some transformations here
}
return input;
}
catch (Exception ex)
{
Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
return null;
}
}
private void StoreValuesInBuffer(JournalResponsesWrapper input)
{
try
{
ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
}
catch (Exception ex)
{
Console.WriteLine($"Could not write content to dictionary");
}
}
}
For simplicity, the journal-related entities look like this:
class JournalEntryResponseItem
{
public string SomeProperty1 { get; set; }
public string SomeProperty2 { get; set; }
}
class JournalResponsesWrapper
{
public KeyValuePair<int, string> JournalDataSource { get; set; }
public List<JournalEntryResponseItem> JournalEntryResponseItems { get; set; }
}
The overall problem with the code provided is obviously that I'm creating a relatively large number of objects that may end up in the LOH in a short period of time. The data sources always provide up-to-date entries, so I don't need to keep the older ones (nor can I, as they are not distinguishable). My question is whether it is possible to optimize the memory usage and the object creation/replacement roundtrips so that I can reduce the frequency of garbage collection. Right now, by the looks of it, garbage collection happens every ~5-10 seconds.
UPD 1: I access the data via ResultsBuffer and can read the same set multiple times before it's refreshed. It's not guaranteed that one particular data set will be read only once (or read at all). My big objects are the List<JournalEntryResponseItem> instances, initially coming from the data source and then saved to ResultsBuffer.
UPD 2: Data sources have only one endpoint that returns all entities in this "shard" at once, I can't apply filtering during request. Response entities do not have unique keys/identifiers.
UPD 3: Some answers suggest measuring/profiling the app first. While that is a perfectly valid suggestion, in this particular case the problem is clearly memory/GC related, because of the following observations:
Upvotes: 4
Views: 1020
Reputation: 1713
I'm sure that PollInfinitely could be adjusted to control how much data is downloaded to the client, but breaking up a large list once it has been downloaded is quite tricky and requires some deeper work.
Starting from the beginning: if you download 200,000 records of anything using a non-streaming client/consumer, then you're always going to end up with some kind of large array; that is unavoidable. You'll need to find (or write) a library that can parse JSON (or XML, or whatever) as it streams in. You can then choose how big your individual lists are, so instead of one list of 200,000 records you have 200 lists of 1,000 records. Although, if you can control the number of records from the client side, you could just request 1,000 records instead of 200,000.
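A streaming parse of this idea might be sketched as below. This is only an illustration, assuming .NET 6+ where System.Text.Json's DeserializeAsyncEnumerable can yield array elements one at a time; the ReadInBatchesAsync name and batch size are made up for the demo:

```csharp
// Minimal sketch, assuming .NET 6+: DeserializeAsyncEnumerable yields JSON
// array elements as they stream in, so the full 200,000-item list is never
// materialized in memory at once.
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

public class JournalEntryResponseItem   // as defined in the question
{
    public string SomeProperty1 { get; set; }
    public string SomeProperty2 { get; set; }
}

public static class StreamingReader
{
    // Reads a JSON array from the stream and groups items into small batches,
    // e.g. 200 lists of 1,000 instead of one list of 200,000.
    public static async Task<List<List<JournalEntryResponseItem>>> ReadInBatchesAsync(
        Stream stream, int batchSize = 1000)
    {
        var batches = new List<List<JournalEntryResponseItem>>();
        var batch = new List<JournalEntryResponseItem>(batchSize);
        await foreach (var item in
            JsonSerializer.DeserializeAsyncEnumerable<JournalEntryResponseItem>(stream))
        {
            if (item == null) continue;
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                batches.Add(batch);
                batch = new List<JournalEntryResponseItem>(batchSize);
            }
        }
        if (batch.Count > 0) batches.Add(batch);
        return batches;
    }
}
```

In a real poller you would Post each batch to the buffer as soon as it fills, rather than collecting them all, so only one small list is alive at any moment.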
I don't know whether you're writing a cache that saves a lot of data, or a service in a streaming chain with a consumer at the other end. Presuming a consumer, you should probably use a Semaphore alongside your delay in PollInfinitely - by maintaining the Semaphore count, you can cease downloading at a maximum record count easily (SemaphoreSlim is awaitable too).
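The semaphore idea might look like the following sketch. The names here (ThrottledPoller, maxPendingBatches) are illustrative, not from the question; the point is that SemaphoreSlim.WaitAsync parks the polling loop once the consumer falls too far behind:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: a polling loop that stops downloading once too many
// batches are waiting to be consumed.
public sealed class ThrottledPoller
{
    private readonly SemaphoreSlim _pending;

    public ThrottledPoller(int maxPendingBatches) =>
        _pending = new SemaphoreSlim(maxPendingBatches);

    // Called from the polling loop before posting one batch downstream.
    public async Task PostBatchAsync(Func<Task> postBatch)
    {
        await _pending.WaitAsync();   // suspends (asynchronously) at the limit
        await postBatch();
    }

    // The consumer calls this after it has finished with one batch,
    // freeing a slot so polling can resume.
    public void BatchConsumed() => _pending.Release();
}
```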
Thirdly, if you're really having problems with memory fluctuation and garbage collection, you could fashion your storage as a single large allocation that never gets freed. Use struct instead of class, and use fixed-size byte arrays instead of strings. Write enough code to simulate a ring buffer of a maximum size; you'll have to blit your data from the incoming classes into the ring buffer. This would be slower than the reference assignments you have now, but you'd never see garbage collection releasing any of your memory. Use the maximum ring buffer capacity in your semaphore.
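A rough sketch of that ring-buffer idea, under loudly stated assumptions: JournalRingBuffer, the 64-byte field size, and the ASCII-only truncation are all invented for the demo. What matters is the single up-front byte[] allocation that the GC never has to collect:

```csharp
using System;
using System.Text;

// Hypothetical sketch of the "one big allocation" idea: a ring buffer backed
// by a single byte[], allocated once and reused forever. Fields are truncated
// to a fixed size; assumes ASCII-ish data (multi-byte UTF-8 could overflow).
public sealed class JournalRingBuffer
{
    private const int FieldSize = 64;            // fixed bytes per string field
    private const int SlotSize = FieldSize * 2;  // two fields per entry
    private readonly byte[] _storage;            // allocated once, never freed
    private int _head;
    public int Capacity { get; }

    public JournalRingBuffer(int capacity)
    {
        Capacity = capacity;
        _storage = new byte[capacity * SlotSize]; // the single large allocation
    }

    // Blit an incoming entry into the next slot; no per-entry allocation.
    public void Write(string property1, string property2)
    {
        var offset = _head * SlotSize;
        Array.Clear(_storage, offset, SlotSize);  // wipe the previous occupant
        Encoding.UTF8.GetBytes(property1, 0, Math.Min(property1.Length, FieldSize),
            _storage, offset);
        Encoding.UTF8.GetBytes(property2, 0, Math.Min(property2.Length, FieldSize),
            _storage, offset + FieldSize);
        _head = (_head + 1) % Capacity;
    }

    // Note: the read path does allocate strings; it is shown only to make
    // the sketch testable.
    public (string, string) Read(int slot)
    {
        var offset = slot * SlotSize;
        return (DecodeField(offset), DecodeField(offset + FieldSize));
    }

    private string DecodeField(int offset)
    {
        var end = Array.IndexOf(_storage, (byte)0, offset, FieldSize);
        var length = end < 0 ? FieldSize : end - offset;
        return Encoding.UTF8.GetString(_storage, offset, length);
    }
}
```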
If you're streaming data in, you shouldn't get any advantage from reading too far ahead; you'd only do that if your consumer jumps in sporadically.
I hope I'm on the right track here.
Mark
Upvotes: 1
Reputation: 1741
Since behind every List<T> there's always a T[] of consecutive items, dimensioning it at 200,000 will definitely put it straight into the LOH. To avoid that, I suggest simple logical partitioning instead of one physical allocation: Post the list in batches. This way, during each poll the huge list still goes to the LOH, but it is collected in the next GC generation 2 collection (please make sure there are no remaining references to it). The LOH becomes almost empty; however, there will be more GC generation 2 collections than before, due to the added copy operations happening in the managed heap. It is a small change, and I provide the new JournalBackgroundPoller class:
public class JournalBackgroundPoller
{
private readonly int _clusterSize;
private readonly IConfiguration _configuration;
Dictionary<int, string> _journalAddresses;
private readonly Random _localRandom;
private readonly Task _runHolder;
internal readonly ConcurrentDictionary<int, List<JournalEntryResponseItem>> ResultsBuffer = new ConcurrentDictionary<int, List<JournalEntryResponseItem>>();
public JournalBackgroundPoller(IConfiguration configuration)
{
_localRandom = new Random();
_configuration = configuration;
_clusterSize = 20;//for the sake of demo
// _journalAddresses = //{{1, "SOME ADDR1"}, {2, "SOME ADDR 2"}};
_journalAddresses = new Dictionary<int, string>
{
{ 1, "SOME ADDR1" },
{ 2, "SOME ADDR 2" }
};
_runHolder = BuildAndRun();
}
private Task BuildAndRun()
{
var pollingTasks = new List<Task>();
var buffer = new BroadcastBlock<JournalResponsesWrapper>(item => item);
PopulateShardsRegistry();
foreach (var js in _journalAddresses)
{
var dataProcessor = new TransformBlock<JournalResponsesWrapper, JournalResponsesWrapper>(NormalizeValues,
new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 1 });
var dataStorer = new ActionBlock<JournalResponsesWrapper>(StoreValuesInBuffer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, EnsureOrdered = true, BoundedCapacity = 2 });
buffer.LinkTo(dataProcessor, wrapper => wrapper.JournalDataSource.Key == js.Key);
dataProcessor.LinkTo(dataStorer);
dataProcessor.LinkTo(DataflowBlock.NullTarget<JournalResponsesWrapper>());
pollingTasks.Add(PollInfinitely(js, buffer));
}
var r = Task.WhenAll(pollingTasks);
return r;
}
private void PopulateShardsRegistry()
{
try
{
for (int i = 0; i < _clusterSize; i++)
{
var _ = ResultsBuffer.GetOrAdd(i, ix => new List<JournalEntryResponseItem>());
}
}
catch (Exception e)
{
Console.WriteLine("Couldn't initialize shards registry");
}
}
private async Task PollInfinitely(KeyValuePair<int, string> dataSourceInfo, BroadcastBlock<JournalResponsesWrapper> buffer)
{
while (true)
{
try
{
//here we create a client and get a big list of journal entries, ~200k from one source. below is dummy code
var journalEntries = new List<JournalEntryResponseItem>(200000);
// NOTE:
// We need to avoid references to the huge list so GC collects it ASAP in the next
// generation 2 collection: after that, nothing else goes to the LOH.
const int PartitionSize = 1000;
for (var index = 0; index < journalEntries.Count; index += PartitionSize)
{
// Math.Min guards the last partition, which may be smaller than PartitionSize
var journalEntryResponseItems = journalEntries.GetRange(index, Math.Min(PartitionSize, journalEntries.Count - index));
buffer.Post(
new JournalResponsesWrapper
{
JournalDataSource = dataSourceInfo,
JournalEntryResponseItems = journalEntryResponseItems
});
}
}
catch (Exception ex)
{
Console.WriteLine($"Polling {dataSourceInfo.Value} threw an exception, overwriting with empty data");
buffer.Post(
new JournalResponsesWrapper { JournalDataSource = dataSourceInfo, JournalEntryResponseItems = new List<JournalEntryResponseItem>() });
}
await Task.Delay(_localRandom.Next(400, 601));
}
}
private JournalResponsesWrapper NormalizeValues(JournalResponsesWrapper input)
{
try
{
if (input.JournalEntryResponseItems == null || !input.JournalEntryResponseItems.Any())
{
return input;
}
foreach (var journalEntry in input.JournalEntryResponseItems)
{
//do some transformations here
}
return input;
}
catch (Exception ex)
{
Console.WriteLine($"Normalization failed for cluster {input.JournalDataSource.Value}, please review!");
return null;
}
}
private void StoreValuesInBuffer(JournalResponsesWrapper input)
{
try
{
ResultsBuffer[input.JournalDataSource.Key] = input.JournalEntryResponseItems;
}
catch (Exception ex)
{
Console.WriteLine($"Could not write content to dictionary");
}
}
}
With this change, the JournalEntryResponseItem[] allocations went from 1,600,000 wasted (with length 200,000) to none.
Upvotes: 2