基于条件变量的消息队列是一种进程间通信机制,适用于多线程环境。在使用过程中,需要注意线程同步和互斥的问题。
什么是条件变量
条件变量是线程间同步的一种方式,线程可以调用它的wait()方法将自己阻塞,直到其他线程调用signal()方法才能重新运行。条件变量常和互斥锁配合使用,锁用来保护数据,条件变量用来等待特定条件的发生。
消息队列
消息队列是一种消息传递机制,用来在进程间或线程间传递数据。
条件变量实现消息队列
基于条件变量的消息队列可以用下面的数据结构表示:
#define QUEUE_SIZE 10
struct Queue {
int buffer[QUEUE_SIZE];
int head;
int tail;
int size;
pthread_mutex_t mutex;
pthread_cond_t cond_full;
pthread_cond_t cond_empty;
};
这里的Queue是一个循环队列,buffer数组用来存储数据,head和tail记录队首和队尾的位置,size记录队列中现有元素的个数。mutex是互斥锁,用来保护队列的操作;cond_full和cond_empty是条件变量,用来提示队列是否满或空以及通知变化。
消息队列示例
接下来,我们通过两个示例来展示基于条件变量的消息队列的实现。
示例1:生产者-消费者
下面是一个简单的生产者-消费者程序:
void *producer(void *arg){
struct Queue *queue = (struct Queue *) arg;
while(1) {
pthread_mutex_lock(&queue->mutex);
while(queue->size == QUEUE_SIZE) {
pthread_cond_wait(&queue->cond_full, &queue->mutex);
}
int item = rand() % 1000;
printf("producer produced %d\n", item);
queue->buffer[queue->tail] = item;
queue->tail = (queue->tail + 1) % QUEUE_SIZE;
queue->size++;
pthread_cond_signal(&queue->cond_empty);
pthread_mutex_unlock(&queue->mutex);
usleep(rand() % 1000000);
}
return NULL;
}
void *consumer(void *arg){
struct Queue *queue = (struct Queue *) arg;
while(1) {
pthread_mutex_lock(&queue->mutex);
while(queue->size == 0) {
pthread_cond_wait(&queue->cond_empty, &queue->mutex);
}
int item = queue->buffer[queue->head];
queue->head = (queue->head + 1) % QUEUE_SIZE;
queue->size--;
printf("consumer consumed %d\n", item);
pthread_cond_signal(&queue->cond_full);
pthread_mutex_unlock(&queue->mutex);
usleep(rand() % 1000000);
}
return NULL;
}
int main(){
struct Queue q;
q.head = 0;
q.tail = 0;
q.size = 0;
pthread_mutex_init(&q.mutex, NULL);
pthread_cond_init(&q.cond_full, NULL);
pthread_cond_init(&q.cond_empty, NULL);
pthread_t t1, t2;
pthread_create(&t1, NULL, producer, &q);
pthread_create(&t2, NULL, consumer, &q);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
return 0;
}
在这个程序中,生产者和消费者各自运行在一个线程里,它们通过向消息队列中加入或删除数据来完成通信。线程之间通过条件变量和互斥锁来同步,确保生产者只在队列不满的时候把数据添加到队尾,消费者只在队列不空的时候从队头取出数据。
示例2:线程池
下面是一个简单的线程池程序:
void *threadpool_function(void *arg){
struct Queue *queue = (struct Queue *) arg;
while(1) {
pthread_mutex_lock(&queue->mutex);
while(queue->size == 0) {
pthread_cond_wait(&queue->cond_empty, &queue->mutex);
}
int fd = queue->buffer[queue->head];
queue->head = (queue->head + 1) % QUEUE_SIZE;
queue->size--;
printf("thread %d processing %d\n", pthread_self(), fd);
pthread_cond_signal(&queue->cond_full);
pthread_mutex_unlock(&queue->mutex);
handle_connection(fd); // 处理连接
}
return NULL;
}
int main(){
struct Queue q;
q.head = 0;
q.tail = 0;
q.size = 0;
pthread_mutex_init(&q.mutex, NULL);
pthread_cond_init(&q.cond_full, NULL);
pthread_cond_init(&q.cond_empty, NULL);
pthread_t t1, t2, t3;
pthread_create(&t1, NULL, threadpool_function, &q);
pthread_create(&t2, NULL, threadpool_function, &q);
pthread_create(&t3, NULL, threadpool_function, &q);
int listenfd = open_listenfd(8080);
while(1) {
int connfd = accept(listenfd, NULL, NULL);
pthread_mutex_lock(&q.mutex);
while(queue->size == QUEUE_SIZE) {
pthread_cond_wait(&queue->cond_full, &queue->mutex);
}
queue->buffer[queue->tail] = connfd;
queue->tail = (queue->tail + 1) % QUEUE_SIZE;
queue->size++;
pthread_cond_signal(&queue->cond_empty);
pthread_mutex_unlock(&queue->mutex);
}
return 0;
}
在这个程序中,线程池由三个线程组成,它们共享一个队列。main线程不断接收客户端请求,并把套接字描述符放到队列中,工作线程从队列中取出任务,并处理连接请求。线程之间通过条件变量和互斥锁来同步,确保队列不满不空。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:基于条件变量的消息队列 说明介绍 - Python技术站