Go 语言入门10 – 并发之资源池

提供一个资源池,类似于数据库连接池的功能;资源池在 go 1.11.1 中有官方实现:sync/pool.go

一、资源池

import "log"

package pool

import (
"sync"
"io"
"errors"
"log"
)

// 声明池类结构体
type Pool struct {
   // 锁
   lock sync.Mutex
   // 池中存储的资源
   resources chan io.Closer
   // 资源创建工厂函数
   factory func() (io.Closer, error)
   // 池是否已经被关闭
   closed bool
}

// 创建池类实例的工厂函数
// 工厂函数名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
   if size <= 0 {
      return nil, errors.New("size too small");
   }

   return &Pool{
      resources: make(chan io.Closer, size),
      factory:   fn,
   }, nil
}

// 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
   // select - default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default
   // 当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的
   select {
   // 检查是否有空闲的资源
   case r, ok := <-p.resources:
      log.Println("Acquire:", "Shared Resource")
      if !ok {
         return nil, errors.New("pool already closed")
      }
      return r, nil
   default:
      log.Println("Acquire:", "New Resource")
      // 调用资源创建函数创建资源
      return p.factory()
   }
}

// 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
   // 注意:Release 和 Close 使用的是同一把锁,就是说二者同时只能执行一个,防止资源池已经关闭了,release 还向资源池放资源
   // 向一个已经关闭的 channel 发送消息,会发生 panic: send on closed channel
   p.lock.Lock()
   defer p.lock.Unlock()

   // 如果池已经被关闭,销毁这个资源
   if p.closed {
      r.Close()
      return
   }

   select {
   // 试图将这个资源放入队列
   case p.resources <- r:
      log.Println("Release:", "In Queue")
   default:
      log.Println("Release:", "Closing")
      r.Close()
   }
}

// 关闭资源池,并关闭所有现有的资源
func (p *Pool) Close() {
   p.lock.Lock()
   defer p.lock.Unlock()

   if p.closed {
      return
   }

   p.closed = true

   // 在清空通道里的资源之前,将通道关闭
   close(p.resources)

   // 关闭资源
   for r := range p.resources {
      r.Close()
   }
}

select – default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default;当然,如果没有 default,那么还是要阻塞在 <-p.resources

二、具体的资源类

import (
   "io"
   "log"
   "sync/atomic"
)

package db

import (
"log"
"io"
"sync/atomic"
)

// 给每个连接分配一个独一无二的id
var idCounter int32

// 资源 - 数据库连接
type DBConnection struct {
   ID int32
}

// dbConnection 实现了 io.Closer 接口
// 关闭资源
func (conn *DBConnection) Close() error {
   log.Println("conn closed")
   return nil
}

// 创建一个资源 - dbConnection
func CreateConn() (io.Closer, error) {
   id := atomic.AddInt32(&idCounter, 1)
   log.Println("Create conn, id:", id)
   return &DBConnection{
      ID: id,
   }, nil
}

三、使用资源池

package main

import (
    "sync"
    "github.com/zhaojigang/pool/pool"
    "github.com/zhaojigang/pool/db"
    "log"
    "time"
    "math/rand"
)

const (
    maxGoroutines   = 5 // 要使用的goroutine的数量
    pooledResources = 2 // 池中的资源的数量
)

func performQuery(query int, p *pool.Pool) {
    // 1. 获取连接
    conn, err := p.Acquire()
    if err != nil {
        log.Println("acquire conn error, ", err)
        return
    }

    // 使用结束后,释放链接
    defer p.Release(conn)

    // 该 log 模拟对连接的使用
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}

func main() {
    var waitGroup sync.WaitGroup
    waitGroup.Add(maxGoroutines)
    // 1. 创建一个 Pool
    p, err := pool.New(db.CreateConn, pooledResources)
    if err != nil {
        log.Println("create Pool error")
    }

    // 2. 开启 goroutine 执行任务
    for query := 0; query < maxGoroutines; query++ {
        // 每个goroutine需要自己复制一份要、查询值的副本,
        // 不然所有的查询会共享同一个查询变量,即所有的 goroutine 最后的 query 值都是3
        go func(q int) {
            performQuery(q, p)
            waitGroup.Done()
        }(query)
        //time.Sleep(1000*time.Millisecond) // 用于测试从 resources channel 中获取资源
    }

    // 3. 关闭连接池
    waitGroup.Wait()
    p.Close()
    log.Println("pool closed - main")
}

在高并发的创建 go routine 的情况下,从 pool.go # Acquire 方法中可以看到,大家可能都还没有 Release 资源,此时都会创建资源,资源在一瞬间会大量增加,在实际系统中,需要根据需求,做一些措施,例如:提前创建好资源放入池中,go routine 都从池中取资源,资源不够就等待,使用完之后就放入池中,防止资源意外关闭,还可以启用后台线程监控等。

 

Leave a Comment