1. Go并发

2. 第八章 Go并发

Go语言区别于其他语言的一大特点就是出色的并发性能,最重要的一个特性那就是go关键字。

并发场景:

  • UI小姐姐一边开着PS软件,一边微信疯狂的和产品经理打字交流,后台还听着网易云音乐。。
  • 双11当天。。大伙疯狂的访问淘宝网站
  • CPU从单核向多核发展,计算机程序不该是串行的,浪费资源
  • 串行程序由于IO操作被阻塞,整个程序处于停滞状态,其他IO无关的任务无法执行

并发必要性:

  • 充分利用CPU核心的优势,提高程序执行效率

实现并发的模型:

  • 多进程,多进程是在操作系统层面并发的基本模式,进程间互不影响,但是开销最大,进程由内核管理。
  • 多线程,属于系统层面的并发模式,也是用的最多的有效模式,大多数软件使用多线程,开销小于多进程。
  • 基于回调的非阻塞/异步IO。此架构处于多线程模式的危机,高并发服务器下,多线程会消耗殆尽服务器的内存和CPU资源。而通过事件驱动的方式使用异步IO,尽可能少用线程,降低开销,Node.js就是如此实践,但是此模式编程复杂度较高。
  • 协程,Coroutine是一种用户态线程,寄存于线程中,系统开销极小,可以有效提高线程任务并发性,使用方式简单,结构清晰,避免多线程的缺点。需要编程语言的支持,如不支持,需要用户自行实现调度器。

共享内存系统是比较常用的并发模式,线程之间通信采用共享内存的方式,程序员需要加锁等操作避免死锁、资源竞争等问题。

计算机科学家又研制出了消息传递系统对线程间共享状态的各种操作被封装在线程之间传递的消息中

Communicating Sequential Processes(顺序通信进程),在CSP系统中,所有的并发操作都是通过独立线程以异步的方式运行,这些线程必须通过再彼此之间发送消息,从而向另一个线程请求信息。

2.1. 进程和线程

进程是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位。

线程是进程的一个执行实例,是比进程更小的独立运行的基本单位。

进程可以创建或销毁多个线程,同一个进程中的多个线程可以并发执行(如百度云盘进程中的,多个下载任务)。

一个程序至少一个进程,一个进程至少一个线程。

“并发”指的是程序的结构,“并行”指的是程序运行时的状态

2.1.1. 并行(parallelism)

这个概念很好理解。所谓并行,就是同时执行的意思,无需过度解读。判断程序是否处于并行的状态,就看同一时刻是否有超过一个“工作单位”在运行就好了。

所以,单线程永远无法达到并行状态

要达到并行状态,最简单的就是利用多线程和多进程。

但是 Python 的多线程由于存在著名的 GIL,无法让两个线程真正“同时运行”,所以实际上是无法到达并行状态的。

2.1.2. 并发(concurrency)

要理解“并发”这个概念,必须得清楚,并发指的是程序的“结构”

当我们说这个程序是并发的,实际上,这句话应当表述成“这个程序采用了支持并发的设计”。好,既然并发指的是人为设计的结构,那么怎样的程序结构才叫做支持并发的设计?

正确的并发设计的标准是:使多个操作可以在重叠的时间段内进行(two tasks can start, run, and complete in overlapping time periods)

2.1.3. 并发和并行

1)多线程程序在单核CPU上运行,就是并发

2)多线程程序在多核CPU上运行,就是并行


为何人们常说提升并发,而不是提升并行

因为并发是通过时间片轮转进行进程调度,是通过技术手段提升并发。

并行是通过硬件提升效率,有钱人可以买一个128核的服务器。


2.2. 协程是什么

执行单位是个抽象的概念,操作系统层面有多个概念与之对应,比如操作系统掌管的进程(process)、进程内的线程(thread)以及进程内的协程(coroutine)

协程在于轻量级,轻松创建百万个而不会导致系统资源衰竭。

多数语言语法层面不直接支持协程,而是通过库的方式支持,然而库的功能也仅仅是线程的创建、销毁与切换,而无法达到协程调用同一个IO操作,如网络通信,文件读写等。

非抢占式多任务处理,由协程主动交出控制权。消耗的资源更少

编译器/解释器/虚拟机层面的多任务,实现的协程调度。

多个协程可能在一个或多个线程上运行。

2.3. 其他语言的协程

c++ Boost.Coroutine
Java不支持
Python 用yield关键字实现协程  3.5之后async def对协程支持,异步函数的定义
golang 不需要定义时区分是否是异步函数 go func(){}

2.4. goroutine

Golang在语言层面支持协程,名为goroutine,Go语言标准库提供所有系统调用操作,都会让出CPU给其他goroutine,使得协程切换管理不依赖于系统的线程和进程,也不依赖于CPU核数。

一个Go进程,可以启动多个goroutine协程。

main函数也是goroutine。

一个普通的机器运行几十个线程负载已经很高了,然而可以轻松创建百万个goroutine。

go标准库的net包,写出的go web server性能直接媲美Nginx。



2.5. goroutine可能的切换点

I/O 
select
channel
函数调用
runtime.Gosched()
等待锁

2.6. goroutine入门

第一个goroutine,开启协程,执行函数hello()

package main

import (
    "fmt"
    "time"
)

func hello() {
    fmt.Println("hello goroutine")
}

func main() {
    go hello()
    fmt.Println("main thread terminate")
    time.Sleep(time.Second)
}

批量开启协程

package main

import (
    "fmt"
    "time"
)

func hello(i int) {
    fmt.Println("hello goroutine", i)
}

func main() {
    //循环开启10个协程,分别执行hello()函数
    for i := 0; i < 10; i++ {
        go hello(i)
    }
    time.Sleep(time.Second)
}

编写代码,完成功能

1.在go主进程中,开启goroutine,该协程每秒输出一个你好,我是goroutine

2.在主进程中也每秒输出一个我很好,我是主进程,输出10次后退出程序

3.要求主进程和goroutine同时执行

package main

import (
    "fmt"
    "strconv"
    "time"
)

//定义一个协程任务函数
func test() {
    for i := 0; i <= 10; i++ {
        fmt.Println("你好,我是goroutine" + strconv.Itoa(i))
        time.Sleep(time.Second) //睡眠1秒
    }
}

func main() {
    go test()
    for i := 0; i <= 10; i++ {
        fmt.Println("我很好,我是主进程" + strconv.Itoa(i))
        time.Sleep(time.Second) //睡眠1秒
    }
}


提示:检测主进程结束,协程也立即结束,或是检测主进程未结束,协程提前退出,可以修改for循环的次数!

2.7. runtime包控制goroutine

runtime.Gosched()让出时间片,如同接力赛跑,让出了接力棒。

gosched如同yield作用,暂停当前的goroutine,放回队列等待下次执行。

package main

import (
    "fmt"
    "runtime"
)

func main() {
    go func() {
        for i := 0; i < 5; i++ {
            fmt.Println("你愁啥")
        }
    }()

    for i := 0; i < 2; i++ {
        //让出时间片,让其他协程执行
        runtime.Gosched()
        fmt.Println("尼古拉斯赵四")
    }
}

runtime.Goexit()终止当前协程

package main

import (
    "fmt"
    "runtime"
    "time"
)

func test() {
    defer fmt.Println("ccc")
    //return  //函数终止,打印a  c  b  结束
    runtime.Goexit() //退出所在协程,  打印 a c  退出主进程
    fmt.Println("ddd")
}

func main() {
    go func() {
        fmt.Println("aaa")
        test()
        fmt.Println("bbb")
    }()
    //
    time.Sleep(time.Second * 3)
}

Go与多核的优势,设置cpu运行数目

go version < 1.8 需要手动设置多核

go version > 1.8 默认用多核,无须设置

package main

import (
    "fmt"
    "runtime"
)

func main() {
    cpuNum := runtime.NumCPU()
    //可以在这演示下单核时,时间片无切换,仅仅打印数字0的实验 runtime.GOMAXPROCS(1)
    runtime.GOMAXPROCS(cpuNum)
    for i := 0; i < 500; i++ {
        go fmt.Print(1)
        fmt.Print(0)
    }
}

2.8. goroutine使用recover

package main

import (
    "fmt"
    "time"
)

func test() {
    defer func() {
        if err := recover(); err != nil {
            fmt.Println("出错了:", err)
        }
    }()

    var m map[string]int
    //map必须初始化才能使用
    //m=make(map[string]int,10)
    m["stu"] = 111

}

func calc() {
    for {
        fmt.Println("我是calc函数")
        time.Sleep(time.Second)
    }
}

func main() {
    go test() //协程执行函数,这个函数有错的话,程序会panic退出,做好异常捕捉
    for i := 0; i < 2; i++ {
        go calc()
    }
    time.Sleep(time.Second * 10)
}

3. 8.2 Go 锁

案例(坑):多个goroutine操作同一个map。

go提供了一种叫map的数据结构,可以翻译成映射,对应于其他语言的字典、哈希表。借助map,可以定义一个键和值,然后可以从map中获取、设置和删除这个值,尤其适合数据查找的场景。

但是map的使用有一定的限制,如果是在单个协程中读写map,那么不会存在什么问题,如果是多个协程并发访问一个map,有可能会导致程序退出,并打印下面错误信息。

fatal error: concurrent map writes

错误案例代码

package main

var myMap = make(map[int]int, 10)

//要求计算50的阶乘结果
//阶乘就是1*2*3*4...50 =?

func test(n int) {
    //定义初始值1
    res := 1
    //每次循环进行阶乘
    for i := 0; i <= n; i++ {
        res *= i
    }
    //最终计算结果,写入map
    //由于多个协程同时操作map,引发资源竞争报错
    myMap[n] = res

}
func main() {
    //开启50个协程
    for i := 0; i < 10; i++ {
        go test(10)
    }
}

并发访问map是不安全的操作,在协程中访问map,必须提供某种同步资源机制,使用sync.Mutex互斥锁同步解决协程的竞争问题。

package main

import (
    "fmt"
    "sync"
    "time"
)

var lock sync.Mutex

func Printer(str string) {
    //我现在开始使用打印机了,其他人都等我完事了再来
    lock.Lock()
    for _, data := range str {
        fmt.Printf("%c", data)
        time.Sleep(time.Second)
    }
    //我完事了,你们上吧
    lock.Unlock()
    fmt.Printf("\n")
}

func Alex() {
    Printer("hello")
}
func Wupeiqi() {
    Printer("oldboy")
}

func main() {
    //coffe(10)//单线程执行函数
    go Alex()
    go Wupeiqi()
    //主线程等待协程结束后 再退出
    time.Sleep(time.Second * 10)
}

结论:

1.可以使用加锁的方式解决goroutine的通讯。
2.主线程等待所有goroutine的时间难以确定,设置固定的等待时间肯定不合理。
3.对全局变量加锁同步来通讯,也不利于多个协程对变量的读写。
4.要让主线程等待所有goroutine退出后在退出,如何知道所有goroutine都退出了呢?
5.因此,channel应运而生!!

4. 8.3 Go channel

在Go语言中,关键字go的引入使得Go语言并发编程更加简单而优雅,但是并发编程的复杂性,以及时刻关注并发编程容易出现的问题需要时刻警惕。

并发编程的难度在于协调,然而协调就必须要交流,那么并发单元之间的通信是最大的问题。

之前说了在程序中两种并发通信模型:共享数据和消息

共享数据是指多个并发单元分别保持对同一个数据的引用,实现对数据的共享。

共享数据可能是内存数据块、磁盘文件、网络数据等。

Go语言既然一并发为核心,它提供了另一种通信模型,以消息机制而非共享内存作为通信方式

消息机制:每个并发单元的输入和输入只有一种。

4.1. channel介绍

Go语言提供的消息通信机制被称为channel

channel是go语言在语言级别提供的goroutine间的通信方式。
channel是有类型的,一种channel只能传递一种类型值,这个类型在声明channel时定义。

1.channel本质是一个数据结构(队列)

2.channel数据遵循FIFO,first in first out。

3.channel本身是线程安全的,多个goroutine访问时不需要加锁。

4.一个string类型的channel只能放入string类型数据。

4.2. 定义channel

基本语法:
var 管道名 chan  管道类型
如:
var intChan chan int
var strChan chan string
var stuChan chan Student //结构体类型channel
var mapChan chan map[string]string //map类型channel
var boolChan chan bool //布尔类型channel

注意:

channel是引用类型
channel必须make初始化后方可使用

4.3. channel使用

package main

import "fmt"

func main() {
   //初始化一个可以存放3个int类型数据的管道
   //创建方式一
   //var intChan chan int
   //intChan = make(chan int, 3)

   //创建方式二,简短声明创建,如果不make初始化,默认chan是nil
   intChan := make(chan int, 3)
   //查看下channel里面有什么
   fmt.Printf("intChan值:%v    intChan地址:%p\n", intChan, &intChan)
   fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))

   //向管道写入数据,只能写入int类型
   intChan <- 10
   //放入第二个数据
   num := 999
   intChan <- num
   //放入第三个数据
   intChan <- 666
   //注意我们初始化chan的容量限制在3,数据不能超过chan的容量
   fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))

   //从管道中读取数据
   //定义一个变量接收chan的数据,注意先进先出的规律
   res := <-intChan //
   res2 := <-intChan
   fmt.Println(res, res2)
   fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))

   //数据遵循,有多少读多少,读空的channel也报错
   res3 := <-intChan
   fmt.Println(res3)
   fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))
}

4.4. channel注意事项

1.channel只能存放声明的数据类型

2.channel数据放满了,不得再放入,否则panic报错,死锁

3.取出一个channel数据,少了一个坑,那可以再放入一个数据

4.数据取空后,不得在取,否则panic报错

4.5. channel存放数据类型

1.map类型channel

package main

import "fmt"

func main() {
    mapChan := make(chan map[string]string, 10) //初始化创建map类型的chan,可以放入map数据
    myMap := make(map[string]string, 10)        //初始化创建map
    myMap["姓名"] = "黑旋风"
    myMap["年纪"] = "28"

    mymap2 := make(map[string]string, 10) //初始化创建第二个map
    mymap2["姓名"] = "小妖精"

    mapChan <- myMap
    mapChan <- mymap2

    fmt.Printf("值:%v 长度:%v 容量:%v\n", mapChan, len(mapChan), cap(mapChan))
}

2.结构体类型channel

package main

import "fmt"

type Student struct {
    Name string
    Age  int
}

func main() {
    //创建管道
    structChan := make(chan Student,2)
    //创建结构体对象
    s1 := Student{"艾利克斯", 18}
    s2 := Student{"银角大王吧", 19}
    //数据放入管道
    structChan <- s1
    structChan <- s2

    //取出管道数据
    stu1 := <-structChan
    stu2 := <-structChan
    fmt.Println(stu1, stu2)
}

3.存放指针数据的管道

package main

import "fmt"

type Student struct {
    Name string
    Age  int
}

func main() {
    //创建管道
    stuChan := make(chan *Student, 10)
    //创建结构体对象
    s1 := Student{"王二狗", 18}
    s2 := Student{"王八犊子", 19}
    //传入结构体对象到管道
    stuChan <- &s1
    stuChan <- &s2
    //取出管道的值
    stu1 := <-stuChan
    stu2 := <-stuChan
    fmt.Printf("%v %v\n", stu1, stu2)
}

4.创建可以接收任意数据类型的channel,注意,空接口可以存放任意数据类型。

package main

import "fmt"

type Student struct {
    Name string
    Age  int
}

func main() {
    allChan := make(chan interface{}, 10)
    allChan <- 123
    allChan <- "我是字符串"
    res1 := <-allChan
    res2 := <-allChan
    fmt.Printf("res1值:%v 类型:%T\n", res1, res1)
    fmt.Printf("res2值:%v 类型:%T\n", res2, res2)

    //定义结构体
    s1 := Student{"王麻子", 11}
    allChan <- s1
    //取出结构体,查看类型
    newStu := <-allChan
    fmt.Printf("%T %v\n", newStu, newStu)
    //newStu此时无法直接使用,必须类型断言后使用
    r := newStu.(Student) //断言它是结构体类型
    fmt.Printf("r值:%v 类型:%T\n", r, r)
}

4.6. channel缓冲区

初始化channel的方式是make函数。

ch :=make(chan int) //无缓冲区的管道
ch2:=make(chan int,0)//无缓冲区的管道
ch3:=make(chan int,10)//有缓冲区的管道,且容量是10

如图所示:

两个协程,传递数据,无缓存的管道,就是不可以保存数据的通道。

无缓冲的管道:在管道接受数据前,不保存任何值。
无缓存的管道:这种类型的管道要求两个goroutine同时接、收值。
如果goroutine没同时准备好,会导致有一方阻塞状态。
这种管道进行接、收值的行为,本身是同步状态。

无缓冲管道案例

package main

import (
    "fmt"
    "time"
)

//运动员1
func write(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
        fmt.Printf("棒子%d号来了\n", i)
        //每秒塞一个棒子
        time.Sleep(time.Second)
        fmt.Println("-------------------")
    }
}

//运动员2
func read(ch chan int) {
    for v := range ch {
        fmt.Printf("拿到了棒子%d号\n", v)
    }
}

func main() {
    intChan := make(chan int, 0)
    go write(intChan)
    go read(intChan)
    time.Sleep(10 * time.Second)
}

有缓冲的管道

有缓冲的管道:在被接收前能存储一个或多个值的通道。
这种管道不要求goroutine必须同时发送和接收。
只有管道中没有要接收的值,接收动作才会阻塞。
只有管道塞满了数据,发送动作才会阻塞。

示意图

package main

import (
    "fmt"
    "time"
)

//运动员1
func write(ch chan int) {
    for i := 0; i < 10; i++ {
        ch <- i
        fmt.Printf("棒子%d号来了\n", i)
        //每秒塞一个棒子
        fmt.Println("-------------------")
    }
}

//运动员2
func read(ch chan int) {
    for v := range ch {
        fmt.Printf("拿到了棒子%d号\n", v)
    }
}

func main() {
    intChan := make(chan int, 5)
    go write(intChan)
    go read(intChan)
    time.Sleep(2 * time.Second)
}

4.7. 关闭channel

使用内置函数close可以关闭channel。

channel关闭后,就不能再向channel写入数据了,但是仍然可以从channel读取数据。

package main

import "fmt"

func main() {
    intChan := make(chan int, 3)
    intChan <- 123
    intChan <- 456
    close(intChan) //通道关闭,游客禁止入内,数据也无法写入
    //intChan<-123  //此处已经不能加载 panic: send on closed channel

    fmt.Println("管道已经关闭,在管道中的游客请尽快出来")
    n1 := <-intChan
    n2 := <-intChan
    fmt.Println(n1, n2)
}

遍历channel的方式

package main

import "fmt"

func main() {
    intChan := make(chan int, 100)
    for i := 0; i < 100; i++ {
        intChan <- i * i //循环写入一百个数据
    }

    //遍历方式1,取出管道值
    //for i := 0; i < len(intChan); i++ {
    //    fmt.Println(i)
    //}

    //遍历方式2,for range,参数只有一个
    //在遍历管道时,必须先关闭,否则死锁报错
    close(intChan)
    for v := range intChan {
        fmt.Printf("类型:%T 值:%d\n", v, v)
    }
}

4.8. 单向channel

默认情况下,channel是双向的,既可以写入数据,也可以读取数据。

但是有些场景下,管道当做参数传递,且希望仅仅是单向使用(只读,只写),此时可以使用单向管道。

语法

var ch1 chan int //普通管道
var ch2 chan <- string //只能写入string类型的数据
var ch3 <-chan int  //只能读取int类型的数据

提示

chan <-   表示数据写入channel
<-chan      表示读取channel中的数据

示例

4.8.1. 生产消费者模型

package main

import (
    "fmt"
)

func Producer(baozi chan<- int) {
    for i := 0; i < 10; i++ {
        baozi <- i //把包子放到管道
        fmt.Printf("三全食品,生产了有毒包子%d号\n", i)
    }
    //每天就只卖10个包子,卖完为止
    close(baozi)
}

func Consumer(baozi <-chan int) {
    for baozi := range baozi {
        fmt.Printf("消费者吃了包子%d号.....\n", baozi)
    }
}

func main() {
    ch := make(chan int, 10)
    //生产者
    go Producer(ch)
    //消费者
    Consumer(ch)
}

执行结果图

案例2

package main

import "fmt"

//只能写入数据
func sendData(sendch chan<- int) {
    sendch <- 10
    //只写模式下,不能读取操作
    //invalid operation: <-sendch (receive from send-only type chan<- int)
    //<-sendch
}

//只能读取数据
func readData(sendch <-chan int) {
    //sendch <- 10
    data := <-sendch
    fmt.Println(data)
}

func main() {
    chnl := make(chan int)
    go sendData(chnl) //开启协程,写入数据到管道
    readData(chnl)
}

5. 8.4 Go select

Go语言引入了select关键字,用于处理异步IO问题,语义和switch特别相似。语法由select开始,每个条件由case语句来描述。每个case语句必须是IO操作

select {
    case <-chan1:
    //如果读取到channel的数据,就执行这里
    case chan2<-1:
    //如果成功向chan2写入数据,就执行这里
    default:
    //上述都失败了,进入这里
}

案例

使用select可以解决从管道取数据的阻塞问题

package main

import (
    "fmt"
    "time"
)

func main() {
    //定义一个管道
    intChan := make(chan int, 10)

    //循环写入数据
    for i := 0; i < 10; i++ {
        intChan <- i
    }
    //定义管道,可以写入string
    strChan := make(chan string, 5)
    for i := 0; i < 5; i++ {
        //格式化后写入string数据
        strChan <- "hello" + fmt.Sprintf("%d", i)
    }
    //传统的for循环遍历,必须close关闭channel,否则造成死锁
    //但是到底关闭哪个channel,并不那么容易选择
    //加上for无线循环,匹配所有的case,直到结束return
    for {
        select {
        //如果有数据被读取到,进入这个分支,并且intChan没关闭的话,也会自动匹配下一个case
        case v := <-intChan:
            fmt.Printf("从intChan读取到数据%d\n", v)
            time.Sleep(time.Second)
        case v2 := <-strChan:
            fmt.Printf("从strChan中读取到数据%s\n", v2)
            time.Sleep(time.Second)
        default:
            fmt.Printf("什么也没读到,再见\n")
            time.Sleep(time.Second)
            return
        }
    }

}

5.1.1. waitGroup等待组

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    /*
        同步等待组:WaitGourp,执行了wait的goroutine,要等待同步等待组中的其他的goroutine执行完毕。。
            内置的计数器:counter:0
            Add(),设置counter的值
            Done(),将counter减一,同Add(-1)

            以上两个方法可以设置counter的值,注意不能为负数,否则会引发恐慌。

            Wait(),哪个goroutine执行了,那么就会被阻塞,直到counter为0。解除阻塞
    */
    var wg sync.WaitGroup
    //fmt.Printf("%T\n",wg)
    //fmt.Println(wg)
    wg.Add(2)

    go printNum1(&wg)
    go printNum2(&wg)

    wg.Wait() //main,进入阻塞状态,底层计数器为0,接触阻塞。。
    //time.Sleep(1*time.Second)
    fmt.Println("main。。接触阻塞。。结束了。。。")
}

func printNum1(wg *sync.WaitGroup) {
    rand.Seed(time.Now().UnixNano())
    for i := 1; i <= 100; i++ {
        fmt.Println("子goroutine1,i:", i)
        time.Sleep(time.Duration(rand.Intn(1000))) //
    }
    wg.Done() //计数器减一
}

func printNum2(wg *sync.WaitGroup) {
    for j := 1; j <= 100; j++ {
        fmt.Println("\t子goroutine2,j:", j)
        time.Sleep(time.Duration(rand.Intn(1000)))
    }
    wg.Done()
}

results matching ""

    No results matching ""