Implement asynchronous-friendly producer-consumer (Pub / Sub) patterns in System.Threading.Channels

5 minute read

I often made it myself, but it’s difficult to implement and I don’t have much time, so if I’m in trouble, Tell me a good one So I will summarize the simple usage.

Aetos-san Thank you very much!

What is this?

This is a Microsoft-made library that helps you implement one of the common design patterns for multithreaded programming, the producer / consumer pattern (recently, the Pub / Sub pattern is better communicated?). It has a high affinity with async / await, is easy to use, and operates at high speed.

Reference material

  1. Use System.Threading.Channels
  2. C# Channels - Publish / Subscribe Workflows

In particular, article 1 is also compared with other libraries, and it is highly recommended that you read it.

Producer: Consumer = 1: 1 implementation example

Let’s start with the simplest example. There is only one producer and one consumer, but when a producer puts an item in a queue, he wants the consumer to process it asynchronously. This is almost the same as Reference. A simple and great sample.

static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true,
            SingleWriter = true
        });

    var consumer = Task.Run(async () =>
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            Console.WriteLine(await channel.Reader.ReadAsync());
        }
    });
    var producer = Task.Run(async () =>
    {
        var rnd = new Random();
        for (int i = 0; i < 5; i++)
        {
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            await channel.Writer.WriteAsync($"Message {i}");
        }
        channel.Writer.Complete();
    });

    await Task.WhenAll(producer, consumer);            
    Console.WriteLine("Completed.");
}

First of all, the Channel object corresponding to the queue (+ α) is created.

var channel = Channel.CreateUnbounded<string>(
    new UnboundedChannelOptions
    {
        SingleReader = true,
        SingleWriter = true
    });

CreateUnbounded creates a queue with no size limit. CreateBounded allows you to create queues with size restrictions.

Also, it seems that the performance will be improved a little by setting the SingleReader and SingleWriter of UnboundedChannelOptions to true and restricting them. The default is false.

Next, create a Consumer to process the queued items.

var consumer = Task.Run(async () =>
{
    while (await channel.Reader.WaitToReadAsync())
    {
        Console.WriteLine(await channel.Reader.ReadAsync());
    }
});

The WaitToReadAsync method checks if the Channel is closed (all processing is complete), and ReadAsync gets the item and processes it.

Next, let’s look at the producer side.

var producer = Task.Run(async () =>
{
    var rnd = new Random();
    for (int i = 0; i < 5; i++)
    {
        await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
        await channel.Writer.WriteAsync($"Message {i}");
    }
    channel.Writer.Complete();
});

Items are registered with WriteAsync while inserting a delay of 3 seconds or less in Random.

After writing 5 items, the channel is closed with Complete.

At the end, it waits for the processing of producers and consumers before finishing.

await Task.WhenAll(producer, consumer);            
Console.WriteLine("Completed.");

Producer: Consumer = 1: n implementation example

Example of reference material It would be nice if the processing time of the queued items is even. , It is inappropriate when the processing time differs depending on the item.

In that case, use it as follows.

Note that CreateUnbounded only puts a Single constraint on the Writer side.

static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleWriter = true
        });
    
    var consumers = Enumerable
        .Range(1, 3)    //Get a number from 1 to 3
        .Select(consumerNumber =>
            Task.Run(async () =>
            {
                while (await channel.Reader.WaitToReadAsync())
                {
                    if (channel.Reader.TryRead(out var item))
                    {
                        Console.WriteLine($"Consumer:{consumerNumber} {item}");
                    }
                }
            }));
    var producer = Task.Run(async () =>
    {
        var rnd = new Random();
        for (var i = 0; i < 5; i++)
        {
            await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
            await channel.Writer.WriteAsync($"Message {i}");
        }
        channel.Writer.Complete();
    });

    await Task.WhenAll(consumers.Union(new[] {producer}));
    Console.WriteLine("Completed.");
}

First of all, generate multiple consumers.

var consumers = Enumerable
    .Range(1, 3)    //Get a number from 1 to 3
    .Select(consumerNumber =>
        Task.Run(async () =>
        {
            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var item))
                {
                    Console.WriteLine($"Consumer:{consumerNumber} {item}");
                }
            }
        }));

The important point here is that we are using TryRead instead of ReadAsync to retrieve the item.

On Channel, consumers wait for an item to be queued with WaitToReadAsync, but once an item is registered, all producers are “waked up”. When using ReadAsync, the second and subsequent consumers who go to pick up the item will get an exception because there is no item.

Therefore, by using TryRead, it is necessary to implement it so that it will be processed only when there is still an item.

The rest is done by waiting for the processing of all consumers and producers to finish.

await Task.WhenAll(consumers.Union(new[] {producer}));
Console.WriteLine("Completed.");

Implementation example of producer: consumer = n: 1

Example of reference material shows how to create and merge multiple channels, but originally Channel It is not always necessary to create multiple Channels as it itself supports multiple inputs.

So the following is a simple example. Please note that CreateUnbounded puts a Single constraint only on the Reader side.

static async Task Main(string[] args)
{
    var channel = Channel.CreateUnbounded<string>(
        new UnboundedChannelOptions
        {
            SingleReader = true
        });

    var consumer = Task.Run(async () =>
    {
        while (await channel.Reader.WaitToReadAsync())
        {
            Console.WriteLine(await channel.Reader.ReadAsync());
        }
    });

    var producers = Enumerable
        .Range(1, 3)
        .Select(producerNumber =>Task.Run(async () =>
        {
            var rnd = new Random();
            for (var i = 0; i < 5; i++)
            {
                await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
                await channel.Writer.WriteAsync($"Producer:{producerNumber} Message {i}");
            }
        }));

    await Task.WhenAll(producers);
    channel.Writer.Complete();

    await consumer;
    Console.WriteLine("Completed.");
}

The generation on the Consumer side is the same as in the case of 1: 1.

The difference is the implementation on the Producer side. At the time of 1: 1, before and after the for loop was implemented as follows.
I completed and closed the Writer outside the for loop, but this closes the Producer that was completed first, and an error occurs in the subsequent Producer processing.

for (int i = 0; i < 5; i++)
{
    await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
    await channel.Writer.WriteAsync($"Message {i}");
}
channel.Writer.Complete();

Therefore, make sure to complete after all Producer processing is completed as follows.

await Task.WhenAll(producers);
channel.Writer.Complete();

The actual implementation here depends on the case, but if you don’t have to wait for all the producer’s processing, you can also use TryWrite instead of WriteAsync like this:

//await channel.Writer.WriteAsync($"Producer:{producerNumber} Message {i}");
channel.Writer.TryWrite($"Producer:{producerNumber} Message {i}");

If Complete isn’t closed in more than one place, try Use TryComplete.

that’s all.