Golang WorkerPool线程池并发模式示例详解

yizhihongxing

Golang WorkerPool线程池并发模式示例详解

简介

WorkerPool即工作池,也称为线程池。它是一种并发编程模式,通常用于解决并发问题。在WorkerPool中,创建固定数量的worker,他们并行地从池中获取任务,并在处理任务时将其标记为完成。当所有可用的Worker都在使用时,新任务将被放入队列中,并等待有空闲的Worker。

原理

WorkerPool的主要优点是控制线程达到最大效率,最大限度地利用CPU时间,由于每个Worker都支持并发,因此可以处理大量的任务,并且可以在应用程序中实现流控制。以下是WorkerPool的基本原理:

  • 创建一个worker池,该池中包含了可以进行并发处理的固定数量的worker。

  • 工人从池中获取任务并执行任务。

  • 当处理完成任务时,工作人员将结果放回池中。

  • 如果池中没有可用的worker,则将任务放入队列中,并等待worker可用时将其分配。

示例一

在此示例中,将展示如何在Golang中实现WorkerPool模式。此示例将创建两个worker并向池中添加任务。

package main

import (
    "time"
    "fmt"
)

type job struct {
    id       int
    randomno int
}

type result struct {
    job         job
    sumofdigits int
}

var jobs = make(chan job, 10)
var results = make(chan result, 10)

func digits(number int) int {
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

func worker(w int) {
    for job := range jobs {
        output := result{job, digits(job.randomno)}
        results <- output
    }
}

func createWorkerPool(noOfWorkers int) {
    for i := 0; i < noOfWorkers; i++ {
        go worker(i)
    }
}

func allocate(noOfJobs int) {
    for i := 0; i < noOfJobs; i++ {
        randomno := 99
        job := job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

func result(done chan bool) {
    for result := range results {
        fmt.Printf("job id %d, input random no %d, sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits) 
    }
    done <- true
}

func main() {
    startTime := time.Now()
    noOfJobs := 10
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 2
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diffTime := endTime.Sub(startTime)
    fmt.Println("total time taken ", diffTime.Seconds(), "seconds")
}

运行后的输出结果为

job id 0, input random no 99, sum of digits 18
job id 4, input random no 99, sum of digits 18
job id 2, input random no 99, sum of digits 18
job id 3, input random no 99, sum of digits 18
job id 1, input random no 99, sum of digits 18
job id 6, input random no 99, sum of digits 18
job id 5, input random no 99, sum of digits 18
job id 8, input random no 99, sum of digits 18
job id 7, input random no 99, sum of digits 18
job id 9, input random no 99, sum of digits 18
total time taken  2.156952 seconds

示例二

在此示例中,将演示如何创建带有worker-pool的HTTP服务器。

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

var jobs = make(chan string, 10)
var results = make(chan string, 10)

func worker(w int) {
    for job := range jobs {
        resp, err := http.Get(job)
        if err == nil {
            bodyBytes, err := ioutil.ReadAll(resp.Body)
            if err == nil {
                results <- fmt.Sprintf("Worker id %d requested url %s and status code %s length is %d" , w, job, resp.Status, len(bodyBytes))
            }
        } else {
            results <- fmt.Sprintf("Worker id %d requested url %s but received error %v", w, job, err)
        }
    }
}

func createWorkerPool(noOfWorkers int) {
    for i := 0; i < noOfWorkers; i++ {
        go worker(i)
    }
}

func allocate(w http.ResponseWriter, req *http.Request) {
    url := req.URL.Query().Get("url")
    if url == "" {
        http.Error(w, "Please provide a URL", http.StatusBadRequest)
        return
    }
    jobs <- url
    w.WriteHeader(http.StatusCreated)
    return
}

func result(w http.ResponseWriter, req *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.WriteHeader(http.StatusOK)
    flusher := w.(http.Flusher)
    for {
        select {
        case result := <-results:
            fmt.Fprintf(w, "data: {\"message\": \"%s\"}\n\n", result)
            flusher.Flush()
        }
    }
}

func main() {
    noOfWorkers := 2
    createWorkerPool(noOfWorkers)
    http.HandleFunc("/allocate", allocate)
    http.HandleFunc("/result", result)
    http.ListenAndServe(":8080", nil)
}

运行示例后,在浏览器中打开http://localhost:8080/allocate?url=http://www.google.com,会在控制台上看到worker完成了请求和响应,像这样:

Worker id 1 requested url http://www.google.com and status code 200 OK length is 11662

结论

Go语言支持WorkerPool线程池并发模式,使应用程序更加高效。希望以上示例可以为你提供一些灵感和帮助。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Golang WorkerPool线程池并发模式示例详解 - Python技术站

(0)
上一篇 2023年5月17日
下一篇 2023年5月17日

相关文章

  • c#编写的高并发数据库控制访问代码

    针对c#编写的高并发数据库控制访问代码,可以采取以下步骤进行攻略: 步骤一:选择合适的数据库 选择合适的数据库是高并发处理中的第一步。一些常见的高并发数据库如Mysql、MongoDB、Oracle等等。在选择时,需要考虑实际业务情况和数据量,选择合适的数据库类型,同时要注意数据库的读写分离、分库分表等问题,以充分利用数据库的性能。 步骤二:使用连接池 在高…

    多线程 2023年5月17日
    00
  • 使用Redis incr解决并发问题的操作

    使用Redis incr操作可以解决并发问题。在Redis中,incr命令表示给定键的值增加1。在多人并发访问同一个键时,incr命令可以一定程度上解决并发问题。 以下是采取Redis incr解决并发问题的攻略: 1、设计键名 在设计键名时,应该遵循以下原则: 键名要尽可能简短和清晰易懂,以利于代码编写和阅读。 键名应该尽可能遵循命名规范,包括大小写、下划…

    多线程 2023年5月16日
    00
  • Go语言并发之原子操作详解

    《Go语言并发之原子操作详解》是一篇介绍Go语言中原子操作的高质量文章,下面就该主题进行详细的讲解及其示例说明。 什么是原子操作 原子操作是指一个操作是不可分割的一整个事务。当我们在运行并发程序的时候,原子操作就能够防止竞争条件的发生,保证数据的一致性以及避免数据竞争。 Go语言中的原子操作 Go语言内置了原子操作,可以通过原子操作实现并发安全。在Go语言中…

    多线程 2023年5月17日
    00
  • 一文搞懂Java创建线程的五种方法

    下面我将为您详细讲解创建Java线程的五种方法。 1. 继承Thread类 创建线程的第一种方式是继承Thread类。需要定义一个类来继承Thread,并覆写Thread的run方法,在其中编写线程要执行的任务。 public class MyThread extends Thread { public void run() { // 线程要执行的代码 } …

    多线程 2023年5月16日
    00
  • java ThreadPoolExecutor 并发调用实例详解

    Java ThreadPoolExecutor 并发调用实例详解 Java中的线程池可以提高应用程序的性能和可伸缩性。ThreadPoolExecutor是一个实现了ExecutorService接口的线程池类。通过ThreadPoolExecutor的配置,可以定制线程池的大小、任务队列大小、线程空闲时间等参数,以适应不同的应用场景。 ThreadPool…

    多线程 2023年5月16日
    00
  • PHP安装threads多线程扩展基础教程

    标题:PHP安装threads多线程扩展基础教程 1. 确认服务器环境 在安装threads多线程扩展前,需先确认一下服务器环境是否满足以下要求: PHP版本:5.5以上 SAPI类型:CLI(Command Line Interface) 系统:Linux/Unix/MacOS 2. 安装pthreads多线程扩展 2.1 下载pthreads扩展 git…

    多线程 2023年5月16日
    00
  • js Promise并发控制数量的方法

    JS Promise并发控制数量的方法指的是在使用 Promise 进行并发操作时,控制并发数量的技巧。 一般而言,我们可以使用 Promise.all() 或 Promise.race() 来处理并发请求,并获取返回结果。但是,有时我们需要控制并发请求的数量,避免发送过多的请求导致服务端出错或无响应。 以下是 JS Promise 并发控制数量的方法: 使…

    多线程 2023年5月16日
    00
  • Linux之多线程以及多线程并发访问同一块内存的处理问题

    Linux中的多线程是通过线程库来实现的,主要采用了POSIX线程库(Pthread)的API。多线程可以提高程序的并发性和效率,但同时也会带来线程并发访问同一块内存的问题,特别是当多个线程读写同一块数据时。 解决多线程并发访问同一块内存的问题,通常有以下两种方式: 使用锁机制 互斥锁(Mutex):防止多个线程同时访问共享资源 读写锁(Reader-Wri…

    多线程 2023年5月16日
    00
合作推广
合作推广
分享本页
返回顶部