消息队列
消息队列是一种进程间通信的方式,用于不同进程之间的异步通信。消息队列允许发送者将消息存储在队列中,接收者可以在任何时间从队列中获取这些消息。这种通信方式可以提高系统的效率和可拓展性,因为它允许多个线程或进程同时处理消息。
C++基于消息队列的多线程实现示例代码
本文中我们将使用msgpack消息序列化/反序列化库和threadpool线程池库来实现一个基于消息队列的多线程程序。该程序的主要思路是:
- 消息队列存储任务数据,工作线程异步处理队列中的任务;
- 线程池负责管理工作线程的生命周期和任务分配。
以下是示例代码:
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <msgpack.hpp>
#include "ThreadPool.h"
using namespace std;
struct TaskData {
string task_name;
string task_param;
};
class MessageQueue {
public:
MessageQueue() {}
void enqueue(TaskData data) {
lock_guard<mutex> lock(queue_mutex);
message_queue.push(data);
queue_cond_var.notify_one();
}
TaskData dequeue() {
unique_lock<mutex> lock(queue_mutex);
queue_cond_var.wait(lock, [this]() { return !message_queue.empty(); });
auto task = message_queue.front();
message_queue.pop();
return task;
}
private:
mutex queue_mutex;
condition_variable queue_cond_var;
queue<TaskData> message_queue;
};
class Worker {
public:
Worker(int id, MessageQueue& queue) : worker_id(id), message_queue(queue) {}
void operator()() {
while(true) {
TaskData task_data = message_queue.dequeue();
cout << "Worker " << worker_id << " started " << task_data.task_name << endl;
string result = this->processTask(task_data);
cout << "Worker " << worker_id << " finished " << task_data.task_name << ": " << result << endl;
}
}
private:
int worker_id;
MessageQueue& message_queue;
string processTask(TaskData data) {
// do some task processing based on task parameters
// ...
return "done";
}
};
int main(int argc, char* argv[]) {
int num_threads = 4;
ThreadPool pool(num_threads);
MessageQueue queue;
vector<thread> workers;
for(int i=0; i<num_threads; i++) {
workers.emplace_back(Worker(i, queue));
pool.enqueue(workers.back());
}
for(int i=0; i<10; i++) {
TaskData task_data;
task_data.task_name = "Task " + to_string(i);
task_data.task_param = "Param " + to_string(i);
msgpack::sbuffer buffer;
msgpack::packer<msgpack::sbuffer> packer(&buffer);
packer.pack(task_data);
queue.enqueue(task_data);
}
return 0;
}
在上面的代码中,我们定义了一个MessageQueue
类来实现消息队列,该类具有两个主要方法:
enqueue()
- 将任务数据添加到队列中;dequeue()
- 从队列中获取任务数据。
每个工作线程(Worker
)都会创建一个Worker
类的实例,当一个任务被添加到队列中时,线程从队列中获取该任务并处理之。 每个任务由TaskData
数据结构来表示,任务数据被序列化并使用消息队列传递到工作线程。 在示例代码中,我们使用msgpack库来序列化任务数据。
接下来,我们将使用threadpool库来实现线程池。 线程池被创建在主函数中,其中num_threads
指定了线程池中工作线程的数量。 在实例化线程池之后,将工作线程添加到线程池中并开始处理队列中的任务。
示例代码中的结果将如下所示:
Worker 0 started Task 0
Worker 1 started Task 1
Worker 2 started Task 2
Worker 3 started Task 3
Worker 2 finished Task 2: done
Worker 0 finished Task 0: done
Worker 1 finished Task 1: done
Worker 3 finished Task 3: done
Worker 1 started Task 4
Worker 3 started Task 5
Worker 0 started Task 6
Worker 2 started Task 7
Worker 1 finished Task 4: done
Worker 3 finished Task 5: done
Worker 0 finished Task 6: done
Worker 2 finished Task 7: done
Worker 3 started Task 8
Worker 2 started Task 9
Worker 1 started Task 8
Worker 0 started Task 9
Worker 3 finished Task 8: done
Worker 0 finished Task 9: done
Worker 1 finished Task 8: done
Worker 2 finished Task 9: done
可以看出,程序中4个线程按照FIFO的方式处理任务队列中的任务,每个任务由对应的线程处理,并输出处理结果。每个任务的处理顺序可能不同,但任务之间的处理顺序在整个程序中是有序的。这个程序非常简洁并易于扩展,可用于处理异步任务,提高程序效率和可拓展性。
示例说明
示例1
假设我们正在编写一个由10个文件组成的音频处理程序。这些文件需要经过一些算法才能转换为最终格式。由于该任务是相对独立的,因此我们可以使用消息队列和多线程技术来处理它。
在这种情况下,我们可以将每个文件转换作为一个任务,并将任务定义为TaskData
结构体。每个任务的名称可以是文件名,任务参数可以是转换算法的参数。当一个任务添加到消息队列时,线程池中的工作线程将处理该任务并将结果保存在指定的输出目录中。
示例2
假设我们正在编写一个Web服务器并需要处理大量HTTP请求。这些请求需要被解析、验证和转换。由于请求通常是相互独立的,因此我们可以使用多线程技术来处理它们。
使用多线程技术的一种方法是为每个请求创建一个工作线程,但这可能会导致系统负载过重并降低性能。在这种情况下,我们可以将消息队列用于存储请求,由一组工作线程异步处理该队列中的请求。每个请求可以表示为TaskData
结构体,并在包含请求数据的消息队列中排队等待处理。在这种情况下,我们可以根据需要增加工作线程的数量,从而实现对系统负载和性能的更好控制。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C++基于消息队列的多线程实现示例代码 - Python技术站