1. Go并发
2. 第八章 Go并发
Go语言区别于其他语言的一大特点就是出色的并发性能,最重要的一个特性那就是go关键字。
并发场景:
- UI小姐姐一边开着PS软件,一边微信疯狂的和产品经理打字交流,后台还听着网易云音乐。。
- 双11当天。。大伙疯狂的访问淘宝网站
- CPU从单核向多核发展,计算机程序不该是串行的,浪费资源
- 串行程序由于IO操作被阻塞,整个程序处于停滞状态,其他IO无关的任务无法执行
并发必要性:
- 充分利用CPU核心的优势,提高程序执行效率
实现并发的模型:
- 多进程,多进程是在操作系统层面并发的基本模式,进程间互不影响,但是开销最大,进程由内核管理。
- 多线程,属于系统层面的并发模式,也是用的最多的有效模式,大多数软件使用多线程,开销小于多进程。
- 基于回调的非阻塞/异步IO。此架构处于多线程模式的危机,高并发服务器下,多线程会消耗殆尽服务器的内存和CPU资源。而通过事件驱动的方式使用异步IO,尽可能少用线程,降低开销,Node.js就是如此实践,但是此模式编程复杂度较高。
- 协程,Coroutine是一种用户态线程,寄存于线程中,系统开销极小,可以有效提高线程任务并发性,使用方式简单,结构清晰,避免多线程的缺点。需要编程语言的支持,如不支持,需要用户自行实现调度器。
共享内存系统是比较常用的并发模式,线程之间通信采用共享内存的方式,程序员需要加锁等操作避免死锁、资源竞争等问题。
计算机科学家又研制出了消息传递系统,对线程间共享状态的各种操作被封装在线程之间传递的消息中。
Communicating Sequential Processes(顺序通信进程),在CSP系统中,所有的并发操作都是通过独立线程以异步的方式运行,这些线程必须通过再彼此之间发送消息,从而向另一个线程请求信息。
2.1. 进程和线程
进程是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位。
线程是进程的一个执行实例,是比进程更小的独立运行的基本单位。
进程可以创建或销毁多个线程,同一个进程中的多个线程可以并发执行(如百度云盘进程中的,多个下载任务)。
一个程序至少一个进程,一个进程至少一个线程。

“并发”指的是程序的结构,“并行”指的是程序运行时的状态
2.1.1. 并行(parallelism)
这个概念很好理解。所谓并行,就是同时执行的意思,无需过度解读。判断程序是否处于并行的状态,就看同一时刻是否有超过一个“工作单位”在运行就好了。
所以,单线程永远无法达到并行状态。
要达到并行状态,最简单的就是利用多线程和多进程。
但是 Python 的多线程由于存在著名的 GIL,无法让两个线程真正“同时运行”,所以实际上是无法到达并行状态的。
2.1.2. 并发(concurrency)
要理解“并发”这个概念,必须得清楚,并发指的是程序的“结构”。
当我们说这个程序是并发的,实际上,这句话应当表述成“这个程序采用了支持并发的设计”。好,既然并发指的是人为设计的结构,那么怎样的程序结构才叫做支持并发的设计?
正确的并发设计的标准是:使多个操作可以在重叠的时间段内进行(two tasks can start, run, and complete in overlapping time periods)。
2.1.3. 并发和并行
1)多线程程序在单核CPU上运行,就是并发
2)多线程程序在多核CPU上运行,就是并行


为何人们常说提升并发,而不是提升并行?
因为并发是通过时间片轮转进行进程调度,是通过技术手段提升并发。
而并行是通过硬件提升效率,有钱人可以买一个128核的服务器。
2.2. 协程是什么
执行单位是个抽象的概念,操作系统层面有多个概念与之对应,比如操作系统掌管的进程(process)、进程内的线程(thread)以及进程内的协程(coroutine)。
协程在于轻量级,轻松创建百万个而不会导致系统资源衰竭。
多数语言语法层面不直接支持协程,而是通过库的方式支持,然而库的功能也仅仅是线程的创建、销毁与切换,而无法达到协程调用同一个IO操作,如网络通信,文件读写等。
非抢占式多任务处理,由协程主动交出控制权。消耗的资源更少
编译器/解释器/虚拟机层面的多任务,实现的协程调度。
多个协程可能在一个或多个线程上运行。
2.3. 其他语言的协程
c++ Boost.Coroutine
Java不支持
Python 用yield关键字实现协程 3.5之后async def对协程支持,异步函数的定义
golang 不需要定义时区分是否是异步函数 go func(){}
2.4. goroutine

Golang在语言层面支持协程,名为goroutine,Go语言标准库提供所有系统调用操作,都会让出CPU给其他goroutine,使得协程切换管理不依赖于系统的线程和进程,也不依赖于CPU核数。
一个Go进程,可以启动多个goroutine协程。
main函数也是goroutine。
一个普通的机器运行几十个线程负载已经很高了,然而可以轻松创建百万个goroutine。
go标准库的net包,写出的go web server性能直接媲美Nginx。

2.5. goroutine可能的切换点
I/O
select
channel
函数调用
runtime.Gosched()
等待锁
2.6. goroutine入门
第一个goroutine,开启协程,执行函数hello()
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("hello goroutine")
}
func main() {
go hello()
fmt.Println("main thread terminate")
time.Sleep(time.Second)
}
批量开启协程
package main
import (
"fmt"
"time"
)
func hello(i int) {
fmt.Println("hello goroutine", i)
}
func main() {
//循环开启10个协程,分别执行hello()函数
for i := 0; i < 10; i++ {
go hello(i)
}
time.Sleep(time.Second)
}
编写代码,完成功能
1.在go主进程中,开启goroutine,该协程每秒输出一个你好,我是goroutine
2.在主进程中也每秒输出一个我很好,我是主进程,输出10次后退出程序
3.要求主进程和goroutine同时执行
package main
import (
"fmt"
"strconv"
"time"
)
//定义一个协程任务函数
func test() {
for i := 0; i <= 10; i++ {
fmt.Println("你好,我是goroutine" + strconv.Itoa(i))
time.Sleep(time.Second) //睡眠1秒
}
}
func main() {
go test()
for i := 0; i <= 10; i++ {
fmt.Println("我很好,我是主进程" + strconv.Itoa(i))
time.Sleep(time.Second) //睡眠1秒
}
}

提示:检测主进程结束,协程也立即结束,或是检测主进程未结束,协程提前退出,可以修改for循环的次数!
2.7. runtime包控制goroutine
runtime.Gosched()让出时间片,如同接力赛跑,让出了接力棒。
gosched如同yield作用,暂停当前的goroutine,放回队列等待下次执行。
package main
import (
"fmt"
"runtime"
)
func main() {
go func() {
for i := 0; i < 5; i++ {
fmt.Println("你愁啥")
}
}()
for i := 0; i < 2; i++ {
//让出时间片,让其他协程执行
runtime.Gosched()
fmt.Println("尼古拉斯赵四")
}
}
runtime.Goexit()终止当前协程
package main
import (
"fmt"
"runtime"
"time"
)
func test() {
defer fmt.Println("ccc")
//return //函数终止,打印a c b 结束
runtime.Goexit() //退出所在协程, 打印 a c 退出主进程
fmt.Println("ddd")
}
func main() {
go func() {
fmt.Println("aaa")
test()
fmt.Println("bbb")
}()
//
time.Sleep(time.Second * 3)
}
Go与多核的优势,设置cpu运行数目
go version < 1.8 需要手动设置多核
go version > 1.8 默认用多核,无须设置
package main
import (
"fmt"
"runtime"
)
func main() {
cpuNum := runtime.NumCPU()
//可以在这演示下单核时,时间片无切换,仅仅打印数字0的实验 runtime.GOMAXPROCS(1)
runtime.GOMAXPROCS(cpuNum)
for i := 0; i < 500; i++ {
go fmt.Print(1)
fmt.Print(0)
}
}
2.8. goroutine使用recover
package main
import (
"fmt"
"time"
)
func test() {
defer func() {
if err := recover(); err != nil {
fmt.Println("出错了:", err)
}
}()
var m map[string]int
//map必须初始化才能使用
//m=make(map[string]int,10)
m["stu"] = 111
}
func calc() {
for {
fmt.Println("我是calc函数")
time.Sleep(time.Second)
}
}
func main() {
go test() //协程执行函数,这个函数有错的话,程序会panic退出,做好异常捕捉
for i := 0; i < 2; i++ {
go calc()
}
time.Sleep(time.Second * 10)
}
3. 8.2 Go 锁
案例(坑):多个goroutine操作同一个map。
go提供了一种叫map的数据结构,可以翻译成映射,对应于其他语言的字典、哈希表。借助map,可以定义一个键和值,然后可以从map中获取、设置和删除这个值,尤其适合数据查找的场景。
但是map的使用有一定的限制,如果是在单个协程中读写map,那么不会存在什么问题,如果是多个协程并发访问一个map,有可能会导致程序退出,并打印下面错误信息。
fatal error: concurrent map writes
错误案例代码
package main
var myMap = make(map[int]int, 10)
//要求计算50的阶乘结果
//阶乘就是1*2*3*4...50 =?
func test(n int) {
//定义初始值1
res := 1
//每次循环进行阶乘
for i := 0; i <= n; i++ {
res *= i
}
//最终计算结果,写入map
//由于多个协程同时操作map,引发资源竞争报错
myMap[n] = res
}
func main() {
//开启50个协程
for i := 0; i < 10; i++ {
go test(10)
}
}
并发访问map是不安全的操作,在协程中访问map,必须提供某种同步资源机制,使用sync.Mutex互斥锁同步解决协程的竞争问题。
package main
import (
"fmt"
"sync"
"time"
)
var lock sync.Mutex
func Printer(str string) {
//我现在开始使用打印机了,其他人都等我完事了再来
lock.Lock()
for _, data := range str {
fmt.Printf("%c", data)
time.Sleep(time.Second)
}
//我完事了,你们上吧
lock.Unlock()
fmt.Printf("\n")
}
func Alex() {
Printer("hello")
}
func Wupeiqi() {
Printer("oldboy")
}
func main() {
//coffe(10)//单线程执行函数
go Alex()
go Wupeiqi()
//主线程等待协程结束后 再退出
time.Sleep(time.Second * 10)
}
结论:
1.可以使用加锁的方式解决goroutine的通讯。
2.主线程等待所有goroutine的时间难以确定,设置固定的等待时间肯定不合理。
3.对全局变量加锁同步来通讯,也不利于多个协程对变量的读写。
4.要让主线程等待所有goroutine退出后在退出,如何知道所有goroutine都退出了呢?
5.因此,channel应运而生!!
4. 8.3 Go channel
在Go语言中,关键字go的引入使得Go语言并发编程更加简单而优雅,但是并发编程的复杂性,以及时刻关注并发编程容易出现的问题需要时刻警惕。
并发编程的难度在于协调,然而协调就必须要交流,那么并发单元之间的通信是最大的问题。
之前说了在程序中两种并发通信模型:共享数据和消息。
共享数据是指多个并发单元分别保持对同一个数据的引用,实现对数据的共享。
共享数据可能是内存数据块、磁盘文件、网络数据等。
Go语言既然一并发为核心,它提供了另一种通信模型,以消息机制而非共享内存作为通信方式。
消息机制:每个并发单元的输入和输入只有一种。
4.1. channel介绍
Go语言提供的消息通信机制被称为channel。
channel是go语言在语言级别提供的goroutine间的通信方式。
channel是有类型的,一种channel只能传递一种类型值,这个类型在声明channel时定义。
1.channel本质是一个数据结构(队列)
2.channel数据遵循FIFO,first in first out。
3.channel本身是线程安全的,多个goroutine访问时不需要加锁。
4.一个string类型的channel只能放入string类型数据。

4.2. 定义channel
基本语法:
var 管道名 chan 管道类型
如:
var intChan chan int
var strChan chan string
var stuChan chan Student //结构体类型channel
var mapChan chan map[string]string //map类型channel
var boolChan chan bool //布尔类型channel
注意:
channel是引用类型
channel必须make初始化后方可使用
4.3. channel使用
package main
import "fmt"
func main() {
//初始化一个可以存放3个int类型数据的管道
//创建方式一
//var intChan chan int
//intChan = make(chan int, 3)
//创建方式二,简短声明创建,如果不make初始化,默认chan是nil
intChan := make(chan int, 3)
//查看下channel里面有什么
fmt.Printf("intChan值:%v intChan地址:%p\n", intChan, &intChan)
fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))
//向管道写入数据,只能写入int类型
intChan <- 10
//放入第二个数据
num := 999
intChan <- num
//放入第三个数据
intChan <- 666
//注意我们初始化chan的容量限制在3,数据不能超过chan的容量
fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))
//从管道中读取数据
//定义一个变量接收chan的数据,注意先进先出的规律
res := <-intChan //
res2 := <-intChan
fmt.Println(res, res2)
fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))
//数据遵循,有多少读多少,读空的channel也报错
res3 := <-intChan
fmt.Println(res3)
fmt.Printf("intChan长度:%v 容量%v\n", len(intChan), cap(intChan))
}
4.4. channel注意事项
1.channel只能存放声明的数据类型
2.channel数据放满了,不得再放入,否则panic报错,死锁
3.取出一个channel数据,少了一个坑,那可以再放入一个数据
4.数据取空后,不得在取,否则panic报错
4.5. channel存放数据类型
1.map类型channel
package main
import "fmt"
func main() {
mapChan := make(chan map[string]string, 10) //初始化创建map类型的chan,可以放入map数据
myMap := make(map[string]string, 10) //初始化创建map
myMap["姓名"] = "黑旋风"
myMap["年纪"] = "28"
mymap2 := make(map[string]string, 10) //初始化创建第二个map
mymap2["姓名"] = "小妖精"
mapChan <- myMap
mapChan <- mymap2
fmt.Printf("值:%v 长度:%v 容量:%v\n", mapChan, len(mapChan), cap(mapChan))
}
2.结构体类型channel
package main
import "fmt"
type Student struct {
Name string
Age int
}
func main() {
//创建管道
structChan := make(chan Student,2)
//创建结构体对象
s1 := Student{"艾利克斯", 18}
s2 := Student{"银角大王吧", 19}
//数据放入管道
structChan <- s1
structChan <- s2
//取出管道数据
stu1 := <-structChan
stu2 := <-structChan
fmt.Println(stu1, stu2)
}
3.存放指针数据的管道
package main
import "fmt"
type Student struct {
Name string
Age int
}
func main() {
//创建管道
stuChan := make(chan *Student, 10)
//创建结构体对象
s1 := Student{"王二狗", 18}
s2 := Student{"王八犊子", 19}
//传入结构体对象到管道
stuChan <- &s1
stuChan <- &s2
//取出管道的值
stu1 := <-stuChan
stu2 := <-stuChan
fmt.Printf("%v %v\n", stu1, stu2)
}
4.创建可以接收任意数据类型的channel,注意,空接口可以存放任意数据类型。
package main
import "fmt"
type Student struct {
Name string
Age int
}
func main() {
allChan := make(chan interface{}, 10)
allChan <- 123
allChan <- "我是字符串"
res1 := <-allChan
res2 := <-allChan
fmt.Printf("res1值:%v 类型:%T\n", res1, res1)
fmt.Printf("res2值:%v 类型:%T\n", res2, res2)
//定义结构体
s1 := Student{"王麻子", 11}
allChan <- s1
//取出结构体,查看类型
newStu := <-allChan
fmt.Printf("%T %v\n", newStu, newStu)
//newStu此时无法直接使用,必须类型断言后使用
r := newStu.(Student) //断言它是结构体类型
fmt.Printf("r值:%v 类型:%T\n", r, r)
}
4.6. channel缓冲区
初始化channel的方式是make函数。
ch :=make(chan int) //无缓冲区的管道
ch2:=make(chan int,0)//无缓冲区的管道
ch3:=make(chan int,10)//有缓冲区的管道,且容量是10
如图所示:
两个协程,传递数据,无缓存的管道,就是不可以保存数据的通道。

无缓冲的管道:在管道接受数据前,不保存任何值。
无缓存的管道:这种类型的管道要求两个goroutine同时接、收值。
如果goroutine没同时准备好,会导致有一方阻塞状态。
这种管道进行接、收值的行为,本身是同步状态。
无缓冲管道案例
package main
import (
"fmt"
"time"
)
//运动员1
func write(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
fmt.Printf("棒子%d号来了\n", i)
//每秒塞一个棒子
time.Sleep(time.Second)
fmt.Println("-------------------")
}
}
//运动员2
func read(ch chan int) {
for v := range ch {
fmt.Printf("拿到了棒子%d号\n", v)
}
}
func main() {
intChan := make(chan int, 0)
go write(intChan)
go read(intChan)
time.Sleep(10 * time.Second)
}
有缓冲的管道
有缓冲的管道:在被接收前能存储一个或多个值的通道。
这种管道不要求goroutine必须同时发送和接收。
只有管道中没有要接收的值,接收动作才会阻塞。
只有管道塞满了数据,发送动作才会阻塞。
示意图

package main
import (
"fmt"
"time"
)
//运动员1
func write(ch chan int) {
for i := 0; i < 10; i++ {
ch <- i
fmt.Printf("棒子%d号来了\n", i)
//每秒塞一个棒子
fmt.Println("-------------------")
}
}
//运动员2
func read(ch chan int) {
for v := range ch {
fmt.Printf("拿到了棒子%d号\n", v)
}
}
func main() {
intChan := make(chan int, 5)
go write(intChan)
go read(intChan)
time.Sleep(2 * time.Second)
}
4.7. 关闭channel
使用内置函数close可以关闭channel。
channel关闭后,就不能再向channel写入数据了,但是仍然可以从channel读取数据。
package main
import "fmt"
func main() {
intChan := make(chan int, 3)
intChan <- 123
intChan <- 456
close(intChan) //通道关闭,游客禁止入内,数据也无法写入
//intChan<-123 //此处已经不能加载 panic: send on closed channel
fmt.Println("管道已经关闭,在管道中的游客请尽快出来")
n1 := <-intChan
n2 := <-intChan
fmt.Println(n1, n2)
}
遍历channel的方式
package main
import "fmt"
func main() {
intChan := make(chan int, 100)
for i := 0; i < 100; i++ {
intChan <- i * i //循环写入一百个数据
}
//遍历方式1,取出管道值
//for i := 0; i < len(intChan); i++ {
// fmt.Println(i)
//}
//遍历方式2,for range,参数只有一个
//在遍历管道时,必须先关闭,否则死锁报错
close(intChan)
for v := range intChan {
fmt.Printf("类型:%T 值:%d\n", v, v)
}
}
4.8. 单向channel
默认情况下,channel是双向的,既可以写入数据,也可以读取数据。
但是有些场景下,管道当做参数传递,且希望仅仅是单向使用(只读,只写),此时可以使用单向管道。
语法
var ch1 chan int //普通管道
var ch2 chan <- string //只能写入string类型的数据
var ch3 <-chan int //只能读取int类型的数据
提示
chan <- 表示数据写入channel
<-chan 表示读取channel中的数据
示例
4.8.1. 生产消费者模型
package main
import (
"fmt"
)
func Producer(baozi chan<- int) {
for i := 0; i < 10; i++ {
baozi <- i //把包子放到管道
fmt.Printf("三全食品,生产了有毒包子%d号\n", i)
}
//每天就只卖10个包子,卖完为止
close(baozi)
}
func Consumer(baozi <-chan int) {
for baozi := range baozi {
fmt.Printf("消费者吃了包子%d号.....\n", baozi)
}
}
func main() {
ch := make(chan int, 10)
//生产者
go Producer(ch)
//消费者
Consumer(ch)
}
执行结果图

案例2
package main
import "fmt"
//只能写入数据
func sendData(sendch chan<- int) {
sendch <- 10
//只写模式下,不能读取操作
//invalid operation: <-sendch (receive from send-only type chan<- int)
//<-sendch
}
//只能读取数据
func readData(sendch <-chan int) {
//sendch <- 10
data := <-sendch
fmt.Println(data)
}
func main() {
chnl := make(chan int)
go sendData(chnl) //开启协程,写入数据到管道
readData(chnl)
}
5. 8.4 Go select
Go语言引入了select关键字,用于处理异步IO问题,语义和switch特别相似。语法由select开始,每个条件由case语句来描述。每个case语句必须是IO操作。
select {
case <-chan1:
//如果读取到channel的数据,就执行这里
case chan2<-1:
//如果成功向chan2写入数据,就执行这里
default:
//上述都失败了,进入这里
}
案例
使用select可以解决从管道取数据的阻塞问题
package main
import (
"fmt"
"time"
)
func main() {
//定义一个管道
intChan := make(chan int, 10)
//循环写入数据
for i := 0; i < 10; i++ {
intChan <- i
}
//定义管道,可以写入string
strChan := make(chan string, 5)
for i := 0; i < 5; i++ {
//格式化后写入string数据
strChan <- "hello" + fmt.Sprintf("%d", i)
}
//传统的for循环遍历,必须close关闭channel,否则造成死锁
//但是到底关闭哪个channel,并不那么容易选择
//加上for无线循环,匹配所有的case,直到结束return
for {
select {
//如果有数据被读取到,进入这个分支,并且intChan没关闭的话,也会自动匹配下一个case
case v := <-intChan:
fmt.Printf("从intChan读取到数据%d\n", v)
time.Sleep(time.Second)
case v2 := <-strChan:
fmt.Printf("从strChan中读取到数据%s\n", v2)
time.Sleep(time.Second)
default:
fmt.Printf("什么也没读到,再见\n")
time.Sleep(time.Second)
return
}
}
}
5.1.1. waitGroup等待组
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
/*
同步等待组:WaitGourp,执行了wait的goroutine,要等待同步等待组中的其他的goroutine执行完毕。。
内置的计数器:counter:0
Add(),设置counter的值
Done(),将counter减一,同Add(-1)
以上两个方法可以设置counter的值,注意不能为负数,否则会引发恐慌。
Wait(),哪个goroutine执行了,那么就会被阻塞,直到counter为0。解除阻塞
*/
var wg sync.WaitGroup
//fmt.Printf("%T\n",wg)
//fmt.Println(wg)
wg.Add(2)
go printNum1(&wg)
go printNum2(&wg)
wg.Wait() //main,进入阻塞状态,底层计数器为0,接触阻塞。。
//time.Sleep(1*time.Second)
fmt.Println("main。。接触阻塞。。结束了。。。")
}
func printNum1(wg *sync.WaitGroup) {
rand.Seed(time.Now().UnixNano())
for i := 1; i <= 100; i++ {
fmt.Println("子goroutine1,i:", i)
time.Sleep(time.Duration(rand.Intn(1000))) //
}
wg.Done() //计数器减一
}
func printNum2(wg *sync.WaitGroup) {
for j := 1; j <= 100; j++ {
fmt.Println("\t子goroutine2,j:", j)
time.Sleep(time.Duration(rand.Intn(1000)))
}
wg.Done()
}