
本文共 3562 字,大约阅读时间需要 11 分钟。
协程与通讯
协程
概念
在 Go 中,应用程序并发处理的部分被称作协程
(goroutines),它可以进行更有效的并发运算。协程和操作系统线程之间并无一对一的关系,协程由Go的协程调度器进行调度,调度器会将协程调度到操作系统线程上运行。
协程工作在相同的地址空间中,所以共享内存的方式一定是同步的,这个可以使用 sync 包来实现,不过我们很不鼓励这样做:Go 使用 channels 来同步协程。
协程是轻量的,比线程更轻,使用 4K 的栈内存就可以在堆中创建它们,协程的栈会根据需要进行伸缩,不出现栈溢出。
使用GOMAXPROCS
GOMAXPROCS
用于设置需要参与运算的CPU核数,例如一个8核的系统中,将GOMAXPROCS
设置为8后,Go调度器会将协程调度到8个核心上执行。
设置GOMAXPROCS
的方法:
runtime.GOMAXPROCS(numCores)
创建协程
创建协程的方法非常简单:
go somefunc()
通道
Go提供了通道(channel)用于协程之间通讯,通过通道进行通信的方式保证了同步性。通道就像一个可以用于发送类型化数据的管道:在任何给定时间,一个数据被设计为只有一个协程可以对其访问,所以不会发生数据竞争。 数据的所有权(可以读写数据的能力)也因此被传递。
通道定义(一个通道只能传输一种类型的数据):
var chName chan datatype
通道需要使用make
创建,未初始化的通道的值是nil,范例:
var ch1 chan string = make(chan string)ch2 := make(chan string)ch3 := make(chan int)
通讯操作符<-
此操作符体现了通道的方向:
- 流向通道(发送):
ch <- int1
表示:用通道 ch 发送变量 int1; - 从通道流出(接收),有几种方式:
int2 = <- ch
:变量 int2 从通道 ch接收数据;<- ch
:获取通道中的当前值,例如使用if判断:if <- ch != 1000
通道缓冲
默认情况下,通信是同步且无缓冲的:在有接受者接收数据之前,发送是阻塞的;同样对于接收者来说,再收到数据之前,接收是阻塞的。
我们可以给通道设置缓冲区,在缓冲器满了之前,发送是不会阻塞的,这样就变成了异步通讯。
方法:make通道时指定缓冲区大小:ch :=make(chan type, bufferSize)
- bufferSize == 0 -> synchronous, unbuffered (阻塞)
- bufferSize > 0 -> asynchronous, buffered(非阻塞)取决于value元素
通道的方向
通道本身是双向的,但是作为参数传递时,可以增加注解标识通道的方向:
var send_only chan<- int // channel can only send datavar recv_only <-chan int // channel can only receive data
实际应用:
var c = make(chan int) // 双向通道go source(c)go sink(c)func source(ch chan<- int){ // 这个函数限制通道为发送 for { ch <- 1 }}func sink(ch <-chan int) { // 限制通道为接收 for { <-ch }}
关闭通道
发送端可以关闭通道,接收端永远不需要关闭通道。
一般可以通过defer
来关闭通道:
ch := make(chan float64)defer close(ch)
通过下面的语句可以实现检测通道是否关闭:
if v, ok := <-ch; ok { // ok为true时表示通道未关闭 process(v)}
还有一个更简单的办法,使用 for-range 语句来读取通道是更好的办法,因为这会自动检测通道是否关闭:
for input := range ch { process(input)}
使用Select选择通道
select监听所有case指定的通道,当有数据到达时,就调用对应的case下的语句:
select { case u:= <- ch1: ... case v:= <- ch2: ... ... default: // no value ready to be received ...}
需要注意的是,select不支持fallthrough
,任何case中执行break
或者return
将会导致select函数结束。
select 做的就是:选择处理列出的多个通信情况中的一个。
- 如果都阻塞了,会等待直到其中一个可以处理
- 如果多个可以处理,随机选择一个
- 如果没有通道操作可以处理并且写了 default 语句,它就会执行:default 永远是可运行的(这就是准备好了,可以执行)。
在 select 中使用发送操作并且有 default 可以确保发送不被阻塞!如果没有 default,select 就会一直阻塞。
通道定时器
time.Ticker
结构体,这个对象以指定的时间间隔重复的向通道 C 发送时间值:
type Ticker struct { C <-chan Time // the channel on which the ticks are delivered. ...}
创建time.Ticker
的函数:
func NewTicker(dur) *Ticker // `time.Ticker`的时间间隔的单位是 ns
使用范例:
ticker := time.NewTicker(updateInterval)defer ticker.Stop()...select { case u:= <-ch1: ... case v:= <-ch2: ... case <-ticker.C: logState(status) // call some logging function logState default: // no value ready to be received ...}
select会定时收到ticker.C
通道的数据,相当于周期性唤醒。
time.After
: ch := make(chan error, 1)select { case resp := <-ch // use resp and reply case <-time.After(timeoutNs): // call timed out break}
我们也可以通过此定时器实现sleep功能:
chRate := time.Tick(1e9) // 1秒...<- chRate // Sleep 1秒
并行计算范例
计算两个矩阵的逆的乘积,我们可以同时运行2个矩阵的逆,然后再做矩阵的乘积,代码如下:
func InverseProduct(a Matrix, b Matrix) { a_inv_future := InverseFuture(a) // start as a goroutine b_inv_future := InverseFuture(b) // start as a goroutine a_inv := <-a_inv_future // 等待a的结果 b_inv := <-b_inv_future // 等待b的结果 return Product(a_inv, b_inv) // 计算a、b的乘积}
InverseFuture
函数是这里的重点,go
启动的是一个闭包函数,该函数将future
通道封闭进去了,所以上面的函数可以正常使用通道。
func InverseFuture(a Matrix) chan Matrix { future := make(chan Matrix) go func() { future <- Inverse(a) }() return future}
发表评论
最新留言
关于作者
