C#中神器类BlockingCollection的实现详解
什么是BlockingCollection
BlockingCollection
是 C# 中一个非常有用的线程安全的集合类,用于在多线程并发环境下进行数据的读取、写入和处理。它的用途非常广泛,比如在生产者-消费者模型中,用于协调生产者和消费者之间的数据传输,以及在大数据处理中,用于使用多个线程处理大规模的数据并发。
BlockingCollection的实现原理
BlockingCollection
是基于 ConcurrentQueue
实现的,底层用到了 Monitor.Wait()
和 Monitor.Pulse()
进行线程同步。在集合为空时,调用 Take()
方法的线程会被阻塞,直到队列中有可用的数据;在队列满时,调用 Add()
方法的线程会被阻塞,直到队列有空闲的空间。
BlockingCollection的基本用法
创建BlockingCollection
在使用 BlockingCollection
之前,首先需要进行实例化:
var collection = new BlockingCollection<T>();
其中,T 为要存储的数据类型。
添加数据
可以使用 Add()
方法向 BlockingCollection
中添加数据:
collection.Add(data);
如果队列已满,Add()
方法会阻塞当前线程,直到队列有可用的空间。
取出数据
可以使用 Take()
方法从 BlockingCollection
中取出数据:
var data = collection.Take();
如果队列为空,Take()
方法会阻塞当前线程,直到队列中有可用的数据。
遍历数据
可以使用 foreach
循环遍历 BlockingCollection
中的数据:
foreach(var data in collection)
{
// 处理数据
}
完成添加
在生产者向队列中添加完数据后,需要调用 CompleteAdding()
告知消费者已经完成添加操作:
collection.CompleteAdding();
判断是否有数据
可以使用 IsCompleted()
和 Count
属性判断集合是否已经完成添加,以及队列中当前的元素个数:
if(collection.IsCompleted && collection.Count == 0)
{
// 队列已经处理完毕
}
BlockingCollection的高级用法
设置队列的最大容量
可以在实例化 BlockingCollection
时指定队列的最大容量:
var collection = new BlockingCollection<T>(capacity);
设置超时等待
可以使用 Take()
和 Add()
方法的带有超时参数的方法,来指定线程的等待时间:
var data = collection.Take(timeout);
collection.Add(data, timeout);
设置取消标志
可以使用 CancellationToken
对象来取消队列的操作,例如:
var cts = new CancellationTokenSource();
Task.Run(() =>
{
while(!collection.IsAddingCompleted)
{
var data = collection.Take(cts.Token);
// 处理数据
}
});
// 取消操作
cts.Cancel();
示例1:生产者消费者模型
以下示例展示了 BlockingCollection
在生产者-消费者模型中的基本用法:
using System.Collections.Concurrent;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var collection = new BlockingCollection<int>(5);
// 生产者
Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
collection.Add(i);
Console.WriteLine($"生产者产生了数据{i},当前队列中元素个数为{collection.Count}");
}
// 完成添加
collection.CompleteAdding();
});
// 消费者
Task.Run(() =>
{
while (!collection.IsCompleted)
{
var data = collection.Take();
Console.WriteLine($"消费者消费了数据{data},当前队列中元素个数为{collection.Count}");
// 模拟耗时操作
Thread.Sleep(1000);
}
});
Console.Read();
}
}
在上面的示例中,生产者不停地向队列中添加数据,每当添加完一个数字时就会打印出当前队列中元素个数;消费者不停地从队列中取出数据,并且每当取出一个数字时会打印出当前队列中元素个数。
当所有的数据都被生产者添加进队列中后,消费者就会停止取出数据,而由于生产者调用了 CompleteAdding()
方法,所以消费者不会再等待添加操作,从而退出循环。
示例2:并发任务处理
以下示例展示了如何使用 BlockingCollection
实现并发任务处理:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
var collection = new BlockingCollection<int>();
var sw = new Stopwatch();
sw.Start();
// 并发执行10个任务
for (int i = 0; i < 10; i++)
{
var task = Task.Run(() =>
{
for (int j = 0; j < 1000000; j++)
{
collection.Add(j);
}
});
}
Task.Run(() =>
{
while (!collection.IsCompleted)
{
var data = collection.Take();
// 处理数据
}
}).Wait();
sw.Stop();
Console.WriteLine($"处理1亿条数据所需时间:{sw.ElapsedMilliseconds} ms");
Console.Read();
}
}
在上面的示例中,我们启动了10个并发任务,每个任务都向 BlockingCollection
中添加了100万条数字数据。最后,我们启动一个消费者任务,从队列中取出所有的数据,进行处理。运行结果表明,在多线程并发情况下,使用 BlockingCollection
去协调和同步数据的读取和写入,可以极大地提升程序的运行效率。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C#中神器类BlockingCollection的实现详解 - Python技术站