// 这里可以看到是对runtimeTimer的wrap type Timer struct { C <-chan Time r runtimeTimer }
funcNewTimer(d Duration) *Timer { // 注意,这里的channel是带缓冲的,保证了业务层如果不接收这个channel,底层的 // 桶协程不会因为发送channel而被阻塞 c := make(chan Time, 1) t := &Timer{ C: c, r: runtimeTimer{ when: when(d), // 向底层timer传入sendTime回调函数 f: sendTime, arg: c, }, } startTimer(&t.r) return t }
// 将底层的超时回调转化为channel发送,并写入了当前时间 funcsendTime(c interface{}, seq uintptr) { // Non-blocking send of time on c. // Used in NewTimer, it cannot block anyway (buffer). // Used in NewTicker, dropping sends on the floor is // the desired behavior when the reader gets behind, // because the sends are periodic. select { case c.(chan Time) <- Now(): default: } }
// timer结构体 type timer struct { tb *timersBucket // timer所属的桶 i int// 最小堆中的下标,为-1时则不可用了
// Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. when int64// 超时时间点 period int64// 如果是Ticker,会有这个值,周期性触发 f func(interface{}, uintptr) // 回调 arginterface{} // time.Timer会传入channel变量,一会回调时把channel带回去 seq uintptr// 这个变量目前没有用 }
// 桶数量固定为64 const timersLen = 64
// 全局桶数组,还对cache伪共享做了优化 var timers [timersLen]struct { timersBucket
// The padding should eliminate false sharing // between timersBucket values. pad [sys.CacheLineSize - unsafe.Sizeof(timersBucket{})%sys.CacheLineSize]byte }
// 桶协程,注意,这里有两层for循环,最外面的for是永远不会退出的 functimerproc(tb *timersBucket) { tb.gp = getg() for { // 进互斥锁 lock(&tb.lock) // 睡眠标志修改 tb.sleeping = false // 获取当前时间 now := nanotime() delta := int64(-1) for { // 如果桶内没有timer,直接退出内层for iflen(tb.t) == 0 { delta = -1 break } // 获取最早触发timer,并检查是否到达触发时间 t := tb.t[0] delta = t.when - now // 还没到时间,直接退出内层for if delta > 0 { break } ok := true // 如果是period有值,说明需要周期性触发,我们将该timer修改触发时间后,重新 // 插入最小堆中 if t.period > 0 { // leave in heap but adjust next time to fire t.when += t.period * (1 + -delta/t.period) if !siftdownTimer(tb.t, 0) { ok = false } } else { // 从最小堆中删除 last := len(tb.t) - 1 if last > 0 { tb.t[0] = tb.t[last] tb.t[0].i = 0 } tb.t[last] = nil tb.t = tb.t[:last] if last > 0 { if !siftdownTimer(tb.t, 0) { ok = false } } // 下标设置为-1,deltimer时发现下标为-1则不用删除了 t.i = -1// mark as removed } // 把t中变量拷贝出来,就可以出锁了 f := t.f arg := t.arg seq := t.seq unlock(&tb.lock) // 堆调整时如果下标设置越界了,则丢到这里来处理,badTimer会直接panic if !ok { badTimer() } // 如果开了race检查的话 if raceenabled { raceacquire(unsafe.Pointer(t)) } f(arg, seq) lock(&tb.lock) } // 如果桶内没有timer了,把协程挂起 if delta < 0 || faketime > 0 { // No timers left - put goroutine to sleep. tb.rescheduling = true goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1) continue } // At least one timer pending. Sleep until then. // 如果还有协程,睡眠直到桶内最早触发时间点到达后唤醒 tb.sleeping = true tb.sleepUntil = now + delta noteclear(&tb.waitnote) unlock(&tb.lock) notetsleepg(&tb.waitnote, delta) } }
// Delete timer t from the heap. // Do not need to update the timerproc: if it wakes up early, no big deal. funcdeltimer(t *timer)bool { if t.tb == nil { // t.tb can be nil if the user created a timer // directly, without invoking startTimer e.g // time.Ticker{C: c} // In this case, return early without any deletion. // See Issue 21874. returnfalse }
tb := t.tb
lock(&tb.lock) // t may not be registered anymore and may have // a bogus i (typically 0, if generated by Go). // Verify it before proceeding. i := t.i last := len(tb.t) - 1 // 如果已经触发过或已经被删除了,则返回false告知调用方 if i < 0 || i > last || tb.t[i] != t { unlock(&tb.lock) returnfalse } if i != last { tb.t[i] = tb.t[last] tb.t[i].i = i } tb.t[last] = nil tb.t = tb.t[:last] ok := true if i != last { if !siftupTimer(tb.t, i) { ok = false } if !siftdownTimer(tb.t, i) { ok = false } } unlock(&tb.lock) if !ok { badTimer() } returntrue }