C/C++原生API实现线程池,可以通过以下步骤来实现:
第一步:定义线程池结构体
线程池结构体的定义通常包含以下几个成员:
- 线程池中线程的数量:通过这个成员可以控制线程池中线程的数量
- 任务队列:用来存储要执行的任务
- 线程池是否正在运行:通过这个成员可以控制是否继续执行任务
定义如下:
typedef struct threadpool {
int thread_num; // 线程池线程数量
int queue_size; // 任务队列长度
pthread_t *threads; // 线程数组
pthread_mutex_t lock; // 互斥锁
pthread_cond_t notify; // 条件变量
task *task_queue; // 任务队列
int queue_head; // 任务队列头
int queue_tail; // 任务队列尾
int queue_count; // 任务队列中任务数量
int shutdown; // 线程池是否关闭
} threadpool;
第二步:实现任务结构体
任务结构体存储要执行的任务,其中包含以下成员:
- 任务函数指针:用来存储要执行的任务函数
- 任务参数:执行任务所需的参数
定义如下:
typedef struct {
void (*function)(void *arg); // 任务函数指针
void *arg; // 任务参数
} task;
第三步:初始化线程池
线程池的初始化包含以下步骤:
- 初始化锁和条件变量
- 创建线程数组
- 创建任务队列
- 启动线程
具体代码如下:
threadpool *threadpool_create(int thread_num, int queue_size)
{
threadpool *pool;
int i;
// 判断输入参数是否合法
if ((pool = (threadpool *)malloc(sizeof(threadpool))) == NULL)
{
goto err;
}
// 初始化线程池变量
pool->thread_num = 0;
pool->queue_size = queue_size;
pool->queue_head = 0;
pool->queue_tail = 0;
pool->queue_count = 0;
pool->shutdown = 0;
// 创建线程数组
pool->threads = (pthread_t *)malloc(thread_num * sizeof(pthread_t));
if (pool->threads == NULL)
{
goto err;
}
// 创建任务队列
pool->task_queue = (task *)malloc(queue_size * sizeof(task));
if (pool->task_queue == NULL)
{
goto err;
}
// 初始化锁和条件变量
if ((pthread_mutex_init(&(pool->lock), NULL) != 0) ||
(pthread_cond_init(&(pool->notify), NULL) != 0))
{
goto err;
}
// 创建线程
for (i = 0; i < thread_num; i++)
{
if (pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void*)pool) != 0)
{
threadpool_destroy(pool, 0);
return NULL;
}
pool->thread_num++;
}
return pool;
err:
if (pool)
{
threadpool_destroy(pool, 0);
}
return NULL;
}
第四步:实现任务添加函数
任务添加函数用来向任务队列中添加任务。实现步骤如下:
- 首先需要获得线程池的锁
- 添加任务到任务队列中
- 通知正在等待任务的线程有任务可供执行
具体代码如下:
int threadpool_add(threadpool *pool, void (*function)(void *), void *arg)
{
int err = 0;
int next_tail = 0;
// 获得线程池的锁
if (pthread_mutex_lock(&(pool->lock)) != 0)
{
return THREADPOOL_LOCK_FAILURE;
}
next_tail = (pool->queue_tail + 1) % pool->queue_size;
do
{
// 判断任务队列是否已满
if (pool->queue_count == pool->queue_size)
{
err = THREADPOOL_QUEUE_FULL;
break;
}
// 添加任务到任务队列中
pool->task_queue[pool->queue_tail].function = function;
pool->task_queue[pool->queue_tail].arg = arg;
pool->queue_tail = next_tail;
pool->queue_count += 1;
// 通知正在等待任务的线程有任务可供执行
if (pthread_cond_signal(&(pool->notify)) != 0)
{
err = THREADPOOL_LOCK_FAILURE;
break;
}
} while (0);
// 释放线程池的锁
if (pthread_mutex_unlock(&pool->lock) != 0)
{
err = THREADPOOL_LOCK_FAILURE;
}
return err;
}
第五步:实现线程函数
线程函数用来从任务队列中取出任务并执行。线程函数应该不断地从任务队列中取出任务执行,或者等待新的任务到来执行。具体代码如下:
void *threadpool_thread(void *threadpool)
{
threadpool *pool = (threadpool *)threadpool;
task task;
for (;;)
{
// 获得线程池的锁
pthread_mutex_lock(&(pool->lock));
// 循环直到有任务可供执行
while ((pool->queue_count == 0) && (!pool->shutdown))
{
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
if ((pool->shutdown) && (pool->queue_count == 0))
{
break;
}
// 取出任务并执行
task.function = pool->task_queue[pool->queue_head].function;
task.arg = pool->task_queue[pool->queue_head].arg;
pool->queue_head = (pool->queue_head + 1) % pool->queue_size;
pool->queue_count -= 1;
// 释放线程池的锁
pthread_mutex_unlock(&(pool->lock));
(*(task.function))(task.arg); // 执行任务
}
// 销毁线程池
pthread_mutex_unlock(&(pool->lock));
pthread_exit(NULL);
return (NULL);
}
第六步:实现线程池销毁函数
线程池销毁函数用来销毁线程池。销毁线程池时,应该将线程池标记为关闭状态,通知所有等待任务的线程线程池已关闭,然后等待所有线程完成任务并退出。具体代码如下:
int threadpool_destroy(threadpool *pool, int flags)
{
int err = 0;
int i;
if (pool == NULL)
{
return THREADPOOL_INVALID;
}
// 获得线程池的锁
if (pthread_mutex_lock(&(pool->lock)) != 0)
{
return THREADPOOL_LOCK_FAILURE;
}
// 标记线程池为关闭状态
pool->shutdown = 1;
// 发送通知给所有等待任务的线程
if ((pthread_cond_broadcast(&(pool->notify)) != 0) ||
(pthread_mutex_unlock(&(pool->lock)) != 0))
{
err = THREADPOOL_LOCK_FAILURE;
goto err;
}
// 等待所有线程完成任务并退出
for (i = 0; i < pool->thread_num; i++)
{
if (pthread_join(pool->threads[i], NULL) != 0)
{
err = THREADPOOL_THREAD_FAILURE;
}
}
if (!flags)
{
free(pool->threads);
free(pool->task_queue);
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->notify));
}
free(pool);
return err;
err:
pthread_mutex_unlock(&(pool->lock));
return err;
}
下面是一个示例代码:
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include "threadpool.h"
#define THREAD 4
#define QUEUE 100
typedef struct {
int a;
int b;
} test_task_t;
void task_test(void *arg)
{
test_task_t *task = (test_task_t *)arg;
int i, sum = 0;
for (i = 0; i < task->a; i++)
{
sum += task->b;
}
printf("sum=%d\n", sum);
}
int main(int argc, char **argv)
{
int a[4] = {1, 2, 3, 4};
int b[4] = {1, 10, 100, 1000};
int i;
threadpool *pool;
test_task_t task[4];
pool = threadpool_create(THREAD, QUEUE);
for (i = 0; i < 4; i++)
{
task[i].a = a[i];
task[i].b = b[i];
threadpool_add(pool, task_test, &task[i]);
}
sleep(5);
threadpool_destroy(pool, 0);
return 0;
}
在这个示例中,首先定义了一个test_task_t结构体存储任务参数a和b,然后创建了一个线程池,向线程池中添加4个任务,每个任务都是对test_task_t结构体进行处理。最后,等待5秒钟后销毁线程池。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C/C++ 原生API实现线程池的方法 - Python技术站