rudimenter

Reputation: 3460

TPL Dataflow block that delays forwarding a message to the next block

I need a Dataflow block that delays forwarding each message to the next block based on the timestamp in the message (LogEntry).

This is what I came up with, but it doesn't feel right. Any suggestions for improvement?

    private IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock()
    {
        var buffer = new ConcurrentQueue<LogEntry>();

        var source = new BufferBlock<LogEntry>();

        var target = new ActionBlock<LogEntry>(item =>
        {
            buffer.Enqueue(item);
        });


        Task.Run(() =>
            {
                LogEntry entry;
                while (true)
                {
                    entry = null;
                    if (buffer.TryPeek(out entry))
                    {
                        if (entry.UtcTimestamp < (DateTime.UtcNow - TimeSpan.FromMinutes(5)))
                        {
                            buffer.TryDequeue(out entry);
                            source.Post(entry);
                        }
                    }
                }
            });


        target.Completion.ContinueWith(delegate
        {
            LogEntry entry;
            while (buffer.TryDequeue(out entry))
            {
                source.Post(entry);
            }

            source.Complete();
        });

        return DataflowBlock.Encapsulate(target, source);
    }

Upvotes: 4

Views: 1401

Answers (1)

i3arnon

Reputation: 116586

You could simply use a single TransformBlock that asynchronously waits out the delay using Task.Delay:

IPropagatorBlock<TItem, TItem> DelayedForwardBlock<TItem>(TimeSpan delay)
{
    return new TransformBlock<TItem, TItem>(async item =>
    {
        await Task.Delay(delay);
        return item;
    });
}

Usage:

var block = DelayedForwardBlock<LogEntry>(TimeSpan.FromMinutes(5));
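
Note that the block above waits a fixed delay per item, and a TransformBlock processes one item at a time by default (MaxDegreeOfParallelism is 1), so the waits add up when items arrive close together. If the delay should depend on the timestamp in the message, as in the question, a variant along these lines might work. It is only a sketch: the LogEntry class here is a stand-in that assumes nothing beyond the UtcTimestamp property used in the question, and RemainingDelay and DelayBlocks are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Stand-in for the question's LogEntry; only UtcTimestamp is assumed.
public sealed class LogEntry
{
    public DateTime UtcTimestamp { get; set; }
}

public static class DelayBlocks
{
    // Hypothetical helper: how long is left until an entry stamped
    // utcTimestamp is at least `delay` old. Never returns a negative value.
    public static TimeSpan RemainingDelay(DateTime utcTimestamp, TimeSpan delay, DateTime utcNow)
    {
        TimeSpan remaining = utcTimestamp + delay - utcNow;
        return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
    }

    public static IPropagatorBlock<LogEntry, LogEntry> DelayedForwardBlock(TimeSpan delay)
    {
        return new TransformBlock<LogEntry, LogEntry>(async entry =>
        {
            // Wait only for the time the entry still needs; entries that are
            // already old enough pass through without waiting.
            await Task.Delay(RemainingDelay(entry.UtcTimestamp, delay, DateTime.UtcNow));
            return entry;
        });
    }
}
```

Usage stays the same apart from the type argument: var block = DelayBlocks.DelayedForwardBlock(TimeSpan.FromMinutes(5)); Because each entry waits only until its own due time, processing entries one after another should not accumulate extra delay as long as they arrive in roughly timestamp order.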

Upvotes: 10
