生产者消费者模式与消息队列

什么是生产者消费者模式

生产者消费者模式是一种常见的并发设计模式,用于在多线程或分布式系统中协调生产者和消费者之间的工作。

class Producer
{
    public async Task<string> ProduceAsync()
    {
        // 生产数据
        await Task.Yield(); // 模拟异步操作
        return "数据";
    }
}

class Consumer
{
    public async Task ConsumeAsync(string data)
    {
        // 消费数据
        Console.WriteLine($"消费了: {data}");
    }
}

var producer = new Producer();
var consumer = new Consumer();

var product = await producer.ProduceAsync();
consumer.Consume(product);

如上的代码是一个简单的生产者消费者模式的实现,生产者生产数据,消费者消费数据。

其中使用了异步语法,不过我们可以忽略,仅考虑其中同步部分。

假设消费者消费的很慢,我们需要多个消费者同时进行消费,同时需要一个容器来协调生产者和消费者之间的工作,像这样的容器就叫做消息队列。

class Producer
{
    private bool someCondition = true; // 模拟条件
    public async Task<string> ProduceAsync()
    {
        // 生产数据
        await Task.Yield(); // 模拟异步操作
        return "数据";
    }
}

class SlowConsumer
{
    public async Task ConsumeAsync(string data)
    {
        // 消费数据
        Thread.Sleep(1000); // 模拟慢速消费
        Console.WriteLine($"消费了: {data}");
    }
}

var producer = new Producer();
var consumer = new SlowConsumer();
var messageQueue = new ConcurrentQueue<string>(); // 普通的 Queue 不支持并发访问,我们这里使用 ConcurrentQueue 作为容器

messageQueue.Enqueue(await producer.ProduceAsync());
messageQueue.Enqueue(await producer.ProduceAsync());

await Task.WhenAll(
    consumer.ConsumeAsync(messageQueue.Dequeue()),
    consumer.ConsumeAsync(messageQueue.Dequeue())
);

像这样,我们就可以使用消息队列来协调生产者和消费者之间的工作了。生产者将数据放入消息队列中,消费者从消息队列中取出数据进行消费。这样即使消费者消费的很慢,也不会阻塞生产者的生产过程。 但是,使用 ConcurrentQueue 作为消息队列有一个问题,就是它是无界的,如果生产者生产的速度远远大于消费者消费的速度,那么消息队列就会无限增长,最终导致内存溢出。

var stop = false;
var producerTask = Task.Run(async () =>
{
    while (stop == false)
    {
        messageQueue.Enqueue(await producer.ProduceAsync());
    }
});
var consumerTask = Task.Run(async () =>
{
    while (stop == false)
    {
        if (messageQueue.TryDequeue(out var data))
        {
            await consumer.ConsumeAsync(data);
        }
    }
});
await Task.WhenAny(producerTask, consumerTask, Task.Delay(10000)); // 运行 10 秒后结束
stop = true;
Console.WriteLine("运行结束");

在内存不足的机器上,像如上的代码很快就会导致内存溢出,甚至导致整个系统崩溃。为了避免这种情况,我们需要一个有界的消息队列容器来限制消息队列的大小,同时要一个机制通知生产者停止生产,直到消费者消费完消息队列中的数据。

更好的消息队列容器

许多语言都有线程的实用的消息队列容器,例如 Go 里的 channel, .NET 里的 ChannelBlockingCollection,Java 里的 BlockingQueue 等等。我们可以使用这些容器来更优雅的实现生产者消费者模式。 如下是 C# 里一个比较优雅的实现,使用了 Channel 来作为消息队列容器。

var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(10) // 限制消息队列的大小为 10
{
    // SingleReader = true, // 只有一个消费者
    // SingleWriter = true, // 只有一个生产者
    FullMode = BoundedChannelFullMode.Wait // 当消息队列满时,生产者等待
});

var producerTask = Task.Run(async () =>
{
    try {
        while (true)
        {
            await channel.Writer.WriteAsync(await producer.ProduceAsync(), cancellationTokenSource.Token);
        }
    } catch(ChannelClosedException) {
        // 这时消息队列已经关闭,生产者停止生产
    }
    Console.WriteLine("生产者已停止生产");
});

var consumerTask = Task.Run(async () =>
{
    await foreach (var data in channel.Reader.ReadAllAsync())
    {
        await consumer.ConsumeAsync(data);
    }
    Console.WriteLine("消费者已完成消费");
});

await Task.WhenAny(producerTask, consumerTask, Task.Delay(10000)); // 运行 10 秒后结束
await channel.Writer.CompleteAsync(); // 通知消费者消息队列已经关闭
await Task.WhenAll(producerTask, consumerTask); // 等待生产者和消费者都结束

Console.WriteLine("运行结束");

这里我们使用了 Channel.CreateBounded 来创建一个有界的消息队列,限制了消息队列的大小为 10。

当消息队列满时,生产者调用的 channel.Writer.WriteAsync 方法会等待,十分自然的将消费者的消费压力传播到了生产者,实现了背压(Back Pressure)。同时我们使用了 channel.Writer.CompleteAsync() 来通知消费者消息队列已经关闭,消费者在消费完消息队列中的数据后会自动结束。