Go 语言入门11 – 并发之协程池

提供一个 go routine 池,每个 go routine 循环阻塞等待从任务池中执行任务;外界使用者不断的往任务池里丢任务,则 go routine 池中的多个 go routine 会并发的处理这些任务

一、worker/workPool.go

import "sync"

type Worker interface {
   Task()
}

type Pool struct {
   wg sync.WaitGroup
   // 工作池
   taskPool chan Worker
}

func New(maxGoroutineNum int) *Pool {
   // 1. 初始化一个 Pool
   p := Pool{
      taskPool: make(chan Worker),
   }

   p.wg.Add(maxGoroutineNum)
   // 2. 创建 maxGoroutineNum 个 goroutine,并发的从 taskPool 中获取任务
   for i := 0; i < maxGoroutineNum; i++ {
      go func() {
         for task := range p.taskPool { // 阻塞获取,一旦没有任务,阻塞在这里 - 无缓冲 channel
            // 3. 执行任务
            task.Task()
         }
         p.wg.Done()
      }()
   }

   return &p
}

// 提交任务到worker池中
func (p *Pool) Run(worker Worker) {
   p.taskPool <- worker
}

func (p *Pool) Shutdown() {
   // 关闭通道
   close(p.taskPool)
   p.wg.Wait()
}

 

二、namePrinter/namePrinter.go

import (
   "fmt"
   "time"
)

type NamePrinter struct {
   Name string
}

func (np *NamePrinter) Task()  {
   fmt.Println(np.Name)
   time.Sleep(time.Second)
}

三、main.go

import (
   "github.com/zhaojigang/worker/worker"
   "sync"
   "github.com/zhaojigang/worker/namePrinter"
)

var names = []string{
   "steve",
   "bob",
}

func main() {

   // 1. 启动两个 goroutine,等待执行任务
   p := worker.New(2)

   var wg sync.WaitGroup
   wg.Add(3 * len(names))

   // 2. 创建 worker,扔到 goroutine 池中
   for i := 0; i < 3; i++ {
      for _, namex := range names {
         worker := namePrinter.NamePrinter{
            Name:namex,
         }
         go func() {
            p.Run(&worker)
            wg.Done()
         }()
      }
   }

   // 3. 等待添加任务完毕
   wg.Wait()

   // 4. 关闭 goroutine 池
   p.Shutdown()
}

Go 语言入门10 – 并发之资源池

提供一个资源池,类似于数据库连接池的功能;资源池在 go 1.11.1 中有官方实现:sync/pool.go

一、资源池

import "log"

package pool

import (
"sync"
"io"
"errors"
"log"
)

// 声明池类结构体
type Pool struct {
   // 锁
   lock sync.Mutex
   // 池中存储的资源
   resources chan io.Closer
   // 资源创建工厂函数
   factory func() (io.Closer, error)
   // 池是否已经被关闭
   closed bool
}

// 创建池类实例的工厂函数
// 工厂函数名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
   if size <= 0 {
      return nil, errors.New("size too small");
   }

   return &Pool{
      resources: make(chan io.Closer, size),
      factory:   fn,
   }, nil
}

// 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
   // select - default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default
   // 当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的
   select {
   // 检查是否有空闲的资源
   case r, ok := <-p.resources:
      log.Println("Acquire:", "Shared Resource")
      if !ok {
         return nil, errors.New("pool already closed")
      }
      return r, nil
   default:
      log.Println("Acquire:", "New Resource")
      // 调用资源创建函数创建资源
      return p.factory()
   }
}

// 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
   // 注意:Release 和 Close 使用的是同一把锁,就是说二者同时只能执行一个,防止资源池已经关闭了,release 还向资源池放资源
   // 向一个已经关闭的 channel 发送消息,会发生 panic: send on closed channel
   p.lock.Lock()
   defer p.lock.Unlock()

   // 如果池已经被关闭,销毁这个资源
   if p.closed {
      r.Close()
      return
   }

   select {
   // 试图将这个资源放入队列
   case p.resources <- r:
      log.Println("Release:", "In Queue")
   default:
      log.Println("Release:", "Closing")
      r.Close()
   }
}

// 关闭资源池,并关闭所有现有的资源
func (p *Pool) Close() {
   p.lock.Lock()
   defer p.lock.Unlock()

   if p.closed {
      return
   }

   p.closed = true

   // 在清空通道里的资源之前,将通道关闭
   close(p.resources)

   // 关闭资源
   for r := range p.resources {
      r.Close()
   }
}

select – default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default;当然,如果没有 default,那么还是要阻塞在 <-p.resources

二、具体的资源类

import (
   "io"
   "log"
   "sync/atomic"
)

package db

import (
"log"
"io"
"sync/atomic"
)

// 给每个连接分配一个独一无二的id
var idCounter int32

// 资源 - 数据库连接
type DBConnection struct {
   ID int32
}

// dbConnection 实现了 io.Closer 接口
// 关闭资源
func (conn *DBConnection) Close() error {
   log.Println("conn closed")
   return nil
}

// 创建一个资源 - dbConnection
func CreateConn() (io.Closer, error) {
   id := atomic.AddInt32(&idCounter, 1)
   log.Println("Create conn, id:", id)
   return &DBConnection{
      ID: id,
   }, nil
}

三、使用资源池

package main

import (
    "sync"
    "github.com/zhaojigang/pool/pool"
    "github.com/zhaojigang/pool/db"
    "log"
    "time"
    "math/rand"
)

const (
    maxGoroutines   = 5 // 要使用的goroutine的数量
    pooledResources = 2 // 池中的资源的数量
)

func performQuery(query int, p *pool.Pool) {
    // 1. 获取连接
    conn, err := p.Acquire()
    if err != nil {
        log.Println("acquire conn error, ", err)
        return
    }

    // 使用结束后,释放链接
    defer p.Release(conn)

    // 该 log 模拟对连接的使用
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}

func main() {
    var waitGroup sync.WaitGroup
    waitGroup.Add(maxGoroutines)
    // 1. 创建一个 Pool
    p, err := pool.New(db.CreateConn, pooledResources)
    if err != nil {
        log.Println("create Pool error")
    }

    // 2. 开启 goroutine 执行任务
    for query := 0; query < maxGoroutines; query++ {
        // 每个goroutine需要自己复制一份要、查询值的副本,
        // 不然所有的查询会共享同一个查询变量,即所有的 goroutine 最后的 query 值都是3
        go func(q int) {
            performQuery(q, p)
            waitGroup.Done()
        }(query)
        //time.Sleep(1000*time.Millisecond) // 用于测试从 resources channel 中获取资源
    }

    // 3. 关闭连接池
    waitGroup.Wait()
    p.Close()
    log.Println("pool closed - main")
}

在高并发的创建 go routine 的情况下,从 pool.go # Acquire 方法中可以看到,大家可能都还没有 Release 资源,此时都会创建资源,资源在一瞬间会大量增加,在实际系统中,需要根据需求,做一些措施,例如:提前创建好资源放入池中,go routine 都从池中取资源,资源不够就等待,使用完之后就放入池中,防止资源意外关闭,还可以启用后台线程监控等。

 

Go 语言入门9 – Channel(通道)

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。(这是除了 atomic 和 mutex 之外的第三种处理竞态资源的方式)
Channel分为两种:

  • 无缓冲的 Channel
    • 无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。
  • 有缓冲的 Channel
    • 有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

一、无缓冲的通道

import (
    "sync"
    "fmt"
    "math/rand"
)

var wg sync.WaitGroup

// Channel 完整的类型是 "chan 数据类型"
func player(name string, court chan int) {
    defer wg.Done()

    for {
        // 1. 阻塞等待接球,如果通道关闭,ok返回false
        ball, ok := <-court
        if !ok {
            fmt.Printf("channel already closed! Player %s won\n", name)
            return
        }

        random := rand.Intn(100)
        if random%13 == 0 {
            fmt.Printf("Player %s Lose\n", name)
            // 关闭通道
            close(court)
            return
        }

        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball ++
        // 2. 发球,阻塞等待对方接球
        court <- ball
    }
}

// 两个 player 打网球,即生产者和消费者模式(互为生产者和消费者)
func main() {
    wg.Add(2)

    // 1. 创建一个无缓冲的通道
    // Channel 完整的类型是 "chan 数据类型"
    court := make(chan int)

    // 2. 创建两个 goroutine
    go player("zhangsan", court)
    go player("lisi", court)

    // 3. 发球:向通道发送数据,阻塞等待通道对端接收
    court <- 1

    // 4. 等待输家出现
    wg.Wait()
}

二、有缓冲的通道

import (
   "sync"
   "fmt"
   "time"
)

// 使用4个goroutine来完成10个任务
const (
   taskNum      = 10
   goroutineNum = 4
)

var countDownLatch sync.WaitGroup

func worker(name string, taskChannel chan string) {
   defer countDownLatch.Done()
   for {
      // 1. 不断的阻塞等待分配工作
      task, ok := <-taskChannel
      if !ok {
         fmt.Printf("channel closed and channel is empty\n")
         return
      }

      //fmt.Printf("worker %s start %s\n", name, task)
      time.Sleep(100 * time.Millisecond)
      fmt.Printf("worker %s complete %s\n", name, task)
   }
}

func main() {
   countDownLatch.Add(goroutineNum)
   // 1. 创建有缓冲区的string channel
   taskChannel := make(chan string, taskNum)

   // 2. 创建 4 个goroutine去干活
   for i := 0; i < goroutineNum; i++ {
      go worker(fmt.Sprintf("worker %d", i), taskChannel)
   }

   // 3. 向通道加入task
   for i := 0; i < taskNum; i++ {
      taskChannel <- fmt.Sprintf("task %d", i)
   }

   // 4. 关闭通道:
   // 当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。
   // 能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。
   // 从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值
   close(taskChannel)

   // 5. 等待
   countDownLatch.Wait()
}

当通道关闭后,go routine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。

从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。

Go 语言入门8 – 多线程

Go routine 基于协程 Coroutine,原理总结:

如果创建一个 goroutine 并准备运行,这个 goroutine 就会被放到调度器的全局运行队列中。之后,调度器就将这些队列中的 goroutine 分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中。本地运行队列中的 goroutine 会一直等待直到自己被分配的逻辑处理器执行。

Go routine 机制原理如下图所示:

1556591431-8545-5842684-caf2ddc8d7ed6338

一、示例

import (
   "runtime"
   "sync"
   "fmt"
)

func main() {
   // 1. 分配一个逻辑处理器给调度器使用
   runtime.GOMAXPROCS(1)

   // 2. 设定等待器,类比 Java CountDownLatch
   var waitGroup sync.WaitGroup
   waitGroup.Add(2)

   fmt.Println("=== start ===")
   // 3. 创建第一个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 4. 创建第二个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 5. 阻塞 main goroutine
   waitGroup.Wait() // CountDownLatch#await()
   fmt.Println("=== end ===")
}

使用 go 关键字创建 Go routine

  • 匿名函数实现方式 go func() {xxx}()
  • 普通函数 funcA 实现方式 go funcA()

二、打断正在运行的 Go routine

  • 基于调度器的内部算法,一个正运行的 go routine 在工作结束前,可以被停止并重新调度。
  • 调度器这样做的目的是防止某个 go routine 长时间占用逻辑处理器。当 go routine 占用时间过长时,调度器会停止当前正运行的 go routine,并给其他可运行的 go routine 运行的机会。

该机制的原理如下图所示:
1556591439-2367-5842684-7023f4c17581356e
步骤:

  • 在第 1 步,调度器开始运行 go routine A,而 go routine B 在运行队列里等待调度。
  • 在第 2 步,调度器交换了 go routine A 和 go routine B。 由于 go routine A 并没有完成工作,因此被放回到运行队列。
  • 在第 3 步,go routine B 完成了它的工作并被系统销毁。这也让 go routine A 继续之前的工作。

注意:上述步骤都是由调度器内部实现的,我们不需要编写代码去实现。

三、设置逻辑处理器数量

    // 为每个物理处理器分配一个逻辑处理器给调度器使用
    runtime.GOMAXPROCS(runtime.NumCPU())

四、竞争状态

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。

同一时刻只能有一个 goroutine 对共享资源进行读和写操作

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   // 两个 goroutine 同时操作的变量,竞态变量
   counter     int
   waitGroup sync.WaitGroup
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      value := counter
      // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
      // 让其他 goroutine 进行执行
      runtime.Gosched()
      value ++
      counter = value
   }
}

func main() {
   runtime.GOMAXPROCS(1)
   waitGroup.Add(2)

   go incCount(1)
   go incCount(2)

   waitGroup.Wait()
   fmt.Println(counter) // 正确为4,实际上为2
}

代码执行图:
1556591451-4320-5842684-0609cefc2b7bb6a5

 

五、锁机制

5.1、原子类 atomic

import (
   "runtime"
   "sync/atomic"
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      // 使用原子类
      atomic.AddInt64(&counter, 1)
      runtime.Gosched()
   }
}

另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式

import (
   "sync"
   "time"
   "sync/atomic"
   "fmt"
)

var (
   shutdown  int64
   waitGroup sync.WaitGroup
)

func doWork(name string) {
   defer waitGroup.Done()
   for {
      time.Sleep(250 * time.Millisecond)
      // 记载关机标志
      if atomic.LoadInt64(&shutdown) == 1 {
         fmt.Println("shutDown, ", name)
         break
      }
   }
}

func main() {
   waitGroup.Add(2)

   go doWork("A")
   go doWork("B")

   // 给定goroutine执行的时间
   time.Sleep(1000 * time.Millisecond)

   // 设定关机标志
   atomic.StoreInt64(&shutdown, 1)

   waitGroup.Wait()
}

5.2、互斥锁 mutex

互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码

 

var (
    // 两个 goroutine 同时操作的变量,竞态变量
    counter     int
    waitGroup sync.WaitGroup
        // 锁,定义一段临界区
    lock sync.Mutex
)

func incCount(int) {
    defer waitGroup.Done()
    for count := 0; count < 2; count++ {
        lock.Lock()
        { // Lock() 与 UnLock() 之间的代码都属于临界区,{}是可以省略的,加上看起来清晰
            value := counter
            // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
            // 让其他 goroutine 进行执行
            // 但是因为锁没有释放,调度器还会继续安排执行该 goroutine
            runtime.Gosched()
            value ++
            counter = value
        }
        lock.Unlock()
        // 释放锁,允许其他正在等待的 goroutine 进入临界区
    }
}
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |