Golang WorkerPool线程池并发模式示例详解
简介
WorkerPool即工作池,也称为线程池。它是一种并发编程模式,通常用于解决并发问题。在WorkerPool中,创建固定数量的worker,他们并行地从池中获取任务,并在处理任务时将其标记为完成。当所有可用的Worker都在使用时,新任务将被放入队列中,并等待有空闲的Worker。
原理
WorkerPool的主要优点是控制线程达到最大效率,最大限度地利用CPU时间,由于每个Worker都支持并发,因此可以处理大量的任务,并且可以在应用程序中实现流控制。以下是WorkerPool的基本原理:
-
创建一个worker池,该池中包含了可以进行并发处理的固定数量的worker。
-
工人从池中获取任务并执行任务。
-
当处理完成任务时,工作人员将结果放回池中。
-
如果池中没有可用的worker,则将任务放入队列中,并等待worker可用时将其分配。
示例一
在此示例中,将展示如何在Golang中实现WorkerPool模式。此示例将创建两个worker并向池中添加任务。
package main
import (
"time"
"fmt"
)
type job struct {
id int
randomno int
}
type result struct {
job job
sumofdigits int
}
var jobs = make(chan job, 10)
var results = make(chan result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(w int) {
for job := range jobs {
output := result{job, digits(job.randomno)}
results <- output
}
}
func createWorkerPool(noOfWorkers int) {
for i := 0; i < noOfWorkers; i++ {
go worker(i)
}
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := 99
job := job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("job id %d, input random no %d, sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 10
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 2
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diffTime := endTime.Sub(startTime)
fmt.Println("total time taken ", diffTime.Seconds(), "seconds")
}
运行后的输出结果为
job id 0, input random no 99, sum of digits 18
job id 4, input random no 99, sum of digits 18
job id 2, input random no 99, sum of digits 18
job id 3, input random no 99, sum of digits 18
job id 1, input random no 99, sum of digits 18
job id 6, input random no 99, sum of digits 18
job id 5, input random no 99, sum of digits 18
job id 8, input random no 99, sum of digits 18
job id 7, input random no 99, sum of digits 18
job id 9, input random no 99, sum of digits 18
total time taken 2.156952 seconds
示例二
在此示例中,将演示如何创建带有worker-pool的HTTP服务器。
package main
import (
"fmt"
"io/ioutil"
"net/http"
)
var jobs = make(chan string, 10)
var results = make(chan string, 10)
func worker(w int) {
for job := range jobs {
resp, err := http.Get(job)
if err == nil {
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err == nil {
results <- fmt.Sprintf("Worker id %d requested url %s and status code %s length is %d" , w, job, resp.Status, len(bodyBytes))
}
} else {
results <- fmt.Sprintf("Worker id %d requested url %s but received error %v", w, job, err)
}
}
}
func createWorkerPool(noOfWorkers int) {
for i := 0; i < noOfWorkers; i++ {
go worker(i)
}
}
func allocate(w http.ResponseWriter, req *http.Request) {
url := req.URL.Query().Get("url")
if url == "" {
http.Error(w, "Please provide a URL", http.StatusBadRequest)
return
}
jobs <- url
w.WriteHeader(http.StatusCreated)
return
}
func result(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.WriteHeader(http.StatusOK)
flusher := w.(http.Flusher)
for {
select {
case result := <-results:
fmt.Fprintf(w, "data: {\"message\": \"%s\"}\n\n", result)
flusher.Flush()
}
}
}
func main() {
noOfWorkers := 2
createWorkerPool(noOfWorkers)
http.HandleFunc("/allocate", allocate)
http.HandleFunc("/result", result)
http.ListenAndServe(":8080", nil)
}
运行示例后,在浏览器中打开http://localhost:8080/allocate?url=http://www.google.com,会在控制台上看到worker完成了请求和响应,像这样:
Worker id 1 requested url http://www.google.com and status code 200 OK length is 11662
结论
Go语言支持WorkerPool线程池并发模式,使应用程序更加高效。希望以上示例可以为你提供一些灵感和帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Golang WorkerPool线程池并发模式示例详解 - Python技术站