Go 语言入门11 – 并发之协程池

提供一个 go routine 池,每个 go routine 循环阻塞等待从任务池中执行任务;外界使用者不断的往任务池里丢任务,则 go routine 池中的多个 go routine 会并发的处理这些任务

一、worker/workPool.go

import "sync"

type Worker interface {
   Task()
}

type Pool struct {
   wg sync.WaitGroup
   // 工作池
   taskPool chan Worker
}

func New(maxGoroutineNum int) *Pool {
   // 1. 初始化一个 Pool
   p := Pool{
      taskPool: make(chan Worker),
   }

   p.wg.Add(maxGoroutineNum)
   // 2. 创建 maxGoroutineNum 个 goroutine,并发的从 taskPool 中获取任务
   for i := 0; i < maxGoroutineNum; i++ {
      go func() {
         for task := range p.taskPool { // 阻塞获取,一旦没有任务,阻塞在这里 - 无缓冲 channel
            // 3. 执行任务
            task.Task()
         }
         p.wg.Done()
      }()
   }

   return &p
}

// 提交任务到worker池中
func (p *Pool) Run(worker Worker) {
   p.taskPool <- worker
}

func (p *Pool) Shutdown() {
   // 关闭通道
   close(p.taskPool)
   p.wg.Wait()
}

 

二、namePrinter/namePrinter.go

import (
   "fmt"
   "time"
)

type NamePrinter struct {
   Name string
}

func (np *NamePrinter) Task()  {
   fmt.Println(np.Name)
   time.Sleep(time.Second)
}

三、main.go

import (
   "github.com/zhaojigang/worker/worker"
   "sync"
   "github.com/zhaojigang/worker/namePrinter"
)

var names = []string{
   "steve",
   "bob",
}

func main() {

   // 1. 启动两个 goroutine,等待执行任务
   p := worker.New(2)

   var wg sync.WaitGroup
   wg.Add(3 * len(names))

   // 2. 创建 worker,扔到 goroutine 池中
   for i := 0; i < 3; i++ {
      for _, namex := range names {
         worker := namePrinter.NamePrinter{
            Name:namex,
         }
         go func() {
            p.Run(&worker)
            wg.Done()
         }()
      }
   }

   // 3. 等待添加任务完毕
   wg.Wait()

   // 4. 关闭 goroutine 池
   p.Shutdown()
}

Leave a Comment