How to use BlockingCollection


While reading some code, I came across something called BlockingCollection and wasn’t sure what it did, so I looked it up. I mainly referred to the article "BlockingCollection in C# - Introduction and Examples".

What is BlockingCollection?

BlockingCollection is an implementation of the Producer-Consumer pattern. It wraps a collection that implements the `IProducerConsumerCollection<T>` interface and adds thread-safe blocking and bounding on top of it. In other words, it works correctly even when several threads add and take items concurrently.

By default it uses ConcurrentQueue internally, but you can make it use a different collection by passing an instance of a class that implements `IProducerConsumerCollection<T>` to the constructor. The interface is as follows:

_IProducerConsumerCollection_

public interface IProducerConsumerCollection<T> : IEnumerable<T>, IEnumerable, ICollection
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
    bool TryAdd(T item);
    bool TryTake([MaybeNullWhen(false)] out T item);
}

Constructor

Let’s check the behavior in practice. First, instantiate the BlockingCollection class. Passing boundedCapacity sets the maximum number of items the collection can hold. As mentioned earlier, other constructor overloads also accept an underlying collection other than ConcurrentQueue.

var blockingCollection = new BlockingCollection<string>(boundedCapacity: 3);
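As a minimal sketch of swapping the underlying collection, the following passes a ConcurrentStack to the constructor, so items should come out in last-in, first-out order instead of the default first-in, first-out order. The names `UnderlyingCollectionDemo` and `AddThenTakeAll` are made up for this illustration and do not appear in the original sample.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class UnderlyingCollectionDemo
{
    // Adds 1, 2, 3 and then takes everything out again,
    // returning the items in the order they were taken.
    public static List<int> AddThenTakeAll(BlockingCollection<int> collection)
    {
        for (int i = 1; i <= 3; i++) collection.Add(i);

        var taken = new List<int>();
        // TryTake without a timeout returns false as soon as the collection is empty.
        while (collection.TryTake(out int item)) taken.Add(item);
        return taken;
    }

    public static void Main()
    {
        // Default: backed by ConcurrentQueue<T> (FIFO).
        var fifo = new BlockingCollection<int>(boundedCapacity: 3);
        // Backed by ConcurrentStack<T> (LIFO), still bounded to 3 items.
        var lifo = new BlockingCollection<int>(new ConcurrentStack<int>(), boundedCapacity: 3);

        Console.WriteLine(string.Join(",", AddThenTakeAll(fifo))); // 1,2,3
        Console.WriteLine(string.Join(",", AddThenTakeAll(lifo))); // 3,2,1
    }
}
```

The blocking and bounding behavior stays the same either way; only the order in which items are handed out depends on the underlying collection.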

Producer

Since concurrent access is supposed to be safe, create a thread and add elements using `Add`. Here, we pass along whatever is typed into the console. The key behavior is that once the collection holds as many items as the boundedCapacity specified in the constructor above, the `Add` method blocks and waits until a Consumer takes an item out.

Task producerThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        string command = Console.ReadLine();
        // ReadLine returns null when standard input is closed
        if (command == null || command.Contains("quit")) break;
        blockingCollection.Add(command);   // blocks while the collection is at capacity
    }
});
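The blocking behavior of Add can be observed in isolation with a small sketch like the one below. It assumes a capacity of 1 so that the second Add has to wait; `BlockingAddDemo` and `AddWaitsForTake` are hypothetical names used only here.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingAddDemo
{
    // Returns true if a second Add on a full, capacity-1 collection was still
    // waiting after 200 ms and only went through once an item had been taken.
    public static bool AddWaitsForTake()
    {
        var collection = new BlockingCollection<int>(boundedCapacity: 1);
        collection.Add(1);                                   // the collection is now full

        Task blockedAdd = Task.Run(() => collection.Add(2)); // has to wait for free space
        bool finishedWhileFull = blockedAdd.Wait(TimeSpan.FromMilliseconds(200));

        int first = collection.Take();                       // frees one slot
        blockedAdd.Wait();                                   // the pending Add can now finish

        return !finishedWhileFull && first == 1 && collection.Take() == 2;
    }

    public static void Main()
    {
        Console.WriteLine(AddWaitsForTake()); // True
    }
}
```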

If you don’t want to wait indefinitely, there is also a TryAdd method. When given a timeout, it returns false if the item still could not be added after that time, so you can treat the attempt as failed and handle it. Some overloads also accept a CancellationToken.

if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
{
    // it works!
}
else
{
    Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
}
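To make the return values concrete, here is a small single-threaded sketch, assuming a capacity of 2. The names `TryAddDemo` and `Run` are invented for the example.

```csharp
using System;
using System.Collections.Concurrent;

public static class TryAddDemo
{
    public static bool[] Run()
    {
        var collection = new BlockingCollection<string>(boundedCapacity: 2);

        bool first = collection.TryAdd("a");    // room available
        bool second = collection.TryAdd("b");   // room available, collection is now full
        // Full: waits up to 100 ms for a free slot, then gives up and returns false.
        bool third = collection.TryAdd("c", TimeSpan.FromMilliseconds(100));

        collection.Take();                      // frees one slot
        bool fourth = collection.TryAdd("c");   // succeeds again

        return new[] { first, second, third, fourth };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // True,True,False,True
    }
}
```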

Consumer

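You can take one item with the Take method. If the BlockingCollection contains no items, the call blocks here until one arrives. You can check the blockingCollection.IsCompleted property to find out whether the BlockingCollection has finished: it becomes true once the collection has been marked as complete for adding (with CompleteAdding) and is empty.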

NOTE
By the way, the reason this sample does something ugly like .GetAwaiter().GetResult() is the async lambda. If you write Task.Factory.StartNew(async () => { ... }), the lambda returns at its first await, so the task is considered to have finished almost immediately, and the WaitAll call that appears later no longer waits for the thread to end. To be honest, there is probably a better way. In production code I use async/await and don’t start threads directly, so this isn’t a problem there, but I would like to get rid of it here.

Task consumerAThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (blockingCollection.IsCompleted) break;
        // Blocks until an item is available
        string command = blockingCollection.Take();
        Console.WriteLine($"ConsumerA: Take Received: {command}");
        // Simulate slow processing (blocks this thread for 10 seconds)
        Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
    }
});
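The async lambda issue described in the note can be reproduced with the sketch below. It uses a TaskCompletionSource as a gate so the result does not depend on timing; `AsyncStartNewDemo`, `Run`, and `gate` are names chosen for this example only.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncStartNewDemo
{
    public static bool[] Run()
    {
        // The lambda cannot get past its await until the gate is completed.
        var gate = new TaskCompletionSource<bool>();

        // With an async lambda, StartNew returns Task<Task>: the outer task only
        // represents "the lambda ran up to its first await".
        Task<Task> outer = Task.Factory.StartNew(async () => { await gate.Task; });

        outer.Wait();                                     // returns although the lambda body is not done
        bool innerDoneBefore = outer.Result.IsCompleted;  // false

        gate.SetResult(true);                             // let the lambda finish
        outer.Unwrap().Wait();                            // Unwrap waits for the inner task as well
        bool innerDoneAfter = outer.Result.IsCompleted;   // true

        return new[] { innerDoneBefore, innerDoneAfter };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // False,True
    }
}
```

One possible way out, if the loop body should be truly asynchronous, is to call Unwrap() on the result as above, or to use Task.Run, which accepts an async lambda and unwraps it automatically.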

The TryTake method gives up instead of blocking forever: if no item becomes available within the given time, it returns false.

Task consumerBThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (blockingCollection.IsCompleted) break;
        string command;
        // Waits up to 5 seconds for an item, then returns false
        if (blockingCollection.TryTake(out command, TimeSpan.FromSeconds(5)))
        {
            Console.WriteLine($"ConsumerB: TryTake Received: {command}");
        }
        else
        {
            Console.WriteLine("ConsumerB: Can't take now.");
        }
        Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
    }
});
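The following single-threaded sketch shows the two conditions the consumer loops above rely on: TryTake returning false on an empty collection, and IsCompleted turning true only after CompleteAdding has been called and the collection is empty. `TryTakeDemo` and `Run` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class TryTakeDemo
{
    public static string Run()
    {
        var collection = new BlockingCollection<string>(boundedCapacity: 3);
        var log = new List<string>();

        // Empty collection: TryTake waits up to the timeout, then returns false.
        bool gotItem = collection.TryTake(out string item, TimeSpan.FromMilliseconds(100));
        log.Add($"empty:{gotItem}");

        // With an item available, TryTake returns true right away.
        collection.Add("hello");
        gotItem = collection.TryTake(out item, TimeSpan.FromMilliseconds(100));
        log.Add($"filled:{gotItem}:{item}");

        // Being empty is not enough for IsCompleted; CompleteAdding must be called too.
        log.Add($"before:{collection.IsCompleted}");
        collection.CompleteAdding();
        log.Add($"after:{collection.IsCompleted}");

        return string.Join(" ", log);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // empty:False filled:True:hello before:False after:True
    }
}
```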

CancellationToken

The TryXXX methods have overloads that accept a CancellationToken, so you can take advantage of it. When the token is cancelled, an `OperationCanceledException` is thrown.

CancellationTokenSource source = new CancellationTokenSource();
Task consumerBThread = Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (blockingCollection.IsCompleted) break;
        string command;
        try
        {
            // Waits up to 5 seconds; throws if the token is cancelled while waiting
            if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
            {
                Console.WriteLine($"ConsumerB: TryTake Received: {command}");
            }
            else
            {
                Console.WriteLine("ConsumerB: Can't take now.");
            }
        }
        catch (OperationCanceledException e)
        {
            Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
            break;
        }
        Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
    }
});
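Cancellation can also be tried out without console input. The sketch below assumes the token is cancelled 100 ms after the wait starts, well before the 5 second timeout; `CancelTakeDemo` and `Run` are names made up for the example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class CancelTakeDemo
{
    public static string Run()
    {
        var collection = new BlockingCollection<string>();   // stays empty on purpose
        using (var source = new CancellationTokenSource())
        {
            // Request cancellation while TryTake is still waiting for an item.
            source.CancelAfter(TimeSpan.FromMilliseconds(100));
            try
            {
                bool gotItem = collection.TryTake(out string item, 5000, source.Token);
                return gotItem ? "received" : "timed out";
            }
            catch (OperationCanceledException)
            {
                // Cancellation ends the wait early with an exception, not a false result.
                return "cancelled";
            }
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // cancelled
    }
}
```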

Here is the whole program. Run it and type something in, and the Producer will add the item to the BlockingCollection. You can observe the blocking behavior of the Producer, ConsumerA, and ConsumerB. Typing cancel triggers cancellation through the CancellationTokenSource, and typing quit ends the input loop.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        int capacity = 3;
        // Bounded collection: at most `capacity` items can be queued at once
        var blockingCollection = new BlockingCollection<string>(boundedCapacity: capacity);
        CancellationTokenSource source = new CancellationTokenSource();

        Task producerThread = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                string command = Console.ReadLine();
                // ReadLine returns null when standard input is closed
                if (command == null || command.Contains("quit")) break;
                if (command.Contains("cancel"))
                {
                    Console.WriteLine("Cancelling ...");
                    source.Cancel();
                    break;
                }
                // blockingCollection.Add(command);   // blocks while the collection is at capacity
                if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
                {
                    // it works!
                }
                else
                {
                    Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
                }
            }
            // No more items will be added; without this, IsCompleted never becomes true
            blockingCollection.CompleteAdding();
        });
        Task consumerAThread = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                if (blockingCollection.IsCompleted) break;
                string command;
                try
                {
                    // Pass the token so that a blocked Take can be cancelled as well
                    command = blockingCollection.Take(source.Token);
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("ConsumerA: Task is cancelled.");
                    break;
                }
                catch (InvalidOperationException)
                {
                    // Adding was completed while waiting and the collection is empty
                    break;
                }
                Console.WriteLine($"ConsumerA: Take Received: {command}");
                Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
            }
        });
        Task consumerBThread = Task.Factory.StartNew(() =>
        {
            while (true)
            {
                if (blockingCollection.IsCompleted) break;
                string command;
                try
                {
                    if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
                    {
                        Console.WriteLine($"ConsumerB: TryTake Received: {command}");
                    }
                    else
                    {
                        Console.WriteLine("ConsumerB: Can't take now.");
                    }
                }
                catch (OperationCanceledException e)
                {
                    Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
                    break;
                }
                Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
            }
        });
        Task.WaitAll(producerThread, consumerAThread, consumerBThread);
    }
}
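As an aside, the consumer loops can often be written more compactly with GetConsumingEnumerable, which the sample above does not use. This is only a sketch of that alternative under the assumption that the producer calls CompleteAdding when it is done; `ConsumingEnumerableDemo` and `SumAll` are invented names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ConsumingEnumerableDemo
{
    public static int SumAll()
    {
        var collection = new BlockingCollection<int>(boundedCapacity: 3);

        Task producer = Task.Run(() =>
        {
            for (int i = 1; i <= 5; i++) collection.Add(i); // blocks whenever 3 items are queued
            collection.CompleteAdding();                    // tells consumers that nothing more is coming
        });

        int sum = 0;
        Task consumer = Task.Run(() =>
        {
            // Blocks while the collection is empty and ends once it is completed and drained,
            // so no explicit IsCompleted check or exception handling is needed here.
            foreach (int item in collection.GetConsumingEnumerable()) sum += item;
        });

        Task.WaitAll(producer, consumer);
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(SumAll()); // 15
    }
}
```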

Source
This is a sample.
