GO实现并发数控制的方法
在进行并发编程时,控制并发数显得尤为重要。在GO语言中,我们可以通过各种方式实现该控制。本文将提供基于Goroutine和Channel两种实现方式的讲解。
Goroutine 实现
使用goroutine来实现并发数控制,最简单的方式是使用sync.WaitGroup和channel。
WaitGroup
sync包提供了一个WaitGroup
类型,可以用于在goroutine结束时通知主线程,以便它可以等待所有goroutine完成。下面是基本示例:
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
fmt.Println("worker", id, "processing job", j)
time.Sleep(time.Second)
results <- j * 2
}
}
func main() {
const numJobs = 5
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= numJobs; a++ {
<-results
}
}
在这个示例中,我们将假设有5个工作要完成。通过将它们发送到带有3个worker goroutines的jobs channel中,在每个worker goroutine中执行这些工作并将这些结果发送回results channel。单个main goroutine将准备好从results channel中接收5个结果,以确保有5个工作已完成。
控制并发数量
通过在worker goroutine中添加counter,可以轻松控制goroutines的数量。这里使用了带缓存的jobs来将工作放入goroutine,但是在 worker goroutine中,我们将调用Add方法以通知waitgroup,调用Done方法以通知waitgroup已完成,调用Wait方法以阻止main goroutine直到所有worker goroutine已完成。
func worker(id int, jobs <-chan int, results chan<- int, counter chan<- bool, wg *sync.WaitGroup) {
for j := range jobs {
counter <- true
fmt.Println("worker", id, "processing job", j)
time.Sleep(time.Second)
results <- j * 2
<-counter
wg.Done()
}
}
func main() {
const numJobs = 10
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
counter := make(chan bool, 10)
var wg sync.WaitGroup
for w := 1; w <= 3; w++ {
wg.Add(1)
go worker(w, jobs, results, counter, &wg)
}
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
wg.Wait()
for a := 1; a <= numJobs; a++ {
<-results
}
}
在这个例子中,我们将并发goroutine的数量从3增加到10。工作完成时,我们可以看到jobs channel已关闭,同时在results channel中检查结果。这个实现非常简单,易于理解和增强。
Channel实现
GO的channel提供了一种更灵活的方法,可以用来控制对资源的访问。当我们只关心一个请求或动作时,使用通道来限制并发工作非常有用。
在下面的例子中,我们测试了一个程序,对于具有相同传输参数的许多数据库查询请求,观察并发的性能差异。该示例使用两个channel(in、out)和一个goroutine池,以限制并发性。因为channel信道的阻塞特性,我们可以实现任务的同步控制。
package main
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"log"
"runtime"
"sync"
)
const (
concurrency = 3
)
type Query struct {
statement string
args []interface{}
}
func dbQuery(db *sql.DB, in chan *Query, out chan interface{}, pool chan bool) {
for q := range in {
pool <- true
rows, err := db.Query(q.statement, q.args...)
if err != nil {
fmt.Println("dbQuery Error: ", err)
byPass := make(chan bool)
(<-pool) // 取消该操作的连接数
out <- byPass
} else {
fmt.Println("dbQuery Succeeded: ")
out <- rows
(<-pool)
}
}
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
db, err := sql.Open("mysql", "test:test@tcp(127.0.0.1:3306)/test")
if err != nil {
log.Fatal(err)
}
defer db.Close()
err = db.Ping()
if err != nil {
log.Fatal(err)
}
queries := make(chan *Query, 20)
results := make(chan interface{}, 20)
pool := make(chan bool, concurrency)
for i := 0; i < concurrency; i++ {
go dbQuery(db, queries, results, pool)
}
// 发送并发请求
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
queries <- &Query{statement: "SELECT sleep(1)"}
queries <- &Query{statement: "SELECT sleep(2)"}
queries <- &Query{statement: "SELECT sleep(3)"}
queries <- &Query{statement: "BAD QUERY", args: []interface{}{}}
queries <- &Query{statement: "SELECT sleep(4)"}
queries <- &Query{statement: "SELECT sleep(5)"}
}()
// 连续读取结果
wg.Do(func() {
done := false
for !done {
select {
case r := <-results:
switch rt := r.(type) {
case chan bool:
done = <-rt
case *sql.Rows:
fmt.Println(rt)
}
}
}
})
}
在上述示例中,我们使用了3个goroutine并发执行了6个查询请求,其中一些为异常查询,其中通过池控制goroutine并发访问数据库。
通过使用并发既得到了优点又规避了优点,以使用数据库的同样方式,但用更少的连接(这里pool是连接数量)。此实现中的channel通信和goroutine池使并行改进非常容易。同时,当传入“非法”具有错误语句的查询时,我们可以快速出错,并优雅地尝试继续执行。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:golang实现并发数控制的方法 - Python技术站