C++基于消息队列的多线程实现示例代码

消息队列

消息队列是一种进程间通信的方式,用于不同进程之间的异步通信。消息队列允许发送者将消息存储在队列中,接收者可以在任何时间从队列中获取这些消息。这种通信方式可以提高系统的效率和可拓展性,因为它允许多个线程或进程同时处理消息。

C++基于消息队列的多线程实现示例代码

本文中我们将使用msgpack消息序列化/反序列化库和threadpool线程池库来实现一个基于消息队列的多线程程序。该程序的主要思路是:

  1. 消息队列存储任务数据,工作线程异步处理队列中的任务;
  2. 线程池负责管理工作线程的生命周期和任务分配。

以下是示例代码:

#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <msgpack.hpp>
#include "ThreadPool.h"

using namespace std;

struct TaskData {
    string task_name;
    string task_param;
};

class MessageQueue {
public:
    MessageQueue() {}

    void enqueue(TaskData data) {
        lock_guard<mutex> lock(queue_mutex);
        message_queue.push(data);
        queue_cond_var.notify_one();
    }

    TaskData dequeue() {
        unique_lock<mutex> lock(queue_mutex);
        queue_cond_var.wait(lock, [this]() { return !message_queue.empty(); });
        auto task = message_queue.front();
        message_queue.pop();
        return task;
    }

private:
    mutex queue_mutex;
    condition_variable queue_cond_var;
    queue<TaskData> message_queue;
};

class Worker {
public:
    Worker(int id, MessageQueue& queue) : worker_id(id), message_queue(queue) {}

    void operator()() {
        while(true) {
            TaskData task_data = message_queue.dequeue();
            cout << "Worker " << worker_id << " started " << task_data.task_name << endl;
            string result = this->processTask(task_data);
            cout << "Worker " << worker_id << " finished " << task_data.task_name << ": " << result << endl;
        }
    }

private:
    int worker_id;
    MessageQueue& message_queue;

    string processTask(TaskData data) {
        // do some task processing based on task parameters
        // ...
        return "done";
    }
};

int main(int argc, char* argv[]) {
    int num_threads = 4;
    ThreadPool pool(num_threads);
    MessageQueue queue;

    vector<thread> workers;
    for(int i=0; i<num_threads; i++) {
        workers.emplace_back(Worker(i, queue));
        pool.enqueue(workers.back());
    }

    for(int i=0; i<10; i++) {
        TaskData task_data;
        task_data.task_name = "Task " + to_string(i);
        task_data.task_param = "Param " + to_string(i);

        msgpack::sbuffer buffer;
        msgpack::packer<msgpack::sbuffer> packer(&buffer);
        packer.pack(task_data);

        queue.enqueue(task_data);
    }

    return 0;
}

在上面的代码中,我们定义了一个MessageQueue类来实现消息队列,该类具有两个主要方法:

  • enqueue() - 将任务数据添加到队列中;
  • dequeue() - 从队列中获取任务数据。

每个工作线程(Worker)都会创建一个Worker类的实例,当一个任务被添加到队列中时,线程从队列中获取该任务并处理之。 每个任务由TaskData数据结构来表示,任务数据被序列化并使用消息队列传递到工作线程。 在示例代码中,我们使用msgpack库来序列化任务数据。

接下来,我们将使用threadpool库来实现线程池。 线程池被创建在主函数中,其中num_threads指定了线程池中工作线程的数量。 在实例化线程池之后,将工作线程添加到线程池中并开始处理队列中的任务。

示例代码中的结果将如下所示:

Worker 0 started Task 0
Worker 1 started Task 1
Worker 2 started Task 2
Worker 3 started Task 3
Worker 2 finished Task 2: done
Worker 0 finished Task 0: done
Worker 1 finished Task 1: done
Worker 3 finished Task 3: done
Worker 1 started Task 4
Worker 3 started Task 5
Worker 0 started Task 6
Worker 2 started Task 7
Worker 1 finished Task 4: done
Worker 3 finished Task 5: done
Worker 0 finished Task 6: done
Worker 2 finished Task 7: done
Worker 3 started Task 8
Worker 2 started Task 9
Worker 1 started Task 8
Worker 0 started Task 9
Worker 3 finished Task 8: done
Worker 0 finished Task 9: done
Worker 1 finished Task 8: done
Worker 2 finished Task 9: done

可以看出,程序中4个线程按照FIFO的方式处理任务队列中的任务,每个任务由对应的线程处理,并输出处理结果。每个任务的处理顺序可能不同,但任务之间的处理顺序在整个程序中是有序的。这个程序非常简洁并易于扩展,可用于处理异步任务,提高程序效率和可拓展性。

示例说明

示例1

假设我们正在编写一个由10个文件组成的音频处理程序。这些文件需要经过一些算法才能转换为最终格式。由于该任务是相对独立的,因此我们可以使用消息队列和多线程技术来处理它。

在这种情况下,我们可以将每个文件转换作为一个任务,并将任务定义为TaskData结构体。每个任务的名称可以是文件名,任务参数可以是转换算法的参数。当一个任务添加到消息队列时,线程池中的工作线程将处理该任务并将结果保存在指定的输出目录中。

示例2

假设我们正在编写一个Web服务器并需要处理大量HTTP请求。这些请求需要被解析、验证和转换。由于请求通常是相互独立的,因此我们可以使用多线程技术来处理它们。

使用多线程技术的一种方法是为每个请求创建一个工作线程,但这可能会导致系统负载过重并降低性能。在这种情况下,我们可以将消息队列用于存储请求,由一组工作线程异步处理该队列中的请求。每个请求可以表示为TaskData结构体,并在包含请求数据的消息队列中排队等待处理。在这种情况下,我们可以根据需要增加工作线程的数量,从而实现对系统负载和性能的更好控制。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:C++基于消息队列的多线程实现示例代码 - Python技术站

(0)
上一篇 2023年5月17日
下一篇 2023年5月17日

相关文章

  • Go语言CSP并发模型goroutine及channel底层实现原理

    Go语言CSP并发模型goroutine及channel底层实现原理 前言 Go语言的并发模型引入了CSP(通讯顺序进程),该模型与传统的线程和锁的并发模型不同,更加灵活和高效。在Go语言中,对并发的支持主要是通过goroutine和channel实现的。 Goroutine Goroutine是Go语言并发模型的核心,是一种比线程更加轻量级的并发处理方式,…

    多线程 2023年5月16日
    00
  • Kotlin协程Job生命周期结构化并发详解

    下面是”Kotlin协程Job生命周期结构化并发详解”的完整攻略: Kotlin协程Job生命周期结构化并发详解 概述 Kotlin协程是一种非阻塞式的并发处理机制,它可以极大地简化并发编程。其中一个核心概念就是协程的Job,Job代表了协程的执行任务。在实际使用中,Job可以用来管理和控制协程的生命周期以及取消协程的执行。 本文将详细讲解Kotlin协程J…

    多线程 2023年5月17日
    00
  • Python多线程threading创建及使用方法解析

    Python多线程threading创建及使用方法解析 什么是线程 在计算机中,线程指的是进程中一个单一顺序的控制流。一个进程可以由多个线程组成,每个线程都可以运行不同的代码和指令。线程与进程的不同在于,进程是由操作系统负责调度,而线程则是由进程调度。在多线程编程中,多个线程可以同时运行,提高程序运行效率。 Python多线程threading模块 Pyth…

    多线程 2023年5月16日
    00
  • C#集合之并发集合的用法

    C#集合之并发集合的用法 什么是并发集合 并发集合是一组C#中线程安全的集合类型,允许多个线程同时对一个集合进行读写操作,而不需要进行额外的同步处理。在多线程的场景下,使用并发集合可以提高代码的并发性能,避免多线程访问同一个集合时可能出现的线程安全问题。 .NET Framework提供了多种并发集合类型,包括ConcurrentDictionary、Con…

    多线程 2023年5月16日
    00
  • 详解在SpringBoot如何优雅的使用多线程

    下面我将详细讲解在SpringBoot如何优雅地使用多线程。 为什么需要使用多线程 在程序中使用多线程可以充分发挥多核处理器的性能,提升程序执行效率。而在SpringBoot中使用多线程,可以进一步提升Web应用的性能和响应速度。 多线程的应用场景 应用场景通常包括: 并发请求:同时处理多个请求 异步调用:在一个方法中异步执行耗时的操作,从而减少阻塞等待的时…

    多线程 2023年5月17日
    00
  • Go 并发实现协程同步的多种解决方法

    Go 并发实现协程同步的多种解决方法 在 Go 编程中,对于大量协程的并发执行,我们经常需要对它们进行同步控制,以保证协程之间的正确互动和信息传递。本文介绍 Go 实现协程同步的常用方法,包括使用 WaitGroup、channel、Mutex 等。 使用 WaitGroup 举个例子,我们可能需要同时开启多个协程进行图片下载,且需要等所有协程下载完毕才能继…

    多线程 2023年5月16日
    00
  • linux并发连接50万的配置方法

    首先,要实现Linux系统并发连接50万的配置,需要考虑以下几个方面: 网络优化 调整TCP的参数,包括window size、backlog、max_tw_buckets等,其中window size模拟并发连接很重要。 增加网卡数量,选择高速网卡,如万兆以太网卡,可以提高网络带宽及IO能力。 使用高效的协议栈,如Google的BBR协议。 资源优化 内核…

    多线程 2023年5月16日
    00
  • java多线程并发executorservice(任务调度)类

    Java多线程并发的的Executors类提供了一种创建和管理线程池的方式,其中Executors.newFixedThreadPool(int n)和Executors.newCachedThreadPool()方法最常用。 Executors.newFixedThreadPool ExecutorService executor = Executors.…

    多线程 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部