Go 语言入门9 – Channel(通道)

当一个资源需要在 goroutine 之间共享时,通道在 goroutine 之间架起了一个管道,并提供了确保同步交换数据的机制。(这是除了 atomic 和 mutex 之外的第三种处理竞态资源的方式)
Channel分为两种:

  • 无缓冲的 Channel
    • 无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操作。如果两个 goroutine 没有同时准备好,通道会导致先执行发送或接收操作的 goroutine 阻塞等待。这种对通道进行发送和接收的交互行为本身就是同步的。
  • 有缓冲的 Channel
    • 有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。只有在通道中没有要接收的值时,接收动作才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动作才会阻塞。

一、无缓冲的通道

import (
    "sync"
    "fmt"
    "math/rand"
)

var wg sync.WaitGroup

// Channel 完整的类型是 "chan 数据类型"
func player(name string, court chan int) {
    defer wg.Done()

    for {
        // 1. 阻塞等待接球,如果通道关闭,ok返回false
        ball, ok := <-court
        if !ok {
            fmt.Printf("channel already closed! Player %s won\n", name)
            return
        }

        random := rand.Intn(100)
        if random%13 == 0 {
            fmt.Printf("Player %s Lose\n", name)
            // 关闭通道
            close(court)
            return
        }

        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball ++
        // 2. 发球,阻塞等待对方接球
        court <- ball
    }
}

// 两个 player 打网球,即生产者和消费者模式(互为生产者和消费者)
func main() {
    wg.Add(2)

    // 1. 创建一个无缓冲的通道
    // Channel 完整的类型是 "chan 数据类型"
    court := make(chan int)

    // 2. 创建两个 goroutine
    go player("zhangsan", court)
    go player("lisi", court)

    // 3. 发球:向通道发送数据,阻塞等待通道对端接收
    court <- 1

    // 4. 等待输家出现
    wg.Wait()
}

二、有缓冲的通道

import (
   "sync"
   "fmt"
   "time"
)

// 使用4个goroutine来完成10个任务
const (
   taskNum      = 10
   goroutineNum = 4
)

var countDownLatch sync.WaitGroup

func worker(name string, taskChannel chan string) {
   defer countDownLatch.Done()
   for {
      // 1. 不断的阻塞等待分配工作
      task, ok := <-taskChannel
      if !ok {
         fmt.Printf("channel closed and channel is empty\n")
         return
      }

      //fmt.Printf("worker %s start %s\n", name, task)
      time.Sleep(100 * time.Millisecond)
      fmt.Printf("worker %s complete %s\n", name, task)
   }
}

func main() {
   countDownLatch.Add(goroutineNum)
   // 1. 创建有缓冲区的string channel
   taskChannel := make(chan string, taskNum)

   // 2. 创建 4 个goroutine去干活
   for i := 0; i < goroutineNum; i++ {
      go worker(fmt.Sprintf("worker %d", i), taskChannel)
   }

   // 3. 向通道加入task
   for i := 0; i < taskNum; i++ {
      taskChannel <- fmt.Sprintf("task %d", i)
   }

   // 4. 关闭通道:
   // 当通道关闭后,goroutine 依旧可以从通道接收数据,但是不能再向通道里发送数据。
   // 能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。
   // 从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值
   close(taskChannel)

   // 5. 等待
   countDownLatch.Wait()
}

当通道关闭后,go routine 依旧可以从通道接收数据,但是不能再向通道里发送数据。能够从已经关闭的通道接收数据这一点非常重要,因为这允许通道关闭后依旧能取出其中缓冲的全部值,而不会有数据丢失。

从一个已经关闭且没有数据的通道里获取数据,总会立刻返回,并返回一个通道类型的零值。

Go 语言入门8 – 多线程

Go routine 基于协程 Coroutine,原理总结:

如果创建一个 goroutine 并准备运行,这个 goroutine 就会被放到调度器的全局运行队列中。之后,调度器就将这些队列中的 goroutine 分配给一个逻辑处理器,并放到这个逻辑处理器对应的本地运行队列中。本地运行队列中的 goroutine 会一直等待直到自己被分配的逻辑处理器执行。

Go routine 机制原理如下图所示:

1556591431-8545-5842684-caf2ddc8d7ed6338

一、示例

import (
   "runtime"
   "sync"
   "fmt"
)

func main() {
   // 1. 分配一个逻辑处理器给调度器使用
   runtime.GOMAXPROCS(1)

   // 2. 设定等待器,类比 Java CountDownLatch
   var waitGroup sync.WaitGroup
   waitGroup.Add(2)

   fmt.Println("=== start ===")
   // 3. 创建第一个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 4. 创建第二个 goroutine
   go func() {
      defer waitGroup.Done() // CountDownLatch#countDown()

      // 打印3遍字母表
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            fmt.Printf("%c", char)
         }
      }
   }()

   // 5. 阻塞 main goroutine
   waitGroup.Wait() // CountDownLatch#await()
   fmt.Println("=== end ===")
}

使用 go 关键字创建 Go routine

  • 匿名函数实现方式 go func() {xxx}()
  • 普通函数 funcA 实现方式 go funcA()

二、打断正在运行的 Go routine

  • 基于调度器的内部算法,一个正运行的 go routine 在工作结束前,可以被停止并重新调度。
  • 调度器这样做的目的是防止某个 go routine 长时间占用逻辑处理器。当 go routine 占用时间过长时,调度器会停止当前正运行的 go routine,并给其他可运行的 go routine 运行的机会。

该机制的原理如下图所示:
1556591439-2367-5842684-7023f4c17581356e
步骤:

  • 在第 1 步,调度器开始运行 go routine A,而 go routine B 在运行队列里等待调度。
  • 在第 2 步,调度器交换了 go routine A 和 go routine B。 由于 go routine A 并没有完成工作,因此被放回到运行队列。
  • 在第 3 步,go routine B 完成了它的工作并被系统销毁。这也让 go routine A 继续之前的工作。

注意:上述步骤都是由调度器内部实现的,我们不需要编写代码去实现。

三、设置逻辑处理器数量

    // 为每个物理处理器分配一个逻辑处理器给调度器使用
    runtime.GOMAXPROCS(runtime.NumCPU())

四、竞争状态

如果两个或者多个 goroutine 在没有互相同步的情况下,访问某个共享的资源,并试图同时读和写这个资源,就处于相互竞争的状态,这种情况被称作竞争状态(race candition)。

同一时刻只能有一个 goroutine 对共享资源进行读和写操作

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   // 两个 goroutine 同时操作的变量,竞态变量
   counter     int
   waitGroup sync.WaitGroup
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      value := counter
      // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
      // 让其他 goroutine 进行执行
      runtime.Gosched()
      value ++
      counter = value
   }
}

func main() {
   runtime.GOMAXPROCS(1)
   waitGroup.Add(2)

   go incCount(1)
   go incCount(2)

   waitGroup.Wait()
   fmt.Println(counter) // 正确为4,实际上为2
}

代码执行图:
1556591451-4320-5842684-0609cefc2b7bb6a5

 

五、锁机制

5.1、原子类 atomic

import (
   "runtime"
   "sync/atomic"
)

func incCount(int) {
   defer waitGroup.Done()
   for count := 0; count < 2; count++ {
      // 使用原子类
      atomic.AddInt64(&counter, 1)
      runtime.Gosched()
   }
}

另外两个有用的原子函数是 LoadInt64 和 StoreInt64。这两个函数提供了一种安全地读和写一个整型值的方式

import (
   "sync"
   "time"
   "sync/atomic"
   "fmt"
)

var (
   shutdown  int64
   waitGroup sync.WaitGroup
)

func doWork(name string) {
   defer waitGroup.Done()
   for {
      time.Sleep(250 * time.Millisecond)
      // 记载关机标志
      if atomic.LoadInt64(&shutdown) == 1 {
         fmt.Println("shutDown, ", name)
         break
      }
   }
}

func main() {
   waitGroup.Add(2)

   go doWork("A")
   go doWork("B")

   // 给定goroutine执行的时间
   time.Sleep(1000 * time.Millisecond)

   // 设定关机标志
   atomic.StoreInt64(&shutdown, 1)

   waitGroup.Wait()
}

5.2、互斥锁 mutex

互斥锁用于在代码上创建一个临界区,保证同一时间只有一个 goroutine 可以 执行这个临界区代码

 

var (
    // 两个 goroutine 同时操作的变量,竞态变量
    counter     int
    waitGroup sync.WaitGroup
        // 锁,定义一段临界区
    lock sync.Mutex
)

func incCount(int) {
    defer waitGroup.Done()
    for count := 0; count < 2; count++ {
        lock.Lock()
        { // Lock() 与 UnLock() 之间的代码都属于临界区,{}是可以省略的,加上看起来清晰
            value := counter
            // 当前的 goroutine 主动让出资源,从线程退出,并放回到队列,
            // 让其他 goroutine 进行执行
            // 但是因为锁没有释放,调度器还会继续安排执行该 goroutine
            runtime.Gosched()
            value ++
            counter = value
        }
        lock.Unlock()
        // 释放锁,允许其他正在等待的 goroutine 进入临界区
    }
}

Go 语言入门7 – 资源管理与错误处理

一、资源管理

Java 有 try-finally,可以在 finally 中进行资源的关闭;Go 可以使用 defer

  • defer 在函数结束时发生调用
  • defer 的调用是栈类型 – 先进后出
  • defer 通常用于资源关闭 Open/Close,Lock/UnLock 等

一句话总结:defer 的调用机制是 “将defer语句加入栈中,当函数结束时(包括正常执行结束 / return / panic 出错结束等),从栈中依次执行 defer”

func writeFile(filename string) {
   file, err := os.Create(filename)
   if err != nil {
      panic(err)
   }
   defer file.Close() // 将 "file.Close()" 压入 defer 栈中

   writer := bufio.NewWriter(file)
   defer writer.Flush() // 将 "writer.Flush()" 压入 defer 栈中

   fmt.Fprintln(writer, "123")
   // 当函数执行结束时,从 defer 栈中执行语句 - 后进先出,先 "writer.Flush()",再 "file.Close()"
}

func main() {
   writeFile("defer.txt")
}

二、错误简单处理

使用机制

通过被调用函数的注释查看其可能发生的错误,然后依据错误类型并进行处理;
错误处理结束后要 return

func main() {
   file, err := os.Open(filename)
   // 错误处理
   if err != nil {
      // 判断 err 是否是 *os.PathError,因为 os.Open(filename) 的注释:"If there is an error, it will be of type *PathError."
      if pathError, ok := err.(*os.PathError); ok {
         fmt.Printf("error: %s", pathError.Error())
      } else {
         fmt.Printf("notKnown error:%s", err.Error())
      }
      return // 返回
   }
}

//error 是一个接口,定义如下:

// The error built-in interface type is the conventional interface for
// representing an error condition, with the nil value representing no error.
type error interface {
   Error() string
}

基于该接口我们可以实现自定义的 error 实现。( errors 是 error 接口的一个实现,可以直接仿照,也可以直接使用,如 err := errors.New(“my custom error”))

三、panic & recover

  • panic
    • 停止当前程序运行
    • 一直向上返回,执行每一层的 defer
    • 如果没有遇见 recover,程序退出
  • recover(相当于对 panic 的 catch 语句)
    • 仅在 defer 调用中使用
    • 获取 panic 的值
    • 如果无法处理,可重新 panic
import (
   "fmt"
   "errors"
)

func recove() {
   defer func() {
      // func recover() interface{},表示 recover() 函数的返回类型可以是各种类型,所以要判断是否是 error
      // 使用 recover() catch panic,防止程序直接退出
      r := recover()
      if err, ok := r.(error); ok {
         fmt.Println(err) // runtime error: integer divide by zero
      } else {
         panic(errors.New("not known error"))
      }
   }()

   b := 0
   a := 5/b // panic: runtime error: integer divide by zero
   fmt.Println(a)

   //panic("123") // panic: not known error
}

func main() {
   recove()
}

四、错误统一处理

一个生产系统通常包含两种异常

  • 不可直接暴露给用户的异常:例如系统内部异常
  • 需要暴露给用户的异常:例如部分自定义异常信息用于提示用户操作

本节写一个需求:实现一个读取文件的 httpServer 处理器。
代码结构如下:

1556591347-7773-5842684-d463e5d73785616d

4.1 userError 自定义用户异常接口

package exception

type UserError interface {
   error // 内嵌类型
   Message() string
}

4.2 myCustomError 自定义用户异常实现

package exception

// 基于基本类型创建自定义类型
type MyCustomError string

func (e MyCustomError) Error() string {
   return e.Message()
}

func (e MyCustomError) Message() string {
   return string(e)
}

4.3 handler 核心业务逻辑处理器

package handler

import (
"net/http"
"os"
"io/ioutil"
"strings"
"exception"
)

const PathPrefix = "/list/"

// 实现一个读取文件的 httpServer 处理器
// 假设访问 http://localhost:8888/list/abc.txt
func HandleFileListing(writer http.ResponseWriter, request *http.Request) error {
   // 1. 如果 urlPath 不是以 /list/ 开头的,则自定义用户错误
   if strings.Index(request.URL.Path, PathPrefix) != 0 {
      return exception.MyCustomError("url path need startWith /list/")
   }
   //fmt.Println("path", request.URL.Path)    // /list/abc.txt
   path := request.URL.Path[len(PathPrefix):] // abc.txt 字符串切割,subString

   // 2. 打开文件
   file, err := os.Open(path)
   if err != nil {
      // 遇到错误直接返回,由错误统一处理器进行处理
      return err
   }
   defer file.Close()

   // 3. 读取文件到 byte[]
   all, err := ioutil.ReadAll(file)
   if err != nil {
      return err
   }

   // 4. 将 byte[] all 写出到响应流
   writer.Write(all)
   return nil
}

4.4 errorWrapperHandler 统一错误处理器

package exception

import (
"net/http"
"log"
"os"
)

// 定义一个 function 类型的 type,返回值是 error
type appHandler func(writer http.ResponseWriter, request *http.Request) error

// 输入 appHandler 是一个函数,输出也是一个函数 - 函数式编程
func ErrWrapper(handler appHandler) func(http.ResponseWriter, *http.Request) {
   return func(writer http.ResponseWriter, request *http.Request) {
      // 1. 处理业务逻辑
      err := handler(writer, request)
      if err != nil {
         log.Printf("error occured, %s", err) // 2018/11/04 10:10:12 error occured, open abc.txt1: no such file or directory

         // 2. 处理可以抛给用户的错误
         if err, ok := err.(UserError); ok {
            // 将错误写回到 http.ResponseWriter
            http.Error(writer, err.Message(), http.StatusBadRequest)
         }

         // 3. 处理不可以抛给用户的错误
         code := http.StatusOK
         switch {
         case os.IsNotExist(err):
            code = http.StatusNotFound
         default:
            code = http.StatusInternalServerError
         }
         http.Error(writer, http.StatusText(code), code) // 浏览器:Not Found
      }
   }
}

注意这样的姿势:定义一个 function 类型的 type,返回值是 error

type appHandler func(xx) error

4.5 web httpServer 服务器

package main

import (
"net/http"
"handler"
"exception"
)

func main() {
   // 1. 注册处理 handler.PathPrefix 开头的业务逻辑处理器
   http.HandleFunc(handler.PathPrefix, exception.ErrWrapper(handler.HandleFileListing))

   // 2. 启动 httpServer,监听端口
   err := http.ListenAndServe("127.0.0.1:8888", nil)

   // 3. 如果启动失败,则直接抛出错误
   if err != nil {
      panic(err)
   }
}

Go 语言入门6 – 闭包

// 该函数的返回值是一个函数:func(value int) int
func adder() func(value int) int {
    sum := 0 // 自由变量,Go 的闭包是可以含有自由变量的
    return func(value int) int {
        sum += value
        return sum
    }
}

func main() {
    add := adder() // 此处返回一个函数
    for i := 0; i < 10; i++ {
        fmt.Printf("0 + ... + %d = %d", i, add(i)) //调用函数add
        fmt.Println()
    }
}
  • adder() 函数的返回值是一个函数
  • 注意:在 Go 语言中,闭包是可以含有自由变量的

Go 语言入门5 – 接口

一、接口

1.1、定义接口

// notifier 是一个定义了通知类行为的接口
type notifier interface {
    // 接口方法
    notify()
}

1.2、实现接口

  • 使用值接收者实现接口
import "fmt"

// notify 是使用值接收者实现 notifier interface 接口的方法
// sendNotification(&u) 和 sendNotification(u) 都可
func (u user) notify() {
   fmt.Println("notify", u)
}

对于值接收者,sendNotification(&u) 和 sendNotification(u) 都可

  • 使用指针接收者实现接口
// notify 是使用指针接收者实现 notifier interface 接口的方法
// 只能使用 sendNotification(&u)
func (u *user) notify() {
    fmt.Println("notify", *u)
}

对于指针接收者,只能使用 sendNotification(&u)

非常重要的点 - 值接收者与指针接收的对比
https://stackoverflow.com/questions/27775376/value-receiver-vs-pointer-receiver-in-golang:详细的介绍了什么时候使用值接收者,什么时候使用指针接收者
https://studygolang.com/articles/1113
https://blog.csdn.net/suiban7403/article/details/78899671
type User struct {
    Name  string
    Email string
}
// 注意这个是值接收者
func (u User) Notify() error {
    u.Name = "alimon"
    return nil
}

func main() {
    u := &User{"Damon", "damon@xxoo.com"}
    u.Notify()
    log.Println(u.Name)
}

注意

  • 值接收者(func (u User) Notify() error)操作的是 User 的副本(不管调用者使用值还是地址),所以不会改变其内部的值,这里输出还是 Damon
  • 指针接收者(func (u *User) Notify() error)操作的是 User 本身,所以其内部的值会发生变化,这里输出是 alimon

什么时候使用值接收者,什么时候使用指针接收者

If the receiver is a map, func or chan, don’t use a pointer to it.
If the receiver is a slice and the method doesn’t reslice or reallocate the slice, don’t use a pointer to it.
If the method needs to mutate(使 receiver 改变) the receiver, the receiver must be a pointer.
If the receiver is a struct that contains a sync.Mutex or similar synchronizing field, the receiver must be a pointer to avoid copying.
If the receiver is a large struct or array, a pointer receiver is more efficient. How large is large? Assume it’s equivalent to passing all its elements as arguments to the method. If that feels too large, it’s also too large for the receiver.
Can function or methods, either concurrently or when called from this method, be mutating the receiver? A value type creates a copy of the receiver when the method is invoked, so outside updates will not be applied to this receiver. If changes must be visible in the original receiver, the receiver must be a pointer.
If the receiver is a struct, array or slice and any of its elements is a pointer to something that might be mutating, prefer a pointer receiver, as it will make the intention more clear to the reader.
If some of the methods of the type must have pointer receivers, the rest should too, so the method set is consistent regardless of how the type is used
If the receiver is a small array or struct that is naturally a value type (for instance, something like the time.Time type), with no mutable fields and no pointers, or is just a simple basic type such as int or string, a value receiver makes sense.
A value receiver can reduce the amount of garbage that can be generated; if a value is passed to a value method, an on-stack copy can be used instead of allocating on the heap. (The compiler tries to be smart about avoiding this allocation, but it can’t always succeed.) Don’t choose a value receiver type for this reason without profiling first.

Finally, when in doubt, use a pointer receiver.

总结一下:

  • 如果需要改变 receiver 内部的属性值,选择指针接收者;
  • 如果 struct 中的一个方法使用了指针接收者,那么该 struct 内的全部方法都是用指针接收者 – 一致性

1.3、使用接口

// sendNotification 接受一个实现了 notifier 接口的值并发送通知
func sendNotification(n notifier) {
    n.notify()
}

func main() {
    u := user{"nana"}
    sendNotification(&u) // notify {nana}
}

二、实现多态

api.go

package api

// 定义接口
type Notifier interface {
    Notify()
}

接口名和接口方法大写表示 public,小写表示 java 的 default(即包隔离级别)

user.go

package impl

import "fmt"

// 定义实现类 user
type User struct {
    Name string
}

func (u *User) Notify() {
    fmt.Println("user", *u)
}

admin.go

package impl

import "fmt"

// 定义实现类 Admin,首字母大写表示 public
type Admin struct {
    Name string
}

func (a *Admin) Notify() {
    fmt.Println("admin", *a)
}

main.go

package main

import (
    // 相对于 GOPATH/src 下的地址
    "github.com/zhaojigang/helloworld/ooi/api"
    "github.com/zhaojigang/helloworld/ooi/impl"
)

func sendNotification(n api.Notifier) {
    n.Notify()
}

func main() {
    u := impl.User{"nana"}
    sendNotification(&u) // user {nana}

    a := impl.Admin{"zhao"}
    sendNotification(&a) // admin {zhao}
}

三、嵌入类型内部接口实现提升

基于上述程序修改 admin.go 和 main.go。

admin.go

package impl

// 定义实现类 Admin,首字母大写表示 public
type Admin struct {
    User // 嵌入类型
    Name string
}

注意:该类没有实现接口 notifier 的 notify() 方法,但是由于该类的内嵌类 User 实现了,内嵌类会提升到外部类中,所以 Admin 类也是 notifier 接口的实现类,其实现函数就是 User#Notify() ,当然,可以 Admin 可以自己实现 Notify() 来覆盖 User#Notify()。

main.go

package main

import (
    // 相对于 GOPATH/src 下的地址
    "github.com/zhaojigang/helloworld/ooi/api"
    "github.com/zhaojigang/helloworld/ooi/impl"
)

func sendNotification(n api.Notifier) {
    n.Notify()
}

func main() {
    u := impl.User{"nana"}
    sendNotification(&u) // user {nana}

    a := impl.Admin{u,"zhao"}
    sendNotification(&a) // user {nana}
}

四、组合接口

=========================== Api1 ===========================
package api

type Api1 interface {
    Api1()
}
=========================== Api2 ===========================
package api

type Api2 interface {
    Api2()
} 
=========================== Api12(组合接口) ===========================
package api

type Api12 interface {
    Api1
    Api2
}
=========================== Api12Impl ===========================
package impl

import "fmt"

type Api12Impl struct {
    Name string
}

func (api12 *Api12Impl) Api1()  {
    fmt.Println("api1", api12.Name)
}

func (api12 *Api12Impl) Api2()  {
    fmt.Println("api2", api12.Name)
}
=========================== main ===========================
package main

import (
    // 相对于 GOPATH/src 下的地址
    "github.com/zhaojigang/helloworld/ooi/api"
    "github.com/zhaojigang/helloworld/ooi/impl"
)

func sendNotification(n api.Api12) {
    n.Api1() // api1 nana
    n.Api2() // api2 nana
}

func main() {
    api12 := impl.Api12Impl{"nana"}
    sendNotification(&api12)
}
1171819202133
 
Copyright © 2008-2021 lanxinbase.com Rights Reserved. | 粤ICP备14086738号-3 |