Go 语言入门12 – 实战项目之单任务版爬虫

  • 单人版爬虫:一个 Goroutine 运行整个爬虫项目
  • 并发版爬虫:多个 Goroutine 在一台机器上实现爬虫项目
  • 分布式爬虫:多个 Goroutine 在多台机器上实现爬虫项目

一、爬虫整体算法

该爬虫项目爬取的是珍爱网的数据,总体算法如下

5842684-fbe951dcef43f94c

  • 首先根据城市列表 Url 爬取城市列表,爬取出来的内容通过城市列表解析器解析出来每一个城市的 Url
  • 然后根据每一个城市的 Url 爬取该城市的用户信息列表,通过城市解析器将用户信息列表中的用户 Url 解析出来
  • 最后根据每一个用户的 Url 爬取该用户的详细信息,并进行解析

三种 Url 示例:

  • 城市列表 Url:http://www.zhenai.com/zhenghun
  • 城市 Url:http://www.zhenai.com/zhenghun/aba
  • 用户 Url:http://album.zhenai.com/u/1902329077

二、单任务版爬虫架构

5842684-fbe951dcef43f94c

  • 1.会将种子 Url Seed 连同其解析器 Parser 封装为一个 Request,放入 Engine 引擎中的任务队列(其实就是 []Request 切片)中,启动爬取任务(这里的 Seed 就是城市列表 Url)
  • 2. Engine 使用 Fetcher 爬取该 Url 的内容 text,然后使用对应 Url 的Parser 解析该 text,将解析出来的 Url(例如,城市 Url)和其 Parser 封装为 Request 加入 Engine 任务队列,将解析出来的 items(例如,城市名)打印出来
  • 3. Engine 不断的从其任务队列中获取任务 Request 一个个进行串行执行(使用 Fetcher 对 Request.Url 进行爬取,使用 Request.Parser 对爬取出来的 text 进行解析,将解析出来的内容部分进行封装为Request,进行后续循环,部分进行打印)

三、代码实现

5842684-fbe951dcef43f94c

3.1 请求与解析结果类型结构 type.go

package engine

// 请求任务封装体
type Request struct {
   // 需爬取的 Url
   Url string
   // Url 对应的解析函数
   ParserFunc func([]byte) ParseResult
}

// 解析结果
type ParseResult struct {
   // 解析出来的多个 Request 任务
   Requests []Request
   // 解析出来的实体(例如,城市名),是任意类别(interface{},类比 java Object)
   Items    []interface{}
}

3.2 执行引擎 engine.go

package engine

import (
   "fetcher"
   "log"
)

func Run(seeds ...Request) {
   // Request 任务队列
   var requests []Request
   // 将 seeds Request 放入 []requests,即初始化 []requests
   for _, r := range seeds {
      requests = append(requests, r)
   }
   // 执行任务
   for len(requests) > 0 {
      // 1. 获取第一个 Request,并从 []requests 移除,实现了一个队列功能
      r := requests[0]
      requests = requests[1:]

      // 2. 使用爬取器进行对 Request.Url 进行爬取
      body, err := fetcher.Fetch(r.Url)
      // 如果爬取出错,记录日志
      if err != nil {
         log.Printf("fetch error, url: %s, err: %v", r.Url, err)
         continue
      }

      // 3. 使用 Request 的解析函数对怕渠道的内容进行解析
      parseResult := r.ParserFunc(body)
      // 4. 将解析体中的 []Requests 加到请求任务队列 requests 的尾部
      requests = append(requests, parseResult.Requests...)

      // 5. 遍历解析出来的实体,直接打印
      for _, item := range parseResult.Items {
         log.Printf("getItems, url: %s, items: %v", r.Url, item)
      }
   }
}

3.3 爬取器 fetcher.go

package fetcher

import (
   "fmt"
   "io/ioutil"
   "net/http"
)

func Fetch(url string) ([]byte, error) {
   // 1. 爬取 url
   resp, err := http.Get(url)
   if err != nil {
      return nil, err
   }
   defer resp.Body.Close()

   if resp.StatusCode != http.StatusOK {
      return nil, fmt.Errorf("wrong statusCode, %d", resp.StatusCode)
   }
   // 2. 读取响应体并返回
   return ioutil.ReadAll(resp.Body)
}

3.4 三种解析器

  • 城市列表解析器 citylist.go
package parser

import (
   "engine"
   "regexp"
)

const cityListRe = `<a href="(http://www.zhenai.com/zhenghun/[0-9a-z]+)"[^>]*>([^<]*)</a>`

// cityList 的 ParserFunc func([]byte) ParseResult
// 解析种子页面 - 获取城市列表
func ParseCityList(contents []byte) engine.ParseResult {
   result := engine.ParseResult{}
   // 正则表达式:()用于提取
   rg := regexp.MustCompile(cityListRe)
   allSubmatch := rg.FindAllSubmatch(contents, -1)
   // 遍历每一个城市的匹配字段(城市 Url 和城市名),并且将 Url 和城市解析器封装为一个 Request
   // 最后将该 Request 添加到 ParseResult 中
   for _, m := range allSubmatch {
      result.Items = append(result.Items, "city "+string(m[2]))
      result.Requests = append(result.Requests, engine.Request{
         Url:        string(m[1]),
         ParserFunc: ParseCity,
      })
   }
   // 返回 ParseResult
   return result
}

学习 Go 正则表达式的使用

  • 城市解析器 city.go
package parser

import (
   "engine"
   "regexp"
)

// match[1]=url match[2]=name
const cityRe = `<a href="(http://album.zhenai.com/u/[0-9]+)"[^>]*>([^<]+)</a>`

// 解析单个城市 - 获取单个城市的用户列表
func ParseCity(contents []byte) engine.ParseResult {
   result := engine.ParseResult{}
   rg := regexp.MustCompile(cityRe)
   allSubmatch := rg.FindAllSubmatch(contents, -1)
   for _, m := range allSubmatch {
      name := string(m[2])
      result.Items = append(result.Items, "user "+name)
      result.Requests = append(result.Requests, engine.Request{
         Url: string(m[1]),
         ParserFunc: func(c []byte) engine.ParseResult {
            return ParseProfile(c, name) // 函数式编程,使用函数包裹函数
         },
      })
   }

   return result
}

学习函数式编程:使用函数包裹函数,即函数的返回值和入参都可以是函数。

  • 用户解析器 profile.go
package parser

import (
   "github.com/zhaojigang/crawler/engine"
   "github.com/zhaojigang/crawler/model"
   "regexp"
   "strconv"
)

var ageRe = regexp.MustCompile(`<td><span class=""label">年龄:</span>([\d])+岁</td>`)
var incomeRe = regexp.MustCompile(`<td><span class=""label">月收入:</span>([^<]+)</td>`)

// 解析单个人的主页
func ParseProfile(contents []byte, name string) engine.ParseResult {
   profile := model.Profile{}

   // 1. 年龄
   age, err := strconv.Atoi(extractString(contents, ageRe))
   if err == nil {
      profile.Age = age
   }

   // 2. 月收入
   profile.Income = extractString(contents, incomeRe)

   // 3. 姓名
   profile.Name = name

   result := engine.ParseResult{
      Items: []interface{}{profile},
   }
   return result
}

func extractString(body []byte, re *regexp.Regexp) string {
   match := re.FindSubmatch(body) // 只找到第一个match的
   if len(match) >= 2 {
      return string(match[1])
   }
   return ""
}

profile 实体类

package model

type Profile struct {
   // 姓名
   Name string
   // 年龄
   Age int
   // 收入
   Income string
}

3.5 启动器 main.go

package main

import (
   "github.com/zhaojigang/crawler/engine"
   "github.com/zhaojigang/crawler/zhenai/parser"
)

func main() {
   engine.Run(engine.Request{
      // 种子 Url
      Url:        "http://www.zhenai.com/zhenghun",
      ParserFunc: parser.ParseCityList,
   })
}

解析器测试类

package parser

import (
   "io/ioutil"
   "testing"
)

func TestParseCityList(t *testing.T) {
   expectRequestsLen := 470
   expectCitiesLen := 470
   // 表格驱动测试
   expectRequestUrls := []string{
      "http://www.zhenai.com/zhenghun/aba",
      "http://www.zhenai.com/zhenghun/akesu",
      "http://www.zhenai.com/zhenghun/alashanmeng",
   }
   expectRequestCities := []string{
      "city 阿坝",
      "city 阿克苏",
      "city 阿拉善盟",
   }

   body, err := ioutil.ReadFile("citylist_test_data.html")
   if err != nil {
      panic(err)
   }
   result := ParseCityList(body)

   if len(result.Requests) != expectRequestsLen {
      t.Errorf("expect requestLen %d, but %d", expectRequestsLen, len(result.Requests))
   }
   if len(result.Items) != expectCitiesLen {
      t.Errorf("expect citiesLen %d, but %d", expectCitiesLen, len(result.Items))
   }

   for i, url := range expectRequestUrls {
      if url != result.Requests[i].Url {
         t.Errorf("expect url %s, but %s", url, result.Requests[i].Url)
      }
   }

   for i, city := range expectRequestCities {
      if city != result.Items[i] {
         t.Errorf("expect url %s, but %s", city, result.Items[i])
      }
   }
}

学习经典的 Go 表格驱动测试。

执行 main 函数发现执行的很慢,因为只有一个 main Go routine 在执行,还有网络 IO,所以比较慢,接下来,将单任务版的改造成多个 Go routine 共同执行的并发版的爬虫。

Go 语言入门11 – 并发之协程池

提供一个 go routine 池,每个 go routine 循环阻塞等待从任务池中执行任务;外界使用者不断的往任务池里丢任务,则 go routine 池中的多个 go routine 会并发的处理这些任务

一、worker/workPool.go

import "sync"

type Worker interface {
   Task()
}

type Pool struct {
   wg sync.WaitGroup
   // 工作池
   taskPool chan Worker
}

func New(maxGoroutineNum int) *Pool {
   // 1. 初始化一个 Pool
   p := Pool{
      taskPool: make(chan Worker),
   }

   p.wg.Add(maxGoroutineNum)
   // 2. 创建 maxGoroutineNum 个 goroutine,并发的从 taskPool 中获取任务
   for i := 0; i < maxGoroutineNum; i++ {
      go func() {
         for task := range p.taskPool { // 阻塞获取,一旦没有任务,阻塞在这里 - 无缓冲 channel
            // 3. 执行任务
            task.Task()
         }
         p.wg.Done()
      }()
   }

   return &p
}

// 提交任务到worker池中
func (p *Pool) Run(worker Worker) {
   p.taskPool <- worker
}

func (p *Pool) Shutdown() {
   // 关闭通道
   close(p.taskPool)
   p.wg.Wait()
}

 

二、namePrinter/namePrinter.go

import (
   "fmt"
   "time"
)

type NamePrinter struct {
   Name string
}

func (np *NamePrinter) Task()  {
   fmt.Println(np.Name)
   time.Sleep(time.Second)
}

三、main.go

import (
   "github.com/zhaojigang/worker/worker"
   "sync"
   "github.com/zhaojigang/worker/namePrinter"
)

var names = []string{
   "steve",
   "bob",
}

func main() {

   // 1. 启动两个 goroutine,等待执行任务
   p := worker.New(2)

   var wg sync.WaitGroup
   wg.Add(3 * len(names))

   // 2. 创建 worker,扔到 goroutine 池中
   for i := 0; i < 3; i++ {
      for _, namex := range names {
         worker := namePrinter.NamePrinter{
            Name:namex,
         }
         go func() {
            p.Run(&worker)
            wg.Done()
         }()
      }
   }

   // 3. 等待添加任务完毕
   wg.Wait()

   // 4. 关闭 goroutine 池
   p.Shutdown()
}

Go 语言入门10 – 并发之资源池

提供一个资源池,类似于数据库连接池的功能;资源池在 go 1.11.1 中有官方实现:sync/pool.go

一、资源池

import "log"

package pool

import (
"sync"
"io"
"errors"
"log"
)

// 声明池类结构体
type Pool struct {
   // 锁
   lock sync.Mutex
   // 池中存储的资源
   resources chan io.Closer
   // 资源创建工厂函数
   factory func() (io.Closer, error)
   // 池是否已经被关闭
   closed bool
}

// 创建池类实例的工厂函数
// 工厂函数名通常使用 New 名字
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
   if size <= 0 {
      return nil, errors.New("size too small");
   }

   return &Pool{
      resources: make(chan io.Closer, size),
      factory:   fn,
   }, nil
}

// 从池中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
   // select - default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default
   // 当然,如果没有 default,那么还是要阻塞在 <-p.resources 上的
   select {
   // 检查是否有空闲的资源
   case r, ok := <-p.resources:
      log.Println("Acquire:", "Shared Resource")
      if !ok {
         return nil, errors.New("pool already closed")
      }
      return r, nil
   default:
      log.Println("Acquire:", "New Resource")
      // 调用资源创建函数创建资源
      return p.factory()
   }
}

// 将一个使用后的资源放回池里
func (p *Pool) Release(r io.Closer) {
   // 注意:Release 和 Close 使用的是同一把锁,就是说二者同时只能执行一个,防止资源池已经关闭了,release 还向资源池放资源
   // 向一个已经关闭的 channel 发送消息,会发生 panic: send on closed channel
   p.lock.Lock()
   defer p.lock.Unlock()

   // 如果池已经被关闭,销毁这个资源
   if p.closed {
      r.Close()
      return
   }

   select {
   // 试图将这个资源放入队列
   case p.resources <- r:
      log.Println("Release:", "In Queue")
   default:
      log.Println("Release:", "Closing")
      r.Close()
   }
}

// 关闭资源池,并关闭所有现有的资源
func (p *Pool) Close() {
   p.lock.Lock()
   defer p.lock.Unlock()

   if p.closed {
      return
   }

   p.closed = true

   // 在清空通道里的资源之前,将通道关闭
   close(p.resources)

   // 关闭资源
   for r := range p.resources {
      r.Close()
   }
}

select – default 经典模式,将阻塞形式的 channel 改为了非阻塞,当 <-p.resources 不能立即返回时,执行 default;当然,如果没有 default,那么还是要阻塞在 <-p.resources

二、具体的资源类

import (
   "io"
   "log"
   "sync/atomic"
)

package db

import (
"log"
"io"
"sync/atomic"
)

// 给每个连接分配一个独一无二的id
var idCounter int32

// 资源 - 数据库连接
type DBConnection struct {
   ID int32
}

// dbConnection 实现了 io.Closer 接口
// 关闭资源
func (conn *DBConnection) Close() error {
   log.Println("conn closed")
   return nil
}

// 创建一个资源 - dbConnection
func CreateConn() (io.Closer, error) {
   id := atomic.AddInt32(&idCounter, 1)
   log.Println("Create conn, id:", id)
   return &DBConnection{
      ID: id,
   }, nil
}

三、使用资源池

package main

import (
    "sync"
    "github.com/zhaojigang/pool/pool"
    "github.com/zhaojigang/pool/db"
    "log"
    "time"
    "math/rand"
)

const (
    maxGoroutines   = 5 // 要使用的goroutine的数量
    pooledResources = 2 // 池中的资源的数量
)

func performQuery(query int, p *pool.Pool) {
    // 1. 获取连接
    conn, err := p.Acquire()
    if err != nil {
        log.Println("acquire conn error, ", err)
        return
    }

    // 使用结束后,释放链接
    defer p.Release(conn)

    // 该 log 模拟对连接的使用
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
    log.Printf("QID[%d] CID[%d]\n", query, conn.(*db.DBConnection).ID)
}

func main() {
    var waitGroup sync.WaitGroup
    waitGroup.Add(maxGoroutines)
    // 1. 创建一个 Pool
    p, err := pool.New(db.CreateConn, pooledResources)
    if err != nil {
        log.Println("create Pool error")
    }

    // 2. 开启 goroutine 执行任务
    for query := 0; query < maxGoroutines; query++ {
        // 每个goroutine需要自己复制一份要、查询值的副本,
        // 不然所有的查询会共享同一个查询变量,即所有的 goroutine 最后的 query 值都是3
        go func(q int) {
            performQuery(q, p)
            waitGroup.Done()
        }(query)
        //time.Sleep(1000*time.Millisecond) // 用于测试从 resources channel 中获取资源
    }

    // 3. 关闭连接池
    waitGroup.Wait()
    p.Close()
    log.Println("pool closed - main")
}

在高并发的创建 go routine 的情况下,从 pool.go # Acquire 方法中可以看到,大家可能都还没有 Release 资源,此时都会创建资源,资源在一瞬间会大量增加,在实际系统中,需要根据需求,做一些措施,例如:提前创建好资源放入池中,go routine 都从池中取资源,资源不够就等待,使用完之后就放入池中,防止资源意外关闭,还可以启用后台线程监控等。

 

Go 语言入门9 – Channel(通道)

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。(这是除了 atomic 和 mutex 之外的第三种处理竞态资源的方式)
Channel分为两种:

  • 无缓冲的 Channel
    • 无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。
  • 有缓冲的 Channel
    • 有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

一、无缓冲的通道

import (
    "sync"
    "fmt"
    "math/rand"
)

var wg sync.WaitGroup

// Channel 完整的类型是 "chan 数据类型"
func player(name string, court chan int) {
    defer wg.Done()

    for {
        // 1. 阻塞等待接球,如果通道关闭,ok返回false
        ball, ok := <-court
        if !ok {
            fmt.Printf("channel already closed! Player %s won\n", name)
            return
        }

        random := rand.Intn(100)
        if random%13 == 0 {
            fmt.Printf("Player %s Lose\n", name)
            // 关闭通道
            close(court)
            return
        }

        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball ++
        // 2. 发球,阻塞等待对方接球
        court <- ball
    }
}

// 两个 player 打网球,即生产者和消费者模式(互为生产者和消费者)
func main() {
    wg.Add(2)

    // 1. 创建一个无缓冲的通道
    // Channel 完整的类型是 "chan 数据类型"
    court := make(chan int)

    // 2. 创建两个 goroutine
    go player("zhangsan", court)
    go player("lisi", court)

    // 3. 发球:向通道发送数据,阻塞等待通道对端接收
    court <- 1

    // 4. 等待输家出现
    wg.Wait()
}

二、有缓冲的通道

import (
   "sync"
   "fmt"
   "time"
)

// 使用4个goroutine来完成10个任务
const (
   taskNum      = 10
   goroutineNum = 4
)

var countDownLatch sync.WaitGroup

func worker(name string, taskChannel chan string) {
   defer countDownLatch.Done()
   for {
      // 1. 不断的阻塞等待分配工作
      task, ok := <-taskChannel
      if !ok {
         fmt.Printf("channel closed and channel is empty\n")
         return
      }

      //fmt.Printf("worker %s start %s\n", name, task)
      time.Sleep(100 * time.Millisecond)
      fmt.Printf("worker %s complete %s\n", name, task)
   }
}

func main() {
   countDownLatch.Add(goroutineNum)
   // 1. 创建有缓冲区的string channel
   taskChannel := make(chan string, taskNum)

   // 2. 创建 4 个goroutine去干活
   for i := 0; i < goroutineNum; i++ {
      go worker(fmt.Sprintf("worker %d", i), taskChannel)
   }

   // 3. 向通道加入task
   for i := 0; i < taskNum; i++ {
      taskChannel <- fmt.Sprintf("task %d", i)
   }

   // 4. 关闭通道:
   // 当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。
   // 能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。
   // 从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值
   close(taskChannel)

   // 5. 等待
   countDownLatch.Wait()
}

当通道关闭后,go routine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。

从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。

Go 语言入门8 – 多线程

Go routine 基于协程 Coroutine,原理总结:

如果创建一个 goroutine 并准备运行,这个 goroutine 就会被放到调度器的全局运行队列中。之后,调度器就将这些队列中的 goroutine 分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中。本地运行队列中的 goroutine 会一直等待直到自己被分配的逻辑处理器执行。

Go routine 机制原理如下图所示:

1556591431-8545-5842684-caf2ddc8d7ed6338

一、示例

import (
   "runtime"
   "sync"
   "fmt"
)

func main() {
   // 1. 分配一个逻辑处理器给调度器使用
   runtime.GOMAXPROCS(1)

   // 2. 设定等待器,类比 Java CountDownLatch
   var waitGroup sync.WaitGroup
   waitGroup.Add(2)

   fmt.Println("=== start ===")
   // 3. 创建第一个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 4. 创建第二个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 5. 阻塞 main goroutine
   waitGroup.Wait() // CountDownLatch#await()
   fmt.Println("=== end ===")
}

使用 go 关键字创建 Go routine

  • 匿名函数实现方式 go func() {xxx}()
  • 普通函数 funcA 实现方式 go funcA()

二、打断正在运行的 Go routine

  • 基于调度器的内部算法,一个正运行的 go routine 在工作结束前,可以被停止并重新调度。
  • 调度器这样做的目的是防止某个 go routine 长时间占用逻辑处理器。当 go routine 占用时间过长时,调度器会停止当前正运行的 go routine,并给其他可运行的 go routine 运行的机会。

该机制的原理如下图所示:
1556591439-2367-5842684-7023f4c17581356e
步骤:

  • 在第 1 步,调度器开始运行 go routine A,而 go routine B 在运行队列里等待调度。
  • 在第 2 步,调度器交换了 go routine A 和 go routine B。 由于 go routine A 并没有完成工作,因此被放回到运行队列。
  • 在第 3 步,go routine B 完成了它的工作并被系统销毁。这也让 go routine A 继续之前的工作。

注意:上述步骤都是由调度器内部实现的,我们不需要编写代码去实现。

三、设置逻辑处理器数量

    // 为每个物理处理器分配一个逻辑处理器给调度器使用
    runtime.GOMAXPROCS(runtime.NumCPU())

四、竞争状态

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。

同一时刻只能有一个 goroutine 对共享资源进行读和写操作

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   // 两个 goroutine 同时操作的变量,竞态变量
   counter     int
   waitGroup sync.WaitGroup
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      value := counter
      // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
      // 让其他 goroutine 进行执行
      runtime.Gosched()
      value ++
      counter = value
   }
}

func main() {
   runtime.GOMAXPROCS(1)
   waitGroup.Add(2)

   go incCount(1)
   go incCount(2)

   waitGroup.Wait()
   fmt.Println(counter) // 正确为4,实际上为2
}

代码执行图:
1556591451-4320-5842684-0609cefc2b7bb6a5

 

五、锁机制

5.1、原子类 atomic

import (
   "runtime"
   "sync/atomic"
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      // 使用原子类
      atomic.AddInt64(&counter, 1)
      runtime.Gosched()
   }
}

另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式

import (
   "sync"
   "time"
   "sync/atomic"
   "fmt"
)

var (
   shutdown  int64
   waitGroup sync.WaitGroup
)

func doWork(name string) {
   defer waitGroup.Done()
   for {
      time.Sleep(250 * time.Millisecond)
      // 记载关机标志
      if atomic.LoadInt64(&shutdown) == 1 {
         fmt.Println("shutDown, ", name)
         break
      }
   }
}

func main() {
   waitGroup.Add(2)

   go doWork("A")
   go doWork("B")

   // 给定goroutine执行的时间
   time.Sleep(1000 * time.Millisecond)

   // 设定关机标志
   atomic.StoreInt64(&shutdown, 1)

   waitGroup.Wait()
}

5.2、互斥锁 mutex

互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码

 

var (
    // 两个 goroutine 同时操作的变量,竞态变量
    counter     int
    waitGroup sync.WaitGroup
        // 锁,定义一段临界区
    lock sync.Mutex
)

func incCount(int) {
    defer waitGroup.Done()
    for count := 0; count < 2; count++ {
        lock.Lock()
        { // Lock() 与 UnLock() 之间的代码都属于临界区,{}是可以省略的,加上看起来清晰
            value := counter
            // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
            // 让其他 goroutine 进行执行
            // 但是因为锁没有释放,调度器还会继续安排执行该 goroutine
            runtime.Gosched()
            value ++
            counter = value
        }
        lock.Unlock()
        // 释放锁,允许其他正在等待的 goroutine 进入临界区
    }
}
12223242526115
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |