1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| package main
import ( "fmt" "sync" "sync/atomic" "time" )
type Item struct { index int count int }
var ( USING_CHANNEL bool = false
PAIR_NUM int = 1000 QUEUE_NUM_CANCEL_PRODUCE int = 1000 PRODUCE_INTERVAL_MS int = 50 TEST_DURATION_SEC int = 600 )
var gProduceCount int32 var gConsumeCount int32 var gDropCount int32
func channelProduce(ch chan Item, index int) { for i := 0; ; i++ { if len(ch) > QUEUE_NUM_CANCEL_PRODUCE { atomic.AddInt32(&gDropCount, 1) fmt.Printf("cancel produce. index:%d len:%d\n", index, len(ch)) } else { atomic.AddInt32(&gProduceCount, 1) ch <- Item{index: index, count: i} } time.Sleep(time.Duration(PRODUCE_INTERVAL_MS) * time.Millisecond) } }
func channelConsume(ch chan Item, index int) { for { <-ch atomic.AddInt32(&gConsumeCount, 1) } }
func condProduce(cond *sync.Cond, itemQueue *[]Item, index int) { for i := 0; ; i++ { cond.L.Lock() if len(*itemQueue) > QUEUE_NUM_CANCEL_PRODUCE { atomic.AddInt32(&gDropCount, 1) fmt.Printf("cancel produce. index:%d len:%d\n", index, len(*itemQueue)) } else { atomic.AddInt32(&gProduceCount, 1) *itemQueue = append(*itemQueue, Item{index: index, count: i}) cond.Signal() } cond.L.Unlock() time.Sleep(time.Duration(PRODUCE_INTERVAL_MS) * time.Millisecond) } }
func condConsume(cond *sync.Cond, itemQueue *[]Item, index int) { for { cond.L.Lock() for len(*itemQueue) == 0 { cond.Wait() } *itemQueue = (*itemQueue)[1:] cond.L.Unlock() atomic.AddInt32(&gConsumeCount, 1) } }
func trace() { var prevProduceCount int32 var prevConsumeCount int32 for { produceCount := atomic.LoadInt32(&gProduceCount) consumeCount := atomic.LoadInt32(&gConsumeCount) fmt.Printf("trace pc:%d cc:%d\n", produceCount-prevProduceCount, consumeCount-prevConsumeCount) prevProduceCount = produceCount prevConsumeCount = consumeCount time.Sleep(time.Duration(1) * time.Second) } }
func main() { fmt.Printf("using channel:%t\n", USING_CHANNEL)
for i := 0; i < PAIR_NUM; i++ { if USING_CHANNEL { ch := make(chan Item, QUEUE_NUM_CANCEL_PRODUCE*2)
go channelProduce(ch, i) go channelConsume(ch, i) } else { cond := sync.NewCond(new(sync.Mutex)) var itemQueue []Item
go condProduce(cond, &itemQueue, i) go condConsume(cond, &itemQueue, i) } } go trace()
time.Sleep(time.Duration(TEST_DURATION_SEC) * time.Second) produceCount := atomic.LoadInt32(&gProduceCount) consumeCount := atomic.LoadInt32(&gConsumeCount) dropCount := atomic.LoadInt32(&gDropCount) fmt.Printf("produce count: %d, consume count: %d drop count: %d\n", produceCount, consumeCount, dropCount) }
|