谭浩的博客

Simple is beauty.

golang 并发基础

Share by communication

并发编程在很多时候因为需要正确地访问共享变量而变得非常困难。go 提供了一种将共享值在 channel 上传递的方法,任何时候都只有一个 goroutine 可以访问该值,从而避免了数据竞争。这也正是 go 主张的:

不要通过共享内存来实现通信,而是通过通信来实现共享内存。

通过使用 channel ,可以更加容易编写清晰正确的并发程序。

Goroutines

goroutine 有一个简单的模型,它和其他 goroutine 在同一个地址空间执行,因此它时轻量级的,不需要花费太多时间分配栈空间,其一开始是一个小的栈空间,按需分配堆内存来增长。

在一个函数之间加上前缀关键词 go 就可以让一个函数调用运行在一个新的 goroutine 上。当调用完成时,goroutine 则安静地退出。

lago l
1
go list.Sort() // 并发的运行排序函数

只是使用 goroutine 则不够实用,我们需要信号沟通机制,因此我们需要 channel 。

Channels

和 map 结构一样,channel 也使用关键词 make 进行分配,如果一个可选整数参数被使用,则设置一个具有缓存大小的 channel,其默认值为0。

1
2
3
cj := make(chan int)
cj := make(chan int, 0)
cs := make(chan *os.File, 100) // 文件指针的缓存channel

一个 channel 可以被用来等待一个 goroutine 执行结束。

1
2
3
4
5
6
7
c := make(chan int)
go func() {
list.Sort()
c <- 1
}
doSomething()
<- c //登台排序结束

接收者将会一直阻塞直到有数据可以接收,如果 channel 没有缓存,则发送者将会一直阻塞直到数据被接收。

一个具有缓存的 channel 可以用作信号量,限制吞吐量。下面这个例子中,收到的请求需要传递给handler 进行处理。这个 channel 的缓存容量限制了同时处理的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var sem = make(chan int, MaxOutstanding)

func handler(r *Request) {
sem <- 1
process(r) // 可能需要花费很长时间
<- sem
}

func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // 迅速处理
}
}

上面的处理方法存在一定的问题,如果进来的请求很快,还是会消耗无限的资源。我们可以改变我们创建 goroutine 的方式。

1
2
3
4
5
6
7
8
9
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req)
<-sem
}()
}
}

上述方法限制了 goroutine 创建的数量,但是由于 req 变量在循环中重复使用,从而导致了 req 变量在 goroutine 之间共享。为了解决该问题,我们可以将 req 的值作为变量传递给闭包。

1
2
3
4
go func(req *Request) {
process(req)
<-sem
}(req)

除了上述方法,我们还可以在循环内部创建一个新的同名变量。req := req

除了上面的方法之外,另一种方案则是创建固定数量的 goroutine ,goroutine 的数量限制了并行调用的数量。

1
2
3
4
5
6
7
8
9
10
11
12
func handle(queue chan *Request) {
for req := range queue {
process(req)
}
}

func Server(clientRequests chan *Request, quit chan bool) {
for i = 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit //等待被告知退出方法
}

Channels of channels

在 Go 语言中,channel 是第一公民。对于该属性的普遍应用便是实现一个安全并行复用。

在上一个例子中,我们没有定义 handle 处理的请求的类型。如果请求包含一个 channel 用于回复消息,每一个客户端都可以提供自己处理方式,则以下是 Request 的类型定义:

1
2
3
4
5
type Request struct {
args []int
f func([]int) int
resultChan chan int
}

客户端提供一个方法以及它的参数,还包含一个 channel 用于收到结果。

1
2
3
4
5
6
7
8
9
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}
request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
clientRequest <- request
fmt.Printf("answer: %d\n", <-request.resultChan)

在服务端,只需要改变处理方法。

1
2
3
4
5
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}

Parallelization

以上想法的另一个应用就是跨 cpu 多核的并行化计算。如果一个计算可以被分离为多个独立的计算,则它可以被并行化,对于每一个分块使用一个 channel 作为其完成的信号。

1
2
3
4
5
6
7
8
type Vector []float64

func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1
}
1
2
3
4
5
6
7
8
9
10
const numCPU = 4
func (v Vector) DoAll(u Vector) {
c := make(chan int, numCPU)
for i := 0; i < numCPU, i++ {
go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
}
for i := 0; i< numCPU; i++ {
<-c
}
}

我们可以通过 runtime.NumCPU获取机器的CPU核数。方法runtime.GOMAXPROCS可以报告一个 Go 程序可以同时运行的核数。它的默认值为runtime.NumCPU但是可以通过设置类似命名的 shell 变量和调用带有正数参数的该方法来设置。以0为参数则仅仅查询该值。

A leaky buffer

并发编程的工具甚至可以更容易地表达非并发的想法。下面是一个从 rpc 包中抽象而来的例子。客户端 goroutine 从一个资源处循环接收数据,为了避免分配和释放缓存,它用一个具有缓存的 channel 维持了一个自由列表。如果 channel 为空,一个新的缓存被分配,一旦消息缓存准备好了,它就被发送给服务端的 serverChan。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
for {
var b *Buffer
select {
case b = <-freeList:
default:
b = new(Buffer)
}
load(b)
serverChan <- b
}
}

服务端循环接收从客户端传过来的消息并处理它,返回 buffer 到列表中去。

1
2
3
4
5
6
7
8
9
10
func server() {
for {
b := <-serverChan
process(b)
select {
case freeList <- b:
default:
}
}
}

客户端尝试去从freeList 获取一个 buffer,如果没有可用,它则分配一个新的。在服务端,只要自由列表不满,服务端则将收到的缓存放回空闲列表,如果自由列表已满,buffer 会被丢到垃圾收集器以便回收。上述代码依靠缓存channel和垃圾收集器,只用几行便实现了一个缓存泄露列表。

参考资料

  1. https://golang.org/doc/effective_go.html#concurrency