Golang WorkerPool线程池并发模式示例详解

Golang WorkerPool线程池并发模式示例详解

简介

WorkerPool即工作池,也称为线程池。它是一种并发编程模式,通常用于解决并发问题。在WorkerPool中,创建固定数量的worker,他们并行地从池中获取任务,并在处理任务时将其标记为完成。当所有可用的Worker都在使用时,新任务将被放入队列中,并等待有空闲的Worker。

原理

WorkerPool的主要优点是控制线程达到最大效率,最大限度地利用CPU时间,由于每个Worker都支持并发,因此可以处理大量的任务,并且可以在应用程序中实现流控制。以下是WorkerPool的基本原理:

  • 创建一个worker池,该池中包含了可以进行并发处理的固定数量的worker。

  • 工人从池中获取任务并执行任务。

  • 当处理完成任务时,工作人员将结果放回池中。

  • 如果池中没有可用的worker,则将任务放入队列中,并等待worker可用时将其分配。

示例一

在此示例中,将展示如何在Golang中实现WorkerPool模式。此示例将创建两个worker并向池中添加任务。

package main

import (
    "time"
    "fmt"
)

type job struct {
    id       int
    randomno int
}

type result struct {
    job         job
    sumofdigits int
}

var jobs = make(chan job, 10)
var results = make(chan result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

func worker(w int) {
    for job := range jobs {
        output := result{job, digits(job.randomno)}
        results <- output
    }
}

func createWorkerPool(noOfWorkers int) {
    for i := 0; i < noOfWorkers; i++ {
        go worker(i)
    }
}

func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := 99
        job := job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

func result(done chan bool) {
    for result := range results {
        fmt.Printf("job id %d, input random no %d, sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits) 
    }
    done <- true
}

func main() {
    startTime := time.Now()
    noOfJobs := 10
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 2
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diffTime := endTime.Sub(startTime)
    fmt.Println("total time taken ", diffTime.Seconds(), "seconds")
}

运行后的输出结果为

job id 0, input random no 99, sum of digits 18
job id 4, input random no 99, sum of digits 18
job id 2, input random no 99, sum of digits 18
job id 3, input random no 99, sum of digits 18
job id 1, input random no 99, sum of digits 18
job id 6, input random no 99, sum of digits 18
job id 5, input random no 99, sum of digits 18
job id 8, input random no 99, sum of digits 18
job id 7, input random no 99, sum of digits 18
job id 9, input random no 99, sum of digits 18
total time taken  2.156952 seconds

示例二

在此示例中,将演示如何创建带有worker-pool的HTTP服务器。

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

var jobs = make(chan string, 10)
var results = make(chan string, 10)

func worker(w int) {
    for job := range jobs {
        resp, err := http.Get(job)
        if err == nil {
            bodyBytes, err := ioutil.ReadAll(resp.Body)
            if err == nil {
                results <- fmt.Sprintf("Worker id %d requested url %s and status code %s length is %d" , w, job, resp.Status, len(bodyBytes))
            }
        } else {
            results <- fmt.Sprintf("Worker id %d requested url %s but received error %v", w, job, err)
        }
    }
}

func createWorkerPool(noOfWorkers int) {
    for i := 0; i < noOfWorkers; i++ {
        go worker(i)
    }
}

func allocate(w http.ResponseWriter, req *http.Request) {
    url := req.URL.Query().Get("url")
    if url == "" {
        http.Error(w, "Please provide a URL", http.StatusBadRequest)
        return
    }
    jobs <- url
    w.WriteHeader(http.StatusCreated)
    return
}

func result(w http.ResponseWriter, req *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.WriteHeader(http.StatusOK)
    flusher := w.(http.Flusher)
    for {
        select {
        case result := <-results:
            fmt.Fprintf(w, "data: {\"message\": \"%s\"}\n\n", result)
            flusher.Flush()
        }
    }
}

func main() {
    noOfWorkers := 2
    createWorkerPool(noOfWorkers)
    http.HandleFunc("/allocate", allocate)
    http.HandleFunc("/result", result)
    http.ListenAndServe(":8080", nil)
}

运行示例后,在浏览器中打开http://localhost:8080/allocate?url=http://www.google.com,会在控制台上看到worker完成了请求和响应,像这样:

Worker id 1 requested url http://www.google.com and status code 200 OK length is 11662

结论

Go语言支持WorkerPool线程池并发模式,使应用程序更加高效。希望以上示例可以为你提供一些灵感和帮助。

阅读剩余 76%

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Golang WorkerPool线程池并发模式示例详解 - Python技术站

(0)
上一篇 2023年5月17日
下一篇 2023年5月17日

相关文章

  • Java并发编程示例(二):获取和设置线程信息

    首先介绍一下本文的目的和背景。 Java 是一门非常重要的编程语言,支持多线程编程。在多线程编程时,很重要的一点就是了解线程的状态和信息。本文将介绍如何获取和设置线程的信息,包括线程状态、线程的优先级和线程的名称。 获取线程信息 获取线程状态 线程状态是指线程当前所处的状态,常用的线程状态有: NEW:线程创建后的初始状态 RUNNABLE:线程正在运行或可…

    多线程 2023年5月16日
    00
  • python实现多线程的方式及多条命令并发执行

    首先,Python可以通过多线程编程技术实现多条命令的并发执行,从而提高程序的执行效率。本文将为大家详细讲解Python实现多线程的方式及多条命令并发执行的攻略。 实现多线程的方式 Python实现多线程可以通过以下两种方式: 使用threading模块创建线程。 继承Thread类并重写run()方法实现线程。 本文以第一种方式为例进行讲解。 使用thre…

    多线程 2023年5月16日
    00
  • Node.js 多线程完全指南总结

    Node.js 多线程完全指南总结 简介 Node.js是一种事件驱动的、非阻塞式I/O的JavaScript运行时环境,通常用于服务器端的编程应用。虽然Node.js主要是单线程的,但是它是支持多线程操作的。本文将详细讲解Node.js多线程的概念和指南,并附上一些示例说明。 如何创建多线程 Node.js多线程最常用的方式是使用cluster模块和chi…

    多线程 2023年5月17日
    00
  • C#多线程系列之手动线程通知

    让我详细讲解一下“C#多线程系列之手动线程通知”的完整攻略。 简介 多线程是指在一个应用程序中同时运行多个线程,每个线程都可以独立执行不同的任务。C#多线程中,为了保证线程协作的正确性,需要手动进行线程通知,而本文就是一篇关于手动线程通知的攻略。 实现手动线程通知的方式 实现手动线程通知的方式有好几种。以下是手动线程通知的三种实现方式: AutoResetE…

    多线程 2023年5月16日
    00
  • C语言由浅入深讲解线程的定义

    C语言线程定义攻略 什么是线程 线程是一种执行路径,是进程中的一个执行流程。一个进程可以拥有多个线程,每个线程都可以独立执行,但是它们都共享相同的资源。 线程的优势 线程可以极大的提高程序的运行效率。当程序的某部分需要长时间运行时,通过创建线程可以使得该部分程序有多个执行流程,让每个线程独立的运行。这样就能提高程序运行效率,减少用户等待时间,提高用户体验。 …

    多线程 2023年5月16日
    00
  • Android版多线程下载 仿下载助手(最新)

    下面是《Android版多线程下载 仿下载助手(最新)》的完整攻略。 一、项目说明 本项目为 Android 版本多线程下载,实现了仿照下载助手的功能,支持多线程下载、暂停和继续下载、断点续传、下载速度统计等等。 二、环境配置 首先,我们需要安装以下环境: JDK Android Studio Git 三、下载源码 我们可以在 GitHub 上将项目克隆到本…

    多线程 2023年5月16日
    00
  • 详解C++ thread用法总结

    详解C++ thread用法总结 什么是C++ thread? C++ thread是一个多线程库,用于在C++中实现多线程编程。多线程是指在同一时间内执行多个线程,从而实现并发执行的目的。C++ thread为程序员提供了创建、启动、等待、终止线程以及互斥锁、条件变量等并发编程工具。 C++ thread用法总结 创建和启动线程 在C++中创建和启动线程可…

    多线程 2023年5月17日
    00
  • Java多线程中Lock锁的使用总结

    Java多线程中Lock锁的使用总结 什么是Lock? 在Java中,Lock是一种比synchronized更加灵活、功能更加强大的线程同步机制。它可以提供比传统的synchronized更为广泛的锁定操作。 Lock和synchronized的对比 锁的获取方式 synchronized是隐式获取锁,只要进入synchronized保护的代码段,锁就会自…

    多线程 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部