让我们来详细讲解一下“C语言实现支持动态拓展和销毁的线程池”的完整攻略。
什么是线程池
线程池是一种线程管理技术,用来解决线程过多而导致系统负载过高的问题。在程序启动时,线程池会创建一定数量的线程,当有任务到达时,会将任务交给池中的线程执行。当所有线程都在工作时,新的任务就会进入等待队列,直到有线程完成任务后被唤醒。
实现线程池的步骤
初始化线程池
首先,我们需要初始化线程池。在定义线程池结构体时,需要包含线程池的状态、线程的数量、任务队列等信息。在初始化线程池时,需要创建线程、初始化互斥锁和条件变量等基础设置。
typedef struct {
pthread_t* threads; // 线程数组
int thread_count; // 线程数量
TaskQueue* task_queue; // 任务队列
pthread_mutex_t lock; // 互斥锁
pthread_cond_t notify; // 条件变量
int is_destroyed; // 是否销毁线程池
} ThreadPool;
void ThreadPoolInit(ThreadPool* pool, int thread_count) {
// 线程池状态初始化
pool->thread_count = thread_count;
pool->is_destroyed = 0;
// 初始化互斥锁和条件变量
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->notify, NULL);
// 创建线程数组
pool->threads = malloc(sizeof(pthread_t) * thread_count);
// 创建任务队列
pool->task_queue = TaskQueueInit();
// 创建线程
for (int i = 0; i < thread_count; i++) {
pthread_create(&pool->threads[i], NULL, ThreadPoolWorker, (void*)pool);
}
}
线程池中的工作线程
在线程池初始化时,我们创建了一些工作线程,这些线程负责从任务队列中取出任务并执行。下面是工作线程的代码示例:
void* ThreadPoolWorker(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (1) {
// 获取任务
Task* task = TaskQueuePop(pool->task_queue);
// 执行任务
task->func(task->arg);
// 释放任务资源
TaskDestroy(task);
// 判断线程池是否即将销毁
pthread_mutex_lock(&pool->lock);
if (pool->is_destroyed && TaskQueueIsEmpty(pool->task_queue)) {
pthread_mutex_unlock(&pool->lock);
break;
}
pthread_mutex_unlock(&pool->lock);
}
pthread_exit(NULL);
}
添加任务
当有新的任务需要执行时,需要将任务添加到任务队列中。添加任务时需要先加锁,然后将任务添加到队列中,最后唤醒等待中的线程。
void ThreadPoolAddTask(ThreadPool* pool, Func func, void* arg) {
// 创建任务
Task* task = TaskCreate(func, arg);
// 加锁
pthread_mutex_lock(&pool->lock);
// 将任务添加到队列中
TaskQueuePush(pool->task_queue, task);
// 唤醒线程
pthread_cond_signal(&pool->notify);
// 解锁
pthread_mutex_unlock(&pool->lock);
}
销毁线程池
当不再需要线程池时,需要将线程池销毁。销毁时需要将线程池标记为已销毁,然后通知所有线程退出。销毁线程时需要依次释放线程相关资源,如线程数组和任务队列等。
void ThreadPoolDestroy(ThreadPool* pool) {
// 加锁
pthread_mutex_lock(&pool->lock);
// 标记线程池已销毁
pool->is_destroyed = 1;
// 唤醒所有线程
pthread_cond_broadcast(&pool->notify);
// 解锁
pthread_mutex_unlock(&pool->lock);
// 等待线程退出
for (int i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
// 释放线程数组和任务队列
free(pool->threads);
TaskQueueDestroy(pool->task_queue);
// 销毁互斥锁和条件变量
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->notify);
}
示例一
现在假设我们要编写一个多线程程序,用于计算一个范围内所有素数的和,我们可以使用线程池来提高程序的执行效率。首先我们需要定义一个用于计算素数的函数,如下所示:
int IsPrime(int n) {
if (n < 2) {
return 0;
}
for (int i = 2; i * i <= n; i++) {
if (n % i == 0) {
return 0;
}
}
return 1;
}
然后我们编写一个用于计算素数和的函数,如下所示:
void* SumPrimes(void* arg) {
SumPrimesArg* sarg = (SumPrimesArg*)arg;
int sum = 0;
for (int i = sarg->start; i <= sarg->end; i++) {
if (IsPrime(i)) {
sum += i;
}
}
pthread_mutex_lock(&sarg->mutex);
sarg->result += sum;
pthread_mutex_unlock(&sarg->mutex);
return NULL;
}
最后我们编写主函数,如下所示:
int main() {
// 初始化线程池
ThreadPool pool;
ThreadPoolInit(&pool, 4);
// 计算素数和
const int range = 1000000;
const int part = 4;
int step = range / part;
int start = 1, end = 0;
SumPrimesArg args[part];
pthread_mutex_t mutex;
pthread_mutex_init(&mutex, NULL);
for (int i = 0; i < part; i++) {
start = end + 1;
end += step;
if (i == part - 1) {
end = range;
}
args[i].start = start;
args[i].end = end;
args[i].result = 0;
args[i].mutex = mutex;
ThreadPoolAddTask(&pool, SumPrimes, &args[i]);
}
// 等待线程池结束
ThreadPoolDestroy(&pool);
// 输出结果
int result = 0;
for (int i = 0; i < part; i++) {
result += args[i].result;
}
printf("sum of primes: %d\n", result);
return 0;
}
在该示例中,我们将整个范围分成了4部分,并创建了4个线程来计算每个部分的素数和。最后等待所有线程执行完毕并汇总结果。
示例二
下面介绍如何使用线程池来实现网络服务器。假设我们要编写一个简单的echo服务器,该服务器可以同时处理多个客户端连接请求。我们可以使用线程池来管理服务器的多个客户端连接。
首先我们需要编写一个用于处理客户端连接的函数,如下所示:
void* HandleClient(void* arg) {
int client_fd = *(int*)arg;
free(arg);
// 处理客户端连接
char buf[1024];
int n = read(client_fd, buf, sizeof(buf));
write(client_fd, buf, n);
close(client_fd);
return NULL;
}
然后我们编写主函数,如下所示:
int main() {
int listen_fd = create_listen_socket(8888); // 创建监听套接字
// 初始化线程池
ThreadPool pool;
ThreadPoolInit(&pool, 4);
while (1) {
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
int client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_addr_len);
if (client_fd == -1) {
perror("accept");
continue;
}
// 添加任务到线程池
int* arg = malloc(sizeof(int));
*arg = client_fd;
ThreadPoolAddTask(&pool, HandleClient, arg);
}
ThreadPoolDestroy(&pool);
close(listen_fd);
return 0;
}
在该示例中,我们创建了监听套接字,并在主函数中使用accept函数等待客户端连接,每次接收到一个客户端连接时,将客户端连接套接字添加到任务队列中,由线程池中的线程来处理。
总结
通过以上两个示例,我们了解了如何使用线程池来提高程序的执行效率和并发性能。在编写多线程程序时,合理使用线程池可以更好地管理线程,避免因线程数量过多而导致系统负载过高的问题。同时,在使用线程池时需要注意线程安全问题,并及时释放资源以避免内存泄漏等问题。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C语言实现支持动态拓展和销毁的线程池 - Python技术站