Go 语言入门9 – Channel(通道)

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。(这是除了 atomic 和 mutex 之外的第三种处理竞态资源的方式)
Channel分为两种:

  • 无缓冲的 Channel
    • 无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。
  • 有缓冲的 Channel
    • 有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

一、无缓冲的通道

import (
    "sync"
    "fmt"
    "math/rand"
)

var wg sync.WaitGroup

// Channel 完整的类型是 "chan 数据类型"
func player(name string, court chan int) {
    defer wg.Done()

    for {
        // 1. 阻塞等待接球,如果通道关闭,ok返回false
        ball, ok := <-court
        if !ok {
            fmt.Printf("channel already closed! Player %s won\n", name)
            return
        }

        random := rand.Intn(100)
        if random%13 == 0 {
            fmt.Printf("Player %s Lose\n", name)
            // 关闭通道
            close(court)
            return
        }

        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball ++
        // 2. 发球,阻塞等待对方接球
        court <- ball
    }
}

// 两个 player 打网球,即生产者和消费者模式(互为生产者和消费者)
func main() {
    wg.Add(2)

    // 1. 创建一个无缓冲的通道
    // Channel 完整的类型是 "chan 数据类型"
    court := make(chan int)

    // 2. 创建两个 goroutine
    go player("zhangsan", court)
    go player("lisi", court)

    // 3. 发球:向通道发送数据,阻塞等待通道对端接收
    court <- 1

    // 4. 等待输家出现
    wg.Wait()
}

二、有缓冲的通道

import (
   "sync"
   "fmt"
   "time"
)

// 使用4个goroutine来完成10个任务
const (
   taskNum      = 10
   goroutineNum = 4
)

var countDownLatch sync.WaitGroup

func worker(name string, taskChannel chan string) {
   defer countDownLatch.Done()
   for {
      // 1. 不断的阻塞等待分配工作
      task, ok := <-taskChannel
      if !ok {
         fmt.Printf("channel closed and channel is empty\n")
         return
      }

      //fmt.Printf("worker %s start %s\n", name, task)
      time.Sleep(100 * time.Millisecond)
      fmt.Printf("worker %s complete %s\n", name, task)
   }
}

func main() {
   countDownLatch.Add(goroutineNum)
   // 1. 创建有缓冲区的string channel
   taskChannel := make(chan string, taskNum)

   // 2. 创建 4 个goroutine去干活
   for i := 0; i < goroutineNum; i++ {
      go worker(fmt.Sprintf("worker %d", i), taskChannel)
   }

   // 3. 向通道加入task
   for i := 0; i < taskNum; i++ {
      taskChannel <- fmt.Sprintf("task %d", i)
   }

   // 4. 关闭通道:
   // 当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。
   // 能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。
   // 从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值
   close(taskChannel)

   // 5. 等待
   countDownLatch.Wait()
}

当通道关闭后,go routine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。

从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。

Leave a Comment