C++基于消息队列的多线程实现示例代码

消息队列

消息队列是一种进程间通信的方式,用于不同进程之间的异步通信。消息队列允许发送者将消息存储在队列中,接收者可以在任何时间从队列中获取这些消息。这种通信方式可以提高系统的效率和可拓展性,因为它允许多个线程或进程同时处理消息。

C++基于消息队列的多线程实现示例代码

本文中我们将使用msgpack消息序列化/反序列化库和threadpool线程池库来实现一个基于消息队列的多线程程序。该程序的主要思路是:

  1. 消息队列存储任务数据,工作线程异步处理队列中的任务;
  2. 线程池负责管理工作线程的生命周期和任务分配。

以下是示例代码:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <msgpack.hpp>
#include "ThreadPool.h"

using namespace std;

struct TaskData {
    string task_name;
    string task_param;
};

class MessageQueue {
public:
    MessageQueue() {}

    void enqueue(TaskData data) {
        lock_guard<mutex> lock(queue_mutex);
        message_queue.push(data);
        queue_cond_var.notify_one();
    }

    TaskData dequeue() {
        unique_lock<mutex> lock(queue_mutex);
        queue_cond_var.wait(lock, [this]() { return !message_queue.empty(); });
        auto task = message_queue.front();
        message_queue.pop();
        return task;
    }

private:
    mutex queue_mutex;
    condition_variable queue_cond_var;
    queue<TaskData> message_queue;
};

class Worker {
public:
    Worker(int id, MessageQueue& queue) : worker_id(id), message_queue(queue) {}

    void operator()() {
        while(true) {
            TaskData task_data = message_queue.dequeue();
            cout << "Worker " << worker_id << " started " << task_data.task_name << endl;
            string result = this->processTask(task_data);
            cout << "Worker " << worker_id << " finished " << task_data.task_name << ": " << result << endl;
        }
    }

private:
    int worker_id;
    MessageQueue& message_queue;

    string processTask(TaskData data) {
        // do some task processing based on task parameters
        // ...
        return "done";
    }
};

int main(int argc, char* argv[]) {
    int num_threads = 4;
    ThreadPool pool(num_threads);
    MessageQueue queue;

    vector<thread> workers;
    for(int i=0; i<num_threads; i++) {
        workers.emplace_back(Worker(i, queue));
        pool.enqueue(workers.back());
    }

    for(int i=0; i<10; i++) {
        TaskData task_data;
        task_data.task_name = "Task " + to_string(i);
        task_data.task_param = "Param " + to_string(i);

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> packer(&buffer);
        packer.pack(task_data);

        queue.enqueue(task_data);
    }

    return 0;
}

在上面的代码中,我们定义了一个MessageQueue类来实现消息队列,该类具有两个主要方法:

  • enqueue() - 将任务数据添加到队列中;
  • dequeue() - 从队列中获取任务数据。

每个工作线程(Worker)都会创建一个Worker类的实例,当一个任务被添加到队列中时,线程从队列中获取该任务并处理之。 每个任务由TaskData数据结构来表示,任务数据被序列化并使用消息队列传递到工作线程。 在示例代码中,我们使用msgpack库来序列化任务数据。

接下来,我们将使用threadpool库来实现线程池。 线程池被创建在主函数中,其中num_threads指定了线程池中工作线程的数量。 在实例化线程池之后,将工作线程添加到线程池中并开始处理队列中的任务。

示例代码中的结果将如下所示:

Worker 0 started Task 0
Worker 1 started Task 1
Worker 2 started Task 2
Worker 3 started Task 3
Worker 2 finished Task 2: done
Worker 0 finished Task 0: done
Worker 1 finished Task 1: done
Worker 3 finished Task 3: done
Worker 1 started Task 4
Worker 3 started Task 5
Worker 0 started Task 6
Worker 2 started Task 7
Worker 1 finished Task 4: done
Worker 3 finished Task 5: done
Worker 0 finished Task 6: done
Worker 2 finished Task 7: done
Worker 3 started Task 8
Worker 2 started Task 9
Worker 1 started Task 8
Worker 0 started Task 9
Worker 3 finished Task 8: done
Worker 0 finished Task 9: done
Worker 1 finished Task 8: done
Worker 2 finished Task 9: done

可以看出,程序中4个线程按照FIFO的方式处理任务队列中的任务,每个任务由对应的线程处理,并输出处理结果。每个任务的处理顺序可能不同,但任务之间的处理顺序在整个程序中是有序的。这个程序非常简洁并易于扩展,可用于处理异步任务,提高程序效率和可拓展性。

示例说明

示例1

假设我们正在编写一个由10个文件组成的音频处理程序。这些文件需要经过一些算法才能转换为最终格式。由于该任务是相对独立的,因此我们可以使用消息队列和多线程技术来处理它。

在这种情况下,我们可以将每个文件转换作为一个任务,并将任务定义为TaskData结构体。每个任务的名称可以是文件名,任务参数可以是转换算法的参数。当一个任务添加到消息队列时,线程池中的工作线程将处理该任务并将结果保存在指定的输出目录中。

示例2

假设我们正在编写一个Web服务器并需要处理大量HTTP请求。这些请求需要被解析、验证和转换。由于请求通常是相互独立的,因此我们可以使用多线程技术来处理它们。

使用多线程技术的一种方法是为每个请求创建一个工作线程,但这可能会导致系统负载过重并降低性能。在这种情况下,我们可以将消息队列用于存储请求,由一组工作线程异步处理该队列中的请求。每个请求可以表示为TaskData结构体,并在包含请求数据的消息队列中排队等待处理。在这种情况下,我们可以根据需要增加工作线程的数量,从而实现对系统负载和性能的更好控制。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C++基于消息队列的多线程实现示例代码 - Python技术站

(0)
上一篇 2023年5月17日
下一篇 2023年5月17日

相关文章

  • Go语言中的并发goroutine底层原理

    Go语言中的并发goroutine底层原理 背景 Go语言被称为互联网时代的C语言,因为它具有高效的并发能力,支持使用轻量级的goroutine进行并发编程。在Go语言中,每个goroutine都代表着一个独立的线程,但是它们可以在同一时间运行且共享内存,因此能够实现高效的并发编程。 goroutine的实现原理 Go语言的goroutine是基于M:N线程…

    多线程 2023年5月17日
    00
  • Go使用sync.Map来解决map的并发操作问题

    Go语言中的map是一种非常常用的数据结构,但在多线程并发操作时,由于map没有自带的同步锁,会导致大量的并发问题。为此,Go语言提供了一个叫做 sync.Map 的类型,它是专门用于替代map在高并发环境下发生竞争时的解决方案。 下面就为大家详细介绍一下使用 sync.Map 解决map的并发问题的攻略。 sync.Map 概述 sync.Map 是 Go…

    多线程 2023年5月17日
    00
  • 基于并发服务器几种实现方法(总结)

    当我们在设计高并发服务器时,需要考虑使用哪种实现方法来提高服务器的并发处理能力,以下是几种基于并发服务器的常用实现方法: I/O 复用(select、poll、epoll) I/O 复用是通过一个进程管理多个 I/O 事件的模型,可以同时监听多个文件描述符,当其中任意一个文件描述符就绪时操作系统会通知进程进行读写操作。select、poll、epoll 都是…

    多线程 2023年5月16日
    00
  • Java使用代码模拟高并发操作的示例

    我来为你详细讲解Java使用代码模拟高并发操作的示例攻略。 1. 实现思路 高并发是指在同一时间内有大量的请求涌入到系统中,如何处理这些请求成为一个挑战。使用代码模拟高并发操作,则可以帮助我们评估系统在高并发情况下的稳定性和可靠性。实现思路如下: 定义一个接口或者方法,并为该方法添加synchronized关键字,确保该方法同一时间只能被一个线程访问,以模拟…

    多线程 2023年5月16日
    00
  • Java多线程的调度_动力节点Java学院整理

    Java多线程的调度_动力节点Java学院整理 概述 Java中的多线程是通过Thread类来实现的,一个线程即是Java中的一个Thread对象。多个线程可以同时执行,这种方式称为多线程并发执行。在多个线程并发执行时,操作系统会给每个线程分配一个时间片用于执行。由于时间片非常短,一般是几毫秒,因此看起来多个线程是同时执行的。 多线程的调度 在多线程并发执行…

    多线程 2023年5月17日
    00
  • web 性能测试中的几个关键指标(并发用户数,QPS,用户平均请求等待时间)

    在进行 Web 性能测试时,需要关注一些关键指标,以便评估网站的性能,提高用户体验和满意度。以下是几个重要的指标: 并发用户数 并发用户数指的是同时访问网站的用户数量。在进行并发测试时,需要模拟多个用户同时访问网站,以评估网站是否能够支持高并发。测试时需要逐步增加并发用户数,并记录每个用户请求的响应时间。通常,最大并发用户数是网站性能测试的一个重要指标。 示…

    多线程 2023年5月16日
    00
  • python thread 并发且顺序运行示例

    当我们在python中使用多线程编程时,为了保证多个线程能够在正确的顺序运行,我们需要进行线程同步操作,避免数据的竞争和混乱。下面我将提供两个示例来展示如何在python中使用线程同步操作实现并发且顺序运行的效果。 1. 通过Lock对象实现线程同步 首先我们需要导入threading模块中的Lock类,这是python内置的线程同步机制之一。在本次示例中,…

    多线程 2023年5月17日
    00
  • JAVA线程用法详解

    JAVA线程用法详解 线程基础知识 线程定义 线程可以理解为轻量级的进程,是程序执行的一条单独的路径。一个程序中通常可以有多个线程同时执行不同的任务,线程之间可以共享程序的数据和资源,因此其效率比多进程更高。 JAVA中,线程是Thread类的实例,在程序中启动和控制线程的执行需要调用Thread类中的方法。 线程状态 线程的状态可以分为以下5种: 新建状态…

    多线程 2023年5月17日
    00
合作推广
合作推广
分享本页
返回顶部