Bernard

Reputation: 1015

Memory efficient data structure for high performance

I need a circular buffer (or other data structure) on which I can make a "ToArray" or similar call to get the current state of the buffer and use it while the buffer carries on, possibly overwriting the values held.

The reason for this use case is that the returned data is passed to a worker thread to process and the idea is to ensure that as little data is overwritten as possible in between the calls. The choice of a circular data structure was ostensibly to reduce the memory usage of the buffer to a fixed value.

I built the data structure below, which up until yesterday was sufficient for my needs. The relevant calls are TakeSnapshot, TakeAll and TakeWeakArraySnapshot which are all variations on the same theme.

The call is made quite frequently, whenever about 1000 samples of reference types are available.

All the calls eventually result in an out-of-memory exception. The one that does not is TakeWeakArraySnapshot, whose elements revert to null midway through using the array (I guess this has to do with the fact that the GC handle is weak?).
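A minimal sketch of the weak-handle behaviour described above. A `GCHandleType.Weak` handle does not keep its target alive, and in `TakeWeakArraySnapshot` the `Dequeue` call clears the buffer slot, so the weak handle may be the only thing still pointing at the item. `WeakHandleDemo` and `TargetSurvivesWhileReferenced` are hypothetical names used only for illustration; this is not a fix for the code below.

```csharp
using System;
using System.Runtime.InteropServices;

// Hypothetical demo type, not part of the question's code.
public static class WeakHandleDemo
{
    // Returns true: the target stays reachable as long as a strong
    // reference (the local 'payload') is still live.
    public static bool TargetSurvivesWhileReferenced()
    {
        var payload = new object();
        GCHandle weak = GCHandle.Alloc(payload, GCHandleType.Weak);
        try
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();

            bool alive = weak.Target != null;

            // Keeps 'payload' strongly reachable up to this point.
            // Without any strong reference, weak.Target may become null
            // after any collection, which matches the "reverts to null" symptom.
            GC.KeepAlive(payload);
            return alive;
        }
        finally
        {
            weak.Free();
        }
    }
}
```

When the collector reclaims an unreferenced target is not deterministic, so the sketch only demonstrates the guaranteed half: the target survives while something else references it.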

Is there a more suitable memory-efficient data structure for this use, or is there something I am doing wrong? Would changing the GC handle type to Normal help? Is it possible to create an output type that wraps the references (as I attempted with the weak reference) and that would easily be reclaimed by the garbage collector?

Thanks for reading.

using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading;
using System.Runtime.InteropServices;


public class WeakArray<T>
{
    private GCHandle[] fArray;
    public WeakArray(int length)
    {
        fArray = new GCHandle[length];
        for (int i = 0; i < length; i++)
            fArray[i] = GCHandle.Alloc(null, GCHandleType.Weak);
    }

    ~WeakArray()
    {
        if (fArray == null)
            return;

        int count = fArray.Length;
        for (int i = 0; i < count; i++)
        {
            GCHandle gcHandle = fArray[i];

            // Skip unallocated handles instead of stopping, so later handles are still freed.
            if (!gcHandle.IsAllocated)
                continue;

            gcHandle.Free();
        }
    }

    public int Length
    {
        get
        {
            return fArray.Length;
        }
    }

    public T this[int index]
    {
        get
        {
            return (T) fArray[index].Target;
        }
        set
        {
            fArray[index].Target = value;
        }
    }
}


public class OutputData<TDataType>:IEnumerable<TDataType>,IDisposable
{
    private TDataType[] Buffer { get;  set; }

    private readonly object _lock = new object();

    public OutputData(ref TDataType[] buffer,long length)
    {
        Buffer = buffer;
        Length = length;
    }

    public long Length { get; private set; }

    /// <summary>
    /// Gets the <typeparamref name="TDataType"/> at the specified index. Throws IndexOutOfRange for an invalid index
    /// or returns the default value of the generic type on an empty queue
    /// </summary>
    /// <value>
    /// The <typeparamref name="TDataType"/>.
    /// </value>
    /// <param name="i">The item at that index.</param>
    /// <returns></returns>
    public TDataType this[int i]
    {
        get
        {
            lock (_lock)
            {
                return Length > 0 ? Buffer[i] : default(TDataType);
            }
        }
    }

    public IEnumerator<TDataType> GetEnumerator()
    {
        for (int i = 0; i < Length; i++)
        {
            yield return Buffer[i];
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    public void Dispose()
    {
        Array.Clear(Buffer,0,Buffer.Length);
    }

    public void SetLength(int count)
    {
        Length = count;
    }
}

/// <summary>
/// Implements a CircularBuffer that behaves as a queue
/// </summary>
/// <typeparam name="T"></typeparam>
public class FixedCapacityQueue<T> 
{
    private T[] _buffer;

    private T[] _output;
    private OutputData<T> _outputData;

    /// <summary>
    /// Gets the dropped count. This is the number of samples that have been dropped to
    /// maintain the fixed size of the datastructure.
    /// </summary>
    /// <value>
    /// The dropped count.
    /// </value>
    public int DroppedCount { get; protected set; }

    /// <summary>
    /// The default number of dropped items required to generate a report
    /// </summary>
    protected const int DroppedFramesBetweenReports = 1000;



    /// <summary>
    /// The _start. Index of the first element in buffer.
    /// </summary>
    private int _start;

    /// <summary>
    /// The _end. Index after the last element in the buffer.
    /// </summary>
    private int _end;

    /// <summary>
    /// The number of items currently held in the buffer.
    /// </summary>
    private int _numberOfItemsInBuffer;

    private readonly object _lock = new object();

    /// <summary>
    /// Gets or sets the name of the buffer.
    /// </summary>
    /// <value>
    /// The name.
    /// </value>
    public string Name { get; protected set; }

    private readonly T _default = default(T);

    /// <summary>
    /// Initializes a new instance of the <see cref="FixedCapacityQueue{T}" /> class.
    /// </summary>
    /// <param name="name">The name of the queue.</param>
    /// <param name="bufferSize">Size of the buffer.</param>
    public FixedCapacityQueue(string name, int bufferSize)
    {
        Contract.Requires(bufferSize > 0);
        Contract.Requires(!String.IsNullOrEmpty(name));

        _buffer = new T[bufferSize];
        _output = new T[bufferSize];
        _outputData = new OutputData<T>(ref _output, 0);

        _start = 0;
        _end = _numberOfItemsInBuffer == bufferSize ? 0 : _numberOfItemsInBuffer;


        Name = String.Format("FixedCapacityQueue for {0}", name);
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="FixedCapacityQueue{T}" /> class.
    /// </summary>
    /// <param name="name">The name of the buffer.</param>
    /// <param name="bufferSize">Size of the buffer.</param>
    /// <param name="data">The data to be added to the queue.</param>
    /// <exception cref="System.ArgumentException"></exception>
    public FixedCapacityQueue(string name, int bufferSize, ICollection<T> data)
        : this(name, bufferSize)
    {
        Contract.Requires(data != null);
        Contract.Requires(bufferSize > 0);
        Contract.Requires(data.Count < bufferSize);

        foreach (var dataItem in data)
        {
            Enqueue(dataItem);
        }
    }

    /// <summary>
    /// Gets a value indicating whether the queue [is empty].
    /// </summary>
    /// <value>
    ///   <c>true</c> if [is empty]; otherwise, <c>false</c>.
    /// </value>
    public bool IsEmpty
    {
        get
        {
            lock (_lock)
            {
                return _numberOfItemsInBuffer == 0;
            }
        }
    }

    /// <summary>
    /// Gets a value indicating whether the queue [is full].
    /// </summary>
    /// <value>
    ///   <c>true</c> if [is full]; otherwise, <c>false</c>.
    /// </value>
    public bool IsFull
    {
        get
        {
            lock (_lock)
            {
                return Count == Size;
            }
        }
    }

    /// <summary>
    /// Gets the number of items currently present in the queue.
    /// </summary>
    /// <value>
    /// The count.
    /// </value>
    public int Count
    {
        get
        {
            lock (_lock)
            {
                return _numberOfItemsInBuffer;
            }
        }
    }

    /// <summary>
    /// Gets the declared size of the queue.
    /// </summary>
    /// <value>
    /// The size.
    /// </value>
    public int Size
    {
        get
        {
            lock (_lock)
            {
                return _buffer.Length;
            }
        }
    }

    /// <summary>
    /// Dequeues an item from the queue. The expected behaviour is that if a Dequeue operation is
    /// requested whilst a queue is empty, the default type of the generic queue is returned.
    /// </summary>
    /// <returns></returns>
    public T Dequeue()
    {
        lock (_lock)
        {
            if (IsEmpty) return _default;

            var item = _buffer[_start];

            _buffer[_start] = _default;

            Increment(ref _start);
            --_numberOfItemsInBuffer;
            return item;
        }
    }

    /// <summary>
    /// Enqueues the specified item to the queue.
    /// </summary>
    /// <param name="toAdd">To add.</param>
    public void Enqueue(T toAdd)
    {
        lock (_lock)
        {
            if (IsFull)
            {
                _buffer[_end] = toAdd;
                Increment(ref _end);
                _start = _end;

                DroppedCount++;
                //report drops
                if (DroppedCount >= DroppedFramesBetweenReports)
                {
                    //EventAndErrorPump.Instance.WriteOnce(1000, ReportLevelEnum.Warning,
                    //  String.Format("{0} FixedQueue Dropped Items: {1} ", Name, DroppedCount));

                    DroppedCount = 0;
                }
            }
            else
            {
                _buffer[_end] = toAdd;
                Increment(ref _end);
                ++_numberOfItemsInBuffer;
            }
        }
    }

    /// <summary>
    /// Increments the provided index variable by one, wrapping
    /// around if necessary.
    /// </summary>
    /// <param name="index"></param>
    private void Increment(ref int index)
    {
        if (++index == Size)
        {
            index = 0;
        }
    }

    /// <summary>
    /// Decrements the provided index variable by one, wrapping
    /// around if necessary.
    /// </summary>
    /// <param name="index"></param>
    private void Decrement(ref int index)
    {
        if (index == 0)
        {
            index = Size;
        }
        index--;
    }

    public void Enqueue(IEnumerable<T> toAdd)
    {
        lock (_lock)
        {
            foreach (var dataItem in toAdd)
            {
                Enqueue(dataItem);
            }
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        var segments = new ArraySegment<T>[2] {ArrayOne(), ArrayTwo()};
        foreach (ArraySegment<T> segment in segments)
        {
            for (int i = 0; i < segment.Count; i++)
            {
                yield return segment.Array[segment.Offset + i];
            }
        }
    }

    /// <summary>
    /// Gets the item at the specified index. Throws IndexOutOfRange for an invalid index
    /// or returns the default value of the generic type on an empty queue. The head/earliest item index can also be
    /// null in the event of a dequeue. If the front of the queue is required, please use the Front function instead
    /// </summary>
    /// <param name="index">The index of the item.</param>
    /// <returns></returns>
    /// <exception cref="System.IndexOutOfRangeException"></exception>
    public T this[int index]
    {
        get
        {
            lock (_lock)
            {
                if (IsEmpty)
                {
                    return _default;
                }
                if (index >= _numberOfItemsInBuffer)
                {
                    throw new IndexOutOfRangeException(string.Format("Cannot access index {0}. Buffer size is {1}",
                        index, _numberOfItemsInBuffer));
                }
                int actualIndex = InternalIndex(index);
                return _buffer[actualIndex];
            }
        }
    }

    /// <summary>
    /// Converts the index in the argument to an index in <code>_buffer</code>
    /// </summary>
    /// <returns>
    /// The transformed index.
    /// </returns>
    /// <param name='index'>
    /// External index.
    /// </param>
    private int InternalIndex(int index)
    {
        return _start + (index < (Size - _start) ? index : index - Size);
    }

    /// <summary>
    /// Clears this instance.
    /// </summary>
    public void Clear()
    {
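        lock (_lock)
        {
            // Drop the stored references too; otherwise the queue keeps them alive until overwritten.
            Array.Clear(_buffer, 0, _buffer.Length);
            _numberOfItemsInBuffer = 0;
            _start = 0;
            _end = 0;
        }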
    }

    /// <summary>
    /// Takes a snapshot of the queue and returns it. If used in isolation, i.e. not in a buffer implementation,
    /// it will return all the elements of the queue and leave the queue empty.
    /// In a buffer implementation, since the buffer could be accepting data at the point of this call, it is a stale
    /// snapshot of the queue when the call is made.
    /// </summary>
    /// <returns></returns>
    [Obsolete("Use TakeAll() instead")]
    public T[] TakeSnapshot()
    {
        lock (_lock)
        {
            return TakeSnapshot(Count);
        }
    }

    /// <summary>
    /// Takes all data available in the queue.
    /// </summary>
    /// <returns></returns>
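    public OutputData<T> TakeAll()
    {
        lock (_lock)
        {
            // Read the count inside the lock so it cannot go stale before the copy.
            var count = _numberOfItemsInBuffer;
            if (count <= 0) return null;

            // NOTE: _output is reused, so each call overwrites the data returned by the previous call.
            CopyInto(count, _output);
            _outputData.SetLength(count);
            return _outputData;
        }
    }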


    /// <summary>
    /// Takes a snapshot of the queue using the count to limit the output to return. In the event that the specified
    /// count is larger than the number of items currently in the queue, it is clamped to that number. A zero count value will immediately return null.
    /// In a buffer implementation, since the buffer could be accepting data at the point of this call, it is a stale
    /// snapshot of the queue when the call is made.
    /// </summary>
    /// <returns></returns>
    [Obsolete("Use TakeAllData(int count) instead")]
    public T[] TakeSnapshot(int count)
    {
        if (count == 0) return null;
        lock (_lock)
        {
            if (count > _numberOfItemsInBuffer)
            {
                count = _numberOfItemsInBuffer;
                //throw new ArgumentOutOfRangeException(String.Format("Queue size is {0}", Size));
            }
            var output = new T[count];

            CopyInto(count, output);
            return output;
        }
    }

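    private void CopyInto(int count, T[] output)
    {
        var lastIndex = (_start + count) - 1;

        if (lastIndex >= Size)
        {
            // The requested range wraps: copy the tail of the buffer, then the head.
            var tailLength = Size - _start;
            var headLength = count - tailLength;
            Array.Copy(_buffer, _start, output, 0, tailLength);
            Array.Copy(_buffer, 0, output, tailLength, headLength);
            // Release the copied references, as Dequeue does, so the queue does not keep them alive.
            Array.Clear(_buffer, _start, tailLength);
            Array.Clear(_buffer, 0, headLength);
        }
        else
        {
            Array.Copy(_buffer, _start, output, 0, count);
            Array.Clear(_buffer, _start, count);
        }
        _start = (_start + count)%Size;

        _numberOfItemsInBuffer = _numberOfItemsInBuffer - count;
    }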

    public T[] PeekSnapshot()
    {
        lock (_lock)
        {
            var count = Count;
            var output = new T[count];
            for (var i = 0; i < count; i++)
            {
                output[i] = this[i];
            }
            return output;
        }
    }

    public T[] PeekSnapshot(int count)
    {
        if (count == 0) return null;
        lock (_lock)
        {
            if (count > Size)
            {
                throw new ArgumentOutOfRangeException(String.Format("Queue size is {0}", Size));
            }
            var output = new T[count];
            for (var i = 0; i < count; i++)
            {
                output[i] = this[i];
            }
            return output;
        }
    }


    /// <summary>
    /// Gets the front of the queue. The earliest item added to the queue.
    /// Use this in lieu of checking this[0] as that could be null whilst items still
    /// exist in the queue
    /// </summary>
    /// <value>
    /// The front.
    /// </value>
    public T Front()
    {
        lock (_lock)
        {
            return IsEmpty ? _default : _buffer[_start];
        }
    }

    /// <summary>
    /// Gets the utilisation of the datastructure i.e % of the datastructure currently in use
    /// </summary>
    /// <value>
    /// The utilisation.
    /// </value>
    public float Utilisation
    {
        get
        {
            lock (_lock)
            {
                return CalculateUtilisation();
            }
        }
    }

    private float CalculateUtilisation()
    {
        var util = 0f;
        if (Size > 0)
        {
            util = (Count/(float) Size)*100;
        }
        return util;
    }

    /// <summary>
    /// Returns the latest item in the queue.
    /// </summary>
    /// <returns></returns>
    public T Back()
    {
        lock (_lock)
        {
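            // _end wraps to 0 after the last slot is written, so step back with wrap-around.
            return Count > 0 ? _buffer[(_end == 0 ? Size : _end) - 1] : _default;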
        }
    }

    public WeakArray<T> TakeWeakArraySnapshot()
    {
        lock (_lock)
        {
            var count = Count;
            var arr = new WeakArray<T>(count);
            for (var i = 0; i < count; i++)
            {
                arr[i] = Dequeue();
            }
            return arr;
        }
    }

    /// <summary>
    /// Gets the internal array. Use with EXTREME CAUTION! This is not a thread safe operation
    /// and should ONLY be used in situations where any modification to the queue is not possible.
    /// Modification operations include enqueuing or dequeuing
    /// </summary>
    /// <returns></returns>
    public T[] GetInternalArray()
    {
        return _buffer;
    }


    // doing ArrayOne and ArrayTwo methods returning ArraySegment<T> as seen here: 
    // http://www.boost.org/doc/libs/1_37_0/libs/circular_buffer/doc/circular_buffer.html#classboost_1_1circular__buffer_1957cccdcb0c4ef7d80a34a990065818d
    // http://www.boost.org/doc/libs/1_37_0/libs/circular_buffer/doc/circular_buffer.html#classboost_1_1circular__buffer_1f5081a54afbc2dfc1a7fb20329df7d5b
    // should help a lot with the code.

    #region Array items easy access.

    // The array is composed by at most two non-contiguous segments, 
    // the next two methods allow easy access to those.

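    private ArraySegment<T> ArrayOne()
    {
        if (_numberOfItemsInBuffer == 0)
        {
            // _start == _end holds for both an empty and a full buffer; expose nothing when empty.
            return new ArraySegment<T>(_buffer, _start, 0);
        }
        if (_start < _end)
        {
            return new ArraySegment<T>(_buffer, _start, _end - _start);
        }
        else
        {
            return new ArraySegment<T>(_buffer, _start, _buffer.Length - _start);
        }
    }

    private ArraySegment<T> ArrayTwo()
    {
        if (_numberOfItemsInBuffer == 0)
        {
            return new ArraySegment<T>(_buffer, 0, 0);
        }
        if (_start < _end)
        {
            return new ArraySegment<T>(_buffer, _end, 0);
        }
        else
        {
            return new ArraySegment<T>(_buffer, 0, _end);
        }
    }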

    #endregion




}

Upvotes: 0

Views: 1253

Answers (1)

Maxime Rouiller

Reputation: 13699

Most data structure performance and efficiency comes down to the problem you are trying to solve.

I would recommend not re-implementing data structures yourself unless they don't already exist. Libraries like NGenerics exist exactly to solve this kind of problem.

This is just one sample project, but I'm sure there are others.
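
To illustrate reusing an existing type, here is a minimal sketch built on the framework's own `Queue<T>` rather than on NGenerics. `BoundedSnapshotQueue<T>` and its members are hypothetical names, and dropping the oldest item when full is an assumption carried over from the question's `Enqueue`. It is a starting point under those assumptions, not a drop-in replacement.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical wrapper: a bounded queue that hands out independent snapshots.
public sealed class BoundedSnapshotQueue<T>
{
    private readonly Queue<T> _items;
    private readonly int _capacity;
    private readonly object _gate = new object();

    public BoundedSnapshotQueue(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
        _items = new Queue<T>(capacity);
    }

    // Number of items discarded to stay within the capacity.
    public int DroppedCount { get; private set; }

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            if (_items.Count == _capacity)
            {
                _items.Dequeue(); // drop the oldest item to make room
                DroppedCount++;
            }
            _items.Enqueue(item);
        }
    }

    // Copies the current contents (oldest first) into a new array and empties
    // the queue, so the queue no longer references the returned items.
    public T[] TakeAll()
    {
        lock (_gate)
        {
            T[] snapshot = _items.ToArray();
            _items.Clear();
            return snapshot;
        }
    }
}
```

Each `TakeAll` call allocates a new array that the worker thread owns outright, so nothing is overwritten underneath it. The cost is one allocation per call, which may or may not be acceptable at the call rate described in the question.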

Upvotes: 1
