C# 线程安全队列的用法原理及使用示例
什么是线程安全队列?
在线程并发编程中,多个线程同时访问共享数据结构时,会存在竞态条件(race condition)问题,可能导致数据不一致、数据丢失或程序崩溃等问题。为了解决这些问题,需要使用线程安全的数据结构进行并发操作,其中线程安全队列就是一种常见的数据结构。
线程安全队列是一种特殊的队列,能够在多线程并发的情况下安全地进行插入和删除操作。当多个线程同时对队列进行插入和删除时,队列可以保证每个操作执行的原子性,从而避免了多个线程之间的竞争条件问题,并保证了数据的一致性和完整性。
C# 中的线程安全队列类
C# 提供了多种线程安全的队列类,其中最常用的是ConcurrentQueue<T>
类。该类实现了IProducerConsumerCollection<T>
接口,提供了线程安全的操作方法。下面列出该类的常用方法:
Enqueue(T)
:将一个元素添加到队列的结尾。TryDequeue(out T)
:尝试获取并删除队列的第一个元素。如果队列为空,返回false
,否则返回true
。TryPeek(out T)
:尝试获取队列的第一个元素,但不删除该元素。如果队列为空,返回false
,否则返回true
。Count
:获取队列中元素的数量。
使用示例一:生产者消费者模式
生产者消费者模式是一种经典的多线程并发设计模式,其主要思想是将数据的生产和消费分离,由不同的线程来负责数据的生产和消费。线程安全队列是生产者消费者模式的重要组成部分,用于缓存数据并协调生产者和消费者的速度。
下面是使用ConcurrentQueue<T>
类实现的生产者消费者模式示例:
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
// 创建线程安全队列
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
// 创建生产者任务
Task producer = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 1000; i++)
{
queue.Enqueue(i); // 生产数据
Thread.Sleep(10); // 模拟生产过程
}
});
// 创建多个消费者任务
Task[] consumers = new Task[4];
for (int i = 0; i < 4; i++)
{
consumers[i] = Task.Factory.StartNew(() =>
{
while (true)
{
if (queue.TryDequeue(out int data))
{
Console.WriteLine("Consumed: {0}", data); // 消费数据
}
else if (producer.IsCompleted)
{
break; // 生产者已经结束,退出消费者任务
}
else
{
Thread.Sleep(1); // 防止过分占用 CPU 资源
}
}
});
}
// 等待所有任务完成
Task.WaitAll(consumers);
}
}
该示例中,主线程创建了一个线程安全队列queue
,然后创建了一个生产者任务和多个消费者任务。生产者任务使用一个循环来生产数据,并调用Enqueue
方法将数据放入队列中;消费者任务使用一个循环来从队列中消费数据,并调用TryDequeue
方法将数据取出。当生产者任务结束时,消费者任务也随之结束。
使用示例二:并行遍历列表
除了生产者消费者模式外,线程安全队列还可以用于并行遍历数据,例如列表(List<T>
)。使用ConcurrentQueue<T>
类可以将列表中的数据分为多个块,然后让多个线程并行遍历这些块,在遍历完成后将结果进行合并。
下面是使用ConcurrentQueue<T>
类实现并行遍历列表的示例:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
// 创建列表并填充数据
List<int> list = Enumerable.Range(1, 100000).ToList();
// 分割列表为多个块,并将块加入线程安全队列
ConcurrentQueue<List<int>> queue = new ConcurrentQueue<List<int>>();
int chunkSize = 1000;
for (int i = 0; i < list.Count; i += chunkSize)
{
List<int> chunk = list.GetRange(i, Math.Min(chunkSize, list.Count - i));
queue.Enqueue(chunk);
}
// 创建多个任务并行遍历列表块
Task[] tasks = new Task[Environment.ProcessorCount];
for (int i = 0; i < tasks.Length; i++)
{
tasks[i] = Task.Factory.StartNew(() =>
{
int sum = 0;
while (queue.TryDequeue(out List<int> chunk))
{
foreach (int item in chunk)
{
sum += item;
}
}
return sum;
});
}
// 等待所有任务完成并将结果合并
int totalSum = 0;
foreach (Task<int> task in tasks)
{
totalSum += task.Result;
}
Console.WriteLine("Total sum: {0}", totalSum);
}
}
该示例中,主线程先创建一个包含100000个元素的列表,并将该列表切分成100个大小为1000的块,并将这些块放入线程安全队列queue
中。然后,使用Task.Factory.StartNew
方法并行创建多个任务,在这些任务中遍历列表块并计算块中元素的总和,并将这些总和作为任务返回值。最后,主线程等待所有任务完成,并将所有任务返回值的和作为列表元素的总和输出到控制台。
总结
线程安全队列是一种非常实用的数据结构,可以用于多线程并发场景下的数据缓存和协调。在 C# 中,ConcurrentQueue<T>
类是线程安全队列的实现类,具有高效可靠的特点。在实际应用中,可以将线程安全队列作为集成并发编程模型的基础组件,有效提高程序的可扩展性和性能。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:c# 线程安全队列的用法原理及使用示例 - Python技术站