生产者消费者模式与消息队列
什么是生产者消费者模式
生产者消费者模式是一种常见的并发设计模式,用于在多线程或分布式系统中协调生产者和消费者之间的工作。
class Producer
{
public async Task<string> ProduceAsync()
{
// 生产数据
await Task.Yield(); // 模拟异步操作
return "数据";
}
}
class Consumer
{
public async Task ConsumeAsync(string data)
{
// 消费数据
Console.WriteLine($"消费了: {data}");
}
}
var producer = new Producer();
var consumer = new Consumer();
var product = await producer.ProduceAsync();
consumer.Consume(product);
如上的代码是一个简单的生产者消费者模式的实现,生产者生产数据,消费者消费数据。
其中使用了异步语法,不过我们可以忽略,仅考虑其中同步部分。
假设消费者消费的很慢,我们需要多个消费者同时进行消费,同时需要一个容器来协调生产者和消费者之间的工作,像这样的容器就叫做消息队列。
class Producer
{
private bool someCondition = true; // 模拟条件
public async Task<string> ProduceAsync()
{
// 生产数据
await Task.Yield(); // 模拟异步操作
return "数据";
}
}
class SlowConsumer
{
public async Task ConsumeAsync(string data)
{
// 消费数据
Thread.Sleep(1000); // 模拟慢速消费
Console.WriteLine($"消费了: {data}");
}
}
var producer = new Producer();
var consumer = new SlowConsumer();
var messageQueue = new ConcurrentQueue<string>(); // 普通的 Queue 不支持并发访问,我们这里使用 ConcurrentQueue 作为容器
messageQueue.Enqueue(await producer.ProduceAsync());
messageQueue.Enqueue(await producer.ProduceAsync());
await Task.WhenAll(
consumer.ConsumeAsync(messageQueue.Dequeue()),
consumer.ConsumeAsync(messageQueue.Dequeue())
);
像这样,我们就可以使用消息队列来协调生产者和消费者之间的工作了。生产者将数据放入消息队列中,消费者从消息队列中取出数据进行消费。这样即使消费者消费的很慢,也不会阻塞生产者的生产过程。
但是,使用 ConcurrentQueue 作为消息队列有一个问题,就是它是无界的,如果生产者生产的速度远远大于消费者消费的速度,那么消息队列就会无限增长,最终导致内存溢出。
var stop = false;
var producerTask = Task.Run(async () =>
{
while (stop == false)
{
messageQueue.Enqueue(await producer.ProduceAsync());
}
});
var consumerTask = Task.Run(async () =>
{
while (stop == false)
{
if (messageQueue.TryDequeue(out var data))
{
await consumer.ConsumeAsync(data);
}
}
});
await Task.WhenAny(producerTask, consumerTask, Task.Delay(10000)); // 运行 10 秒后结束
stop = true;
Console.WriteLine("运行结束");
在内存不足的机器上,像如上的代码很快就会导致内存溢出,甚至导致整个系统崩溃。为了避免这种情况,我们需要一个有界的消息队列容器来限制消息队列的大小,同时要一个机制通知生产者停止生产,直到消费者消费完消息队列中的数据。
更好的消息队列容器
许多语言都有线程的实用的消息队列容器,例如 Go 里的 channel, .NET 里的 Channel 和 BlockingCollection,Java 里的 BlockingQueue 等等。我们可以使用这些容器来更优雅的实现生产者消费者模式。
如下是 C# 里一个比较优雅的实现,使用了 Channel 来作为消息队列容器。
var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(10) // 限制消息队列的大小为 10
{
// SingleReader = true, // 只有一个消费者
// SingleWriter = true, // 只有一个生产者
FullMode = BoundedChannelFullMode.Wait // 当消息队列满时,生产者等待
});
var producerTask = Task.Run(async () =>
{
try {
while (true)
{
await channel.Writer.WriteAsync(await producer.ProduceAsync(), cancellationTokenSource.Token);
}
} catch(ChannelClosedException) {
// 这时消息队列已经关闭,生产者停止生产
}
Console.WriteLine("生产者已停止生产");
});
var consumerTask = Task.Run(async () =>
{
await foreach (var data in channel.Reader.ReadAllAsync())
{
await consumer.ConsumeAsync(data);
}
Console.WriteLine("消费者已完成消费");
});
await Task.WhenAny(producerTask, consumerTask, Task.Delay(10000)); // 运行 10 秒后结束
await channel.Writer.CompleteAsync(); // 通知消费者消息队列已经关闭
await Task.WhenAll(producerTask, consumerTask); // 等待生产者和消费者都结束
Console.WriteLine("运行结束");
这里我们使用了 Channel.CreateBounded 来创建一个有界的消息队列,限制了消息队列的大小为 10。
当消息队列满时,生产者调用的 channel.Writer.WriteAsync 方法会等待,十分自然的将消费者的消费压力传播到了生产者,实现了背压(Back Pressure)。同时我们使用了 channel.Writer.CompleteAsync() 来通知消费者消息队列已经关闭,消费者在消费完消息队列中的数据后会自动结束。