Golang协程池gopool设计与实现
协程池的概念
在 Golang 中,我们可以通过 Go 关键字,轻松创建协程(也称作 goroutine),但这种方式也会导致大量的协程被创建,如果这些协程的生命周期很短,那么会导致频繁的创建和销毁,带来较大的系统开销。此时,协程池就应运而生了。协程池的工作原理是,创建一些协程并将它们放到一个池子里面,并在需要使用协程时,从池子中取出一个协程来执行任务,任务完成后再将协程放回池子中,从而避免了频繁地创建和销毁协程。
gopool的实现原理
gopool 是一个 Golang 协程池的工具库,通过封装 sync.Pool 和 Golang 协程实现。gopool 提供了一个分配协程的入口,我们可以在程序里调用该函数来申请一个协程来执行任务。
gopool 包含以下几个部分的实现:
协程池结构体
type Pool interface {
Stop()
Serve(context.Context) error
}
type pool struct {
mu sync.Mutex
cond *sync.Cond
cap int // 协程池容量
running int // 正在执行的协程数
workers []*worker // 协程池所包含的工人
expiredTime time.Duration // 协程空闲超时时间
ctx context.Context // 上下文
cancel context.CancelFunc
}
工人结构体
type worker struct {
pool *pool
task chan func() // 实际执行任务的通道
used int64 // 工人使用次数
expire int64 // 工人的过期时间
}
协程池运行方法
func (p *pool) Serve(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
default:
}
if len(p.workers) >= p.cap {
return nil // 达到最大并发数量
}
w := &worker{
pool: p,
task: make(chan func(), 1),
}
p.mu.Lock()
p.workers = append(p.workers, w)
p.mu.Unlock()
go w.run()
select {
case <-ctx.Done():
return nil
default:
}
}
}
工人运行方法
func (w *worker) run() {
for {
w.pool.cond.L.Lock() // 获取锁
for len(w.task) == 0 {
when := time.Now().UnixNano()
// 判断是否已经过期了
d := w.expire - when
if d <= 0 {
w.pool.removeWorker(w)
w.pool.cond.L.Unlock()
return // 过期了,退出
}
w.pool.cond.L.Unlock()
time.Sleep(time.Duration(d))
w.pool.cond.L.Lock()
}
// 从任务队列中收取任务
task := <-w.task
w.pool.running++
w.pool.cond.L.Unlock()
// 执行任务
task()
// 通知任务已完成,并放回任务池中
w.pool.cond.L.Lock()
w.pool.running--
w.pool.putTask(task)
w.pool.cond.L.Unlock()
}
}
协程池停止方法
为了保证在没有任务时,协程池可以正确的退出,需要实现一个协程锁,用来控制工作协程停止工作。
// 停止协程池
func (p *pool) Stop() {
p.cancel()
p.mu.Lock()
for _, worker := range p.workers {
worker.stop()
}
p.workers = nil
p.running = 0
p.mu.Unlock()
}
gopool使用示例
下面,我们就来看两个使用 gopool 的例子:计算斐波那契数列和从文件中读入数据。
计算斐波那契数列
func Fibonacci(n int) int {
if n == 0 {
return 0
}
if n == 1 {
return 1
}
return Fibonacci(n-1) + Fibonacci(n-2)
}
// 计算斐波那契数列和
func CalculateFibonacci(n int) int {
if n <= 0 {
return 0
}
if n == 1 {
return 1
}
task := make(chan func())
p := newGoroutinePool(10, 10)
defer p.Stop()
for i := 0; i < n; i++ {
i := i
p.Execute(func() {
task <- func() {
Fibonacci(i)
}
}, nil)
}
result := make(chan int, n)
for i := 0; i < n; i++ {
p.Execute(nil, func() {
result <- <-task
})
}
total := 0
for i := 0; i < n; i++ {
total += <-result
}
return total
}
从文件中读入数据
func countWords(fn string, filter func(string) bool) (int, error) {
var count int
file, err := os.Open(fn)
if err != nil {
return 0, err
}
defer file.Close()
scanner := bufio.NewScanner(file)
task := make(chan func())
p := newGoroutinePool(10, 10)
defer p.Stop()
for scanner.Scan() {
line := scanner.Text()
p.Execute(func() {
task <- func() {
if filter == nil || filter(line) {
count += len(strings.Split(line, " "))
}
}
}, nil)
}
if err := scanner.Err(); err != nil {
return 0, err
}
for {
if len(task) == 0 {
break
}
p.Execute(nil, func() {
<-task
})
}
return count, nil
}
以上是 gopool 的使用示例,可以看出,gopool 的使用相对简单,通过传入一个需要执行的函数即可。同时,也可以从以上示例中发现 gopool 的协程池确实可以很好的提高程序的性能。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Golang协程池gopool设计与实现 - Python技术站