How to use BlockingCollection
When I was reading the code, I saw something called Blocking Collection
and I wasn’t sure, so I looked it up. Mainly BlockingCollection in C # –Introduction and Examples.
What is Blocking Collection?
BlockingCollection
is an implementation of the Producer-Consumer pattern. There is an interface called ʻIPublisherConsumerCollection
By default, it seems to use ConcurrentQueue
internally, but it seems that you can change it to use that class by passing a class that implements ʻIProducerConsumerCollection
_IProducerConsumerCollection
public interface IProducerConsumerCollection<T> : IEnumerable<T>, IEnumerable, ICollection
{
void CopyTo(T[] array, int index);
T[] ToArray();
bool TryAdd(T item);
bool TryTake([MaybeNullWhen(false)] out T item);
}
constructor
Let’s actually check the behavior. Simply instantiate the BlockingCollection
class. You can specify the maximum value for this collection by passing boundedCapacity
. As mentioned earlier, it is possible to pass an implementation other than ConcurrentQueue
here.
var blockingCollection = new BlockingCollection<string>(boundedCapacity: 3);
Producer
Since it is said that concurrent is OK, create a thread and add elements using ʻAdd. Here, we are passing what was entered from the console. The point of behavior is that when the
boundedCapacity specified in the constructor above is exceeded, the ʻAdd
method blocks and waits for the Consumer to retrieve the item.
Task producerThread = Task.Factory.StartNew(() =>
{
while (true)
{
string command = Console.ReadLine();
if (command.Contains("quit")) break;
blockingCollection.Add(command); // blocked if it reach the capacity
}
});
If you don’t like this standby behavior, there is also a TryAdd
method. In the case of this method, if it is blocked for a certain period of time, it can be regarded as “failed” and processed. Some overloads have a CancellationToken
.
if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
{
// it works!
}
else
{
Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
}
Consumer
You can get one item by the Take
method. If there is no instance of BlockingCollection
, it will be blocked here. You can use the blockingCollection.IsComplete
method to be notified that the BlockingCollection
has finished.
NOTE
By the way, in this sample, if you do something crap like .GetAwaiter (). GetResult ()
, if you set Task.Factory.StartNew (async () => {}
, it’s because of async. This is because the execution of labmda is blocked, it is considered to have ended immediately, and the process of waiting for the end of this thread in the WaitAll
method that comes out later does not work well. To be honest, there seems to be a better way. Production So, since I use async / await, I haven’t started the thread directly, so it’s not a problem, but I want to get rid of it.
Task consumerAThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command = blockingCollection.Take();
Console.WriteLine($"ConsumerA: Take Received: {command}");
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
The TryTake
method is a method that causes a block to fail after a certain amount of time.
Task consumerBThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command;
if (blockingCollection.TryTake(out command, TimeSpan.FromSeconds(5)))
{
Console.WriteLine($"ConsumerB: TryTake Received: {command}");
}
else
{
Console.WriteLine($"consumerB: Can't take now.");
}
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
CancellationToken
The TryXXX
method supports CancellationToken
, so you can take advantage of it. When a CancellationToken
is issued, a ʻOperationCanceledException` is thrown.
CancellationTokenSource source = new CancellationTokenSource();
Task consumerBThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command;
try
{
if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
{
Console.WriteLine($"ConsumerB: TryTake Received: {command}");
}
else
{
Console.WriteLine($"consumerB: Can't take now.");
}
} catch (OperationCanceledException e)
{
Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
break;
}
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
Share the big picture of the program. Run it, type something in, and the Producer will populate the BlockingCollection
with the item. You can observe the blocking behavior of Producer
, ConsumerA
, ConsumerB
. Typing cancel
will issue a CancellationToken
and exit. Alternatively, exit with quit
.
class Program
{
static void Main(string[] args)
{
int capacity = 3;
// Blocking Collection
var blockingCollection = new BlockingCollection<string>(boundedCapacity: capacity);
CancellationTokenSource source = new CancellationTokenSource();
Task producerThread = Task.Factory.StartNew(() =>
{
while (true)
{
string command = Console.ReadLine();
if (command.Contains("quit")) break;
if (command.Contains("cancel"))
{
Console.WriteLine("Cancelling ...");
source.Cancel();
break;
}
// blockingCollection.Add(command); // blocked if it reach the capacity
if (blockingCollection.TryAdd(command, TimeSpan.FromSeconds(1)))
{
// it works!
}
else
{
Console.WriteLine($"It reached boundedCapacity: {capacity} couldn't add {command}");
}
}
});
Task consumerAThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command = blockingCollection.Take();
Console.WriteLine($"ConsumerA: Take Received: {command}");
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
Task consumerBThread = Task.Factory.StartNew(() =>
{
while (true)
{
if (blockingCollection.IsCompleted) break;
string command;
try
{
if (blockingCollection.TryTake(out command, (int)TimeSpan.FromSeconds(5).TotalMilliseconds, source.Token))
{
Console.WriteLine($"ConsumerB: TryTake Received: {command}");
}
else
{
Console.WriteLine($"consumerB: Can't take now.");
}
} catch (OperationCanceledException e)
{
Console.WriteLine($"ConsumerB: Task is cancelled.: {e.Message}");
break;
}
Task.Delay(TimeSpan.FromSeconds(10)).GetAwaiter().GetResult();
}
});
Task.WaitAll(producerThread, consumerAThread, consumerBThread);
}
}
Source
This is a sample.