yBother

Reputation: 718

Cannot retrieve chunks of data from BlockingCollection<T>

I often need to stream data in chunks rather than one element at a time. Usually I do this for I/O-bound operations such as database inserts, where I want to limit round trips. So I wrote this little extension method:

        public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> data, int size)
        {            
            using (var enumerator = data.GetEnumerator())
            {
                while (enumerator.MoveNext())
                {
                    yield return YieldBatchElements(enumerator, size - 1).ToList();
                }
            }

            IEnumerable<TU> YieldBatchElements<TU>(
                IEnumerator<TU> source,
                int batchSize)
            {
                yield return source.Current;
                for (var i = 0; i < batchSize && source.MoveNext(); i++)
                {
                    yield return source.Current;
                }
            }
        }

This works just fine, but I noticed that it does not work with BlockingCollection<T>.GetConsumingEnumerable().

I created the following test method to demonstrate my findings:

        [Test]
        public static void ConsumeTest()
        {
            var queue = new BlockingCollection<int>();
            var i = 0;
            foreach (var x in Enumerable.Range(0, 10).Split(3))
            {
                Console.WriteLine($"Fetched chunk: {x.Count}");
                Console.WriteLine($"Fetched total: {i += x.Count}");
            }
            //Fetched chunk: 3
            //Fetched total: 3
            //Fetched chunk: 3
            //Fetched total: 6
            //Fetched chunk: 3
            //Fetched total: 9
            //Fetched chunk: 1
            //Fetched total: 10
         

            Task.Run(
                () =>
                    {
                        foreach (var x in Enumerable.Range(0, 10))
                        {
                            queue.Add(x);
                        }
                    });

            i = 0;
            foreach (var element in queue.GetConsumingEnumerable(
                new CancellationTokenSource(3000).Token).Split(3))
            {
                Console.WriteLine($"Fetched chunk: {element.Count}");
                Console.WriteLine($"Fetched total: {i += element.Count}");
            }

            //Fetched chunk: 3
            //Fetched total: 3
            //Fetched chunk: 3
            //Fetched total: 6
            //Fetched chunk: 3
            //Fetched total: 9
        }

Apparently the last chunk is "dropped" if it contains fewer elements than the chunk size. Any ideas?

Upvotes: 1

Views: 131

Answers (1)

virus

Reputation: 144

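You need to call CompleteAdding() to tell GetConsumingEnumerable() that its producer(s) will add no more elements. Without that call, the consuming enumerator blocks in MoveNext() waiting for further items while Split is still filling the last chunk. When the cancellation token fires after 3 seconds, the enumeration is aborted and the partially filled chunk is never yielded.

The following change fixes the issue and prints the missing lines.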

Task.Run(() =>
{
    foreach (var x in Enumerable.Range(0, 10))
    {
        queue.Add(x);
    }

    // Marks the collection as complete for adding: IsAddingCompleted becomes true,
    // and IsCompleted becomes true once the queue has also been emptied.
    queue.CompleteAdding();
});

See the documentation for BlockingCollection<T>.GetConsumingEnumerable() for more information.
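For completeness, here is a minimal, self-contained sketch of the whole round trip. It is not the exact code from the question: Split is a simplified variant of the asker's extension method, and ChunkingDemo and ConsumeInChunks are hypothetical names introduced only for this illustration. The sketch also assumes no cancellation token is needed, because CompleteAdding() is what ends the enumeration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkingDemo
{
    // Simplified variant of the Split extension from the question:
    // collects up to `size` elements per chunk and yields a final partial chunk.
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> data, int size)
    {
        using (var enumerator = data.GetEnumerator())
        {
            while (enumerator.MoveNext())
            {
                var batch = new List<T>(size) { enumerator.Current };
                while (batch.Count < size && enumerator.MoveNext())
                {
                    batch.Add(enumerator.Current);
                }
                yield return batch;
            }
        }
    }

    // Hypothetical helper: produces `count` items on a background task,
    // completes the queue, and returns the size of every chunk consumed.
    public static List<int> ConsumeInChunks(int count, int chunkSize)
    {
        using (var queue = new BlockingCollection<int>())
        {
            var producer = Task.Run(() =>
            {
                try
                {
                    foreach (var x in Enumerable.Range(0, count))
                    {
                        queue.Add(x);
                    }
                }
                finally
                {
                    // Always signal completion, even if the producer fails,
                    // so the consumer does not block forever.
                    queue.CompleteAdding();
                }
            });

            var sizes = queue.GetConsumingEnumerable()
                             .Split(chunkSize)
                             .Select(chunk => chunk.Count)
                             .ToList();
            producer.Wait();
            return sizes;
        }
    }

    public static void Main()
    {
        foreach (var size in ConsumeInChunks(10, 3))
        {
            Console.WriteLine($"Fetched chunk: {size}");
        }
    }
}
```

Putting CompleteAdding() in a finally block is a design choice of this sketch: it keeps the consumer from hanging if the producer throws part-way through.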

Upvotes: 2
