抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

工作来了!我们现在有一些工作要做了。

1
2
3
package work

var workload []data.Payload // len(workload) == setSize
1
2
3
package data

type Payload *string

我们要从 workload 中一个一个取出工作并处理,因此我们需要进行循环。

我们很快实现了一个简单的循环:

1
2
3
4
5
6
7
8
9
func Loop(workload []data.Payload) {

var count = 0

for _, v := range workload {
work.Work(v)
count++
}
}

但我想更快完成工作。是否有加速这个循环的办法?从感性层面想,如果能并行工作,那么应该可以提升效率。

并发加速

我们立刻将原有的逻辑代码包在了匿名函数里,并启动 Goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
func GoloopSimple(workload []data.Payload) {

var count = 0

for _, v := range workload {

go func() {
work.Work(v)
count++
}()
}
}

这是最直接的写法。但这里包含了多个问题。

  • 启动的 Goroutine 可能尚未结束,但主 Goroutine 已经结束了。因此我们需要 sync.Waitgroup
  • 循环变量 v 被匿名函数捕获了。如果你使用 go vet,你会收到警告 range variable captured by func literal
  • 多个 Goroutine 同时操作 count 变量会导致数据竞态。

问题1很好理解,且在 sync 部分提及过,不作更多说明。

问题2关于闭包捕获和 range 的。在 Go 中使用闭包时,闭包会捕获变量地址,而非值(同时变量逃逸)。而 range 是问题的关键,对于一次循环,v 是一个固定地址的循环变量,每次把切片中的值赋给 v。因此,闭包捕获的是同一个地址的 v,最后很可能每一个 Goroutine 都拿到的是切片最后的 v

因此,先不考虑数据计数,我们将原本的代码修改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func GoloopImprove(workload []data.Payload) {
var wg sync.WaitGroup

for _, v := range workload {

wg.Add(1)

go func(v data.Payload) {
defer wg.Done()

work.Work(v)
}(v)
}

wg.Wait()
}

这样就可以合理的等待结束,并且每个 Goroutine 处理一个结果了。

接下来,我们考虑统计 Work 被执行的次数问题,Goroutine 该如何合理的传递消息?这个问题的答案是显然的,channel。但怎么设计通道比较合适呢?这里直接给出方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func Goloop(workload []data.Payload) {
var wg sync.WaitGroup
var countCh = make(chan struct{}, len(workload))
var count = 0

for _, v := range workload {

wg.Add(1)

go func(v data.Payload) {
defer wg.Done()

work.Work(v)
countCh <- struct{}{}
}(v)
}

go func() {
wg.Wait()
close(countCh)
}()

for range countCh {
count++
}
}

我们将会启动一个协程负责等待 Waitgroup,在此之后关闭计数通道并退出。主循环协程在通道关闭前都会持续进行计数工作,也保证了该协程处于等待 Waitgroup 的状态。

并发加速?

现在我们已经把并行版循环写好了。然而令人遗憾的是,有些时候这种操作并不能带来明显的性能提升。
我尝试举出一些例子,但我觉得他们并不一定准确:

  • Work() 需要竞争某种资源,Goroutine 即使已创建也必须等待其他 Goroutine 结束才能继续执行,从而退化为串行。
  • Work() 的瓶颈在于一些 IO 操作。使用协程反而增加了资源分配和调度开销(这是否和上面的情况相同?) 这是一个错误的想法,会在后面的章节作出解释。
  • setSize 太小,虽然快了,但优势不明显。

我们可以用 Go 自带的 Benchmark 去针对一些模拟情景做一些测试。

测试机器配置如下,下文不再重复。

goos: linux
goarch: amd64
cpu: 12th Gen Intel(R) Core(TM) i7-1260P

简单工作

我们先来看一下真正的加速。假设我们的工作内容如下:

1
2
3
func Work(w data.Payload) {
time.Sleep(time.Duration(5) * time.Millisecond)
}

单纯沉睡了 5 毫秒。与此同类的操作可能涉及数学计算等。 实际上,此处协程沉睡是由 Go 调度器编排的。

1
2
3
(setSize = 1000)
BenchmarkLoop-16 1 5469752291 ns/op 112 B/op 4 allocs/op
BenchmarkGoloop-16 195 6110784 ns/op 128675 B/op 3005 allocs/op

此时性能的提升是巨大的。可以看到 Goloop 的操作速度是 Loop 的千分之一。

文件追加操作

我们模拟一个文件追加行为。

1
2
3
4
5
func Work(v data.Payload) {
file, _ := os.OpenFile(`./file.txt`, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
defer file.Close()
file.WriteString(*v + "\n")
}

测试结果显示出,Goloop 并没有比 Loop 快很多。

1
2
3
(setSize = 100)
BenchmarkLoop-16 4148 286834 ns/op 12000 B/op 300 allocs/op
BenchmarkGoloop-16 4130 273167 ns/op 16977 B/op 503 allocs/op

我们是触及了 Goroutine 的瓶颈?但我们尚未考虑到的是,我们单次可能启动了太多的协程,从而加大了竞争和调度(在这个例子中,100个协程争夺文件IO)。我们需要考虑一下是否应当限制协程的数量。

限制并发量

有一个伟大的 Gopher 曾说过,把任务给协程,而不是为任务启协程(1)。

因此我们可以考虑在任务开始时,就使用几个 Goroutine 监视着工作 Channel,主协程的任务是把这些工作派发给现有的 Goroutines。而当任务完成后,我们关闭工作 Channel,使循环监听着的协程退出。

我们的函数整体会变得更复杂一些:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
func GoloopWithPool(workload []data.Payload) {

var workerNum = 3
jobs := make(chan data.Payload, len(workload))

var wg sync.WaitGroup
var countCh = make(chan struct{}, len(workload))
var count = 0

// worker is a working goroutine
// jobs is for recerving Workload
// wg is for declaring job done
// done is for counting (countCh)
var worker = func(jobs <-chan data.Payload, wg *sync.WaitGroup, done chan struct{}) {
defer wg.Done()

for v := range jobs {
work.Work(v)
done <- struct{}{}
}
}

for i := 0; i < workerNum; i++ {
wg.Add(1)
go worker(jobs, &wg, countCh)
}

for _, v := range workload {
jobs <- v
}
close(jobs)

go func() {
wg.Wait()
close(countCh)
}()

for range countCh {
count++
}
}

我们可以看一下这个方法在上述文件 IO 测试中的表现力:

1
2
3
4
(setSize = 10000, workerNum = 3)
BenchmarkLoop-16 40 28808030 ns/op 1200091 B/op 30000 allocs/op
BenchmarkGoloop-16 56 19230372 ns/op 1680611 B/op 50004 allocs/op
BenchmarkGoloopWithPool-16 68 15914682 ns/op 1282808 B/op 30009 allocs/op

真是快得飞起!我们有工作池的 GoloopWithPool 比单纯的循环快了两倍,同时在比 Goloop 快一点的情况下保持着良好的内存效率。

这提示了我们雇佣一定数量的员工比需要工作的时候疯狂雇佣员工要有效得多(?)。

除非你的工作只是在工位上睡觉(简单工作,使用 GoloopWithPool):

1
2
3
(setSize = 1000)
BenchmarkGoloop-16 195 6110784 ns/op 128675 B/op 3005 allocs/op
BenchmarkGoloopWithPool-16 1 1823969911 ns/op 10952 B/op 25 allocs/op

(1) 注:没人这么说过。

并发减速

什么情况下会更慢?我们来看看。

锁操作

查看一下如果工作涉及到极端互斥锁的情况(真是太糟了):

1
2
3
4
5
func Work(v data.Payload) {
mu.Lock()
defer mu.Unlock()
time.Sleep(time.Duration(5) * time.Millisecond)
}

同样是沉睡,但需要获取锁。与此同类的操作可能设计任何操作同内存区域的行为。

1
2
3
4
(setSize = 1000)
BenchmarkLoop-16 1 5405933355 ns/op 112 B/op 4 allocs/op
BenchmarkGoloop-16 1 5443755010 ns/op 695616 B/op 5019 allocs/op
BenchmarkGoloopWithPool-16 1 5385224617 ns/op 11504 B/op 28 allocs/op

此时,性能降低了一些,并且内存的开销很大。非常不值得。GoloopWithPool 的速度好像快了一点点,但也没什么用。

协程滥用

这个例子来自 go-100-mistakes。具体的内容,可以查看链接。这里引用出其给出的代码的简写形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func parallelMergesort(s []int) {

// guarding ...

middle := len(s) / 2

var wg sync.WaitGroup
wg.Add(2)

go func() {
defer wg.Done()
parallelMergesort(s[:middle])
}()

go func() {
defer wg.Done()
parallelMergesort(s[middle:])
}()

wg.Wait()
merge(s, middle)
}

//...

可以看到,这个归并排序递归地创建了很多 Goroutines,最终单个协程执行的任务可能仅仅是对两个数字进行排序。在创建协程和进行栈调度的工作上,许多的时间被浪费了。

原文中给出了一种优化方法,同样是基于我们“更正确的循环”思想的,即限制 Goroutine 的数量,在一定深度后退化为普通排序。这样仍然可以做到比普通归并排序更快。

并发减速?

如果工作涉及到发送 HTTP 请求呢?这其实也是我进行这一整个测试的原因。在业务中遇到了需要批量发送 HTTP 请求的行为,但使用 Goroutine 之后险些踩坑,并且也没有见到明显的性能提升。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func Work(v data.Payload) {

body := bytes.NewReader([]byte(*v))

req, err := http.NewRequest(http.MethodGet, `http://localhost:7777/abc`, body)
if err != nil {
panic(err)
}

client := &http.Client{Timeout: time.Second * 3}

resp, err := client.Do(req)
if err != nil {
panic(err)
}

resp.Body.Close()
client.CloseIdleConnections()
}

如果你对网络请求比较熟悉,应该能指出这份 Work 的一个错误。对于每个 Work,http.Client 被新建了,而非复用。但我们接下来会看到这并非一个必须解决的问题。

由于 Benchmarking 会快速多次调用 Loop, OS 的四元组会在非常快的时间内被耗尽。我们会获得一个错误 connect: cannot assign requested address,因为没有新的端口可以被绑定了。不过,将测试次数降低,我们已经足够获取到结果了。

1
2
3
4
5
> go test -bench=. -benchtime=5x

BenchmarkLoop-16 5 13372071 ns/op
BenchmarkGoloop-16 5 3196918 ns/op
BenchmarkGoloopWithPool-16 5 7558866 ns/op

在 Benchmark 测试中,显然 Goloop 是最快的。难道网络 IO 和 文件 IO 是不同的吗(没错)?

协程调度和IO阻塞

最终,我们可以把以上提及的所有问题分为两种受限类型:

  1. 受限于 CPU 速度。例如,跑一个归并排序算法。
  2. 受限于 IO 速度。例如,网络服务器。

对于 <1>,提升效率的方式在于多核并行。如果你可以调用CPU的所有核心并行地去进行计算,那么就可以更快速的完成任务。但注意,不应当让调度器开销大于计算开销,否则得不偿失。

对于 <2>,提升效率的方式在于Go调度器行为。对于网络IO,Go调度器实现了自己的 epoll,阻塞发生在协程级。当协程由于网络IO被阻塞时,Go调度器可以让出该协程并执行其他协程。也正因为如此,Go才天然适合作为HTTP服务器。因为其可以以最小的代价为每一个请求都开启一个 Goroutine。

最终,我在业务中观察到的 Go 作为 HTTP Client 没有提升效率的原因,目前分析看来是下游的并发量受限,而 Go 客户端建立了过于多的连接。无论我作为 Client 如何调度,都无法再进一步提升效率。同时,大量的无超时 Goroutie 可能会导致 Go 客户端产生 OOM 情况。因此最后选择了将请求任务提交至一个协程池来完成工作。未来,可以考虑完全更换为消息队列,能够更符合生产者-消费者模式。

评论