golang的channel和条件变量在单生产单消费场景下的性能对比测试

本文相应的代码附在文章末尾处。

场景描述

  1. 1个生产者只与1个消费者相互绑定。
  2. 生产者按固定生产频率,每间隔<PRODUCE_INTERVAL_MS>毫秒生产一个任务。
  3. 生产者每次生产时都检查当前任务队列大小,如果已经超过<QUEUE_NUM_CANCEL_PRODUCE>,则丢弃本次生产的任务。(即消费能力低于生产能力时如何处理)
  4. 总共有<PAIR_NUM>对(pair)单生产者单消费者对,对与对之间没有逻辑关联。(增大模拟性能场景)

测试方法

分别使用golang的channel和条件变量作为生产消费的通信手段。
让测试程序执行<TEST_DURATION_SEC>时长后退出。
观察测试程序执行时cpu的使用情况,以及最后总共生产任务的数量和消费任务的数量。

使用条件变量时,先进先出任务队列底层使用golang的slice实现。
channel使用的是带缓冲的channel。缓冲大小为<QUEUE_NUM_CANCEL_PRODUCE>的2倍。

测试结果

测试时条件设定:1000个生产消费对,每个生产者的生产间隔为50毫秒,总测试时长为1分钟。

理论任务数量:1000 * 600 * 20 = 12000000

实际任务情况:

类型总生产任务数总消费任务数总丢弃任务数
channel11964475119644750
cond11960068119600680

分析:
由于程序中总测试时长和生产者的生产间隔都使用sleep实现,并且程序是执行固定时间后强行退出,所以数量可能存在误差,误差在可接受范围内。
无论是channel还是cond,总生产任务数都等于总消费任务数,符合预期。
由于消费者只做计数统计不做其他业务,不会出现消费能力低于生产能力的情况,
<QUEUE_NUM_CANCEL_PRODUCE>又设置的比较大(1000),所以生产者没有丢弃过任务。

以下是每秒任务数随时间轴变化折线图:

任务数折线图

cpu使用对比

cpu平均使用率

  • channel:8.44%
  • cond:8.37%

以下是每秒cpu使用率随时间轴变化折线图:

cpu使用折线图

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main

import (
"fmt"
"sync"
"sync/atomic"
"time"
)

type Item struct {
index int // 生产消费对的编号
count int // 任务编号,在每个生产者中自增
}

var (
USING_CHANNEL bool = false // true 使用channel,false 使用cond

PAIR_NUM int = 1000 // 有多少对生产者消费者
QUEUE_NUM_CANCEL_PRODUCE int = 1000 // 生产时发现任务队列超过该值时,则丢弃该次生产的任务
PRODUCE_INTERVAL_MS int = 50 // 每个生产者的生产间隔
TEST_DURATION_SEC int = 600 // 总共测试时长
)

var gProduceCount int32 // 总生产任务数量
var gConsumeCount int32 // 总消费任务数量
var gDropCount int32 // 总生产时因任务队列超过阈值而丢弃的任务数量

func channelProduce(ch chan Item, index int) {
for i := 0; ; i++ {
if len(ch) > QUEUE_NUM_CANCEL_PRODUCE {
atomic.AddInt32(&gDropCount, 1)
fmt.Printf("cancel produce. index:%d len:%d\n", index, len(ch))
} else {
atomic.AddInt32(&gProduceCount, 1)
ch <- Item{index: index, count: i}
}
time.Sleep(time.Duration(PRODUCE_INTERVAL_MS) * time.Millisecond)
}
}

func channelConsume(ch chan Item, index int) {
for {
<-ch
atomic.AddInt32(&gConsumeCount, 1)
}
}

func condProduce(cond *sync.Cond, itemQueue *[]Item, index int) {
for i := 0; ; i++ {
cond.L.Lock()
if len(*itemQueue) > QUEUE_NUM_CANCEL_PRODUCE {
atomic.AddInt32(&gDropCount, 1)
fmt.Printf("cancel produce. index:%d len:%d\n", index, len(*itemQueue))
} else {
atomic.AddInt32(&gProduceCount, 1)
*itemQueue = append(*itemQueue, Item{index: index, count: i})
cond.Signal()
}
cond.L.Unlock()
time.Sleep(time.Duration(PRODUCE_INTERVAL_MS) * time.Millisecond)
}
}

func condConsume(cond *sync.Cond, itemQueue *[]Item, index int) {
for {
cond.L.Lock()
for len(*itemQueue) == 0 {
cond.Wait()
}
//fmt.Printf("index:%d item:(%d:%d)\n", index, (*itemQueue)[0].index, (*itemQueue)[0].count)
*itemQueue = (*itemQueue)[1:]
cond.L.Unlock()
atomic.AddInt32(&gConsumeCount, 1)
}
}

func trace() {
var prevProduceCount int32
var prevConsumeCount int32
for {
produceCount := atomic.LoadInt32(&gProduceCount)
consumeCount := atomic.LoadInt32(&gConsumeCount)
fmt.Printf("trace pc:%d cc:%d\n", produceCount-prevProduceCount, consumeCount-prevConsumeCount)
prevProduceCount = produceCount
prevConsumeCount = consumeCount
time.Sleep(time.Duration(1) * time.Second)
}
}

func main() {
fmt.Printf("using channel:%t\n", USING_CHANNEL)

for i := 0; i < PAIR_NUM; i++ {
if USING_CHANNEL {
ch := make(chan Item, QUEUE_NUM_CANCEL_PRODUCE*2)

go channelProduce(ch, i)
go channelConsume(ch, i)
} else {
cond := sync.NewCond(new(sync.Mutex))
var itemQueue []Item

go condProduce(cond, &itemQueue, i)
go condConsume(cond, &itemQueue, i)
}
}
go trace()

time.Sleep(time.Duration(TEST_DURATION_SEC) * time.Second)
produceCount := atomic.LoadInt32(&gProduceCount)
consumeCount := atomic.LoadInt32(&gConsumeCount)
dropCount := atomic.LoadInt32(&gDropCount)
fmt.Printf("produce count: %d, consume count: %d drop count: %d\n", produceCount, consumeCount, dropCount)
}

其他

对channel使用len获取当前大小是否线程安全

用len获取当前channel的大小是线程安全,但是由于获取大小、读取元素、写入元素是三个独立的操作,所以,获取了长度之后在做写入元素的操作时长度可能已经发生变化。

对于我们例子的场景,这种变化是可接受的。即在获取大小并判断得出已超过channel的生产阈值之后,消费协程可能又消费了一些元素,此时channel的大小可能比阈值要小,但我们依然丢弃当前元素的写入。

TODO

希望后续如果有时间有能力可以看看golang中channel的实现原理,slice的实现原理,container list的实现原理。

本文完,作者yoko,尊重劳动人民成果,转载请注明原文出处: https://pengrl.com/p/24468/

0%