Skip to content

Commit

Permalink
time: garbage collect unstopped Tickers and Timers
Browse files Browse the repository at this point in the history
From the beginning of Go, the time package has had a gotcha:
if you use a select on <-time.After(1*time.Minute), even if the select
finishes immediately because some other case is ready, the underlying
timer from time.After keeps running until the minute is over. This
pins the timer in the timer heap, which keeps it from being garbage
collected and in extreme cases also slows down timer operations.
The lack of garbage collection is the more important problem.

The docs for After warn against this scenario and suggest using
NewTimer with a call to Stop after the select instead, purely to work
around this garbage collection problem.

Oddly, the docs for NewTimer and NewTicker do not mention this
problem, but they have the same issue: they cannot be collected until
either they are Stopped or, in the case of Timer, the timer expires.
(Tickers repeat, so they never expire.) People have built up a shared
knowledge that timers and tickers need to defer t.Stop even though the
docs do not mention this (it is somewhat implied by the After docs).

This CL fixes the garbage collection problem, so that a timer that is
unreferenced can be GC'ed immediately, even if it is still running.
The approach is to only insert the timer into the heap when some
channel operation is blocked on it; the last channel operation to stop
using the timer takes it back out of the heap. When a timer's channel
is no longer referenced, there are no channel operations blocked on
it, so it's not in the heap, so it can be GC'ed immediately.

This CL adds an undocumented GODEBUG asynctimerchan=1
that will disable the change. The documentation happens in
the CL 568341.

Fixes #8898.
Fixes #61542.

Change-Id: Ieb303b6de1fb3527d3256135151a9e983f3c27e6
Reviewed-on: https://go-review.googlesource.com/c/go/+/512355
Reviewed-by: Austin Clements <[email protected]>
LUCI-TryBot-Result: Go LUCI <[email protected]>
Auto-Submit: Russ Cox <[email protected]>
  • Loading branch information
rsc committed Mar 13, 2024
1 parent 74a0e31 commit 508bb17
Show file tree
Hide file tree
Showing 16 changed files with 699 additions and 73 deletions.
2 changes: 2 additions & 0 deletions doc/godebug.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ and the [go command documentation](/cmd/go#hdr-Build_and_test_caching).

### Go 1.23

TODO: `asynctimerchan` setting.

Go 1.23 changed the mode bits reported by [`os.Lstat`](/pkg/os#Lstat) and [`os.Stat`](/pkg/os#Stat)
for reparse points, which can be controlled with the `winsymlink` setting.
As of Go 1.23 (`winsymlink=1`), mount points no longer have [`os.ModeSymlink`](/pkg/os#ModeSymlink)
Expand Down
1 change: 1 addition & 0 deletions src/internal/godebugs/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Info struct {
// Note: After adding entries to this table, update the list in doc/godebug.md as well.
// (Otherwise the test in this package will fail.)
var All = []Info{
{Name: "asynctimerchan", Package: "time", Opaque: true},
{Name: "execerrdot", Package: "os/exec"},
{Name: "gocachehash", Package: "cmd/go"},
{Name: "gocachetest", Package: "cmd/go"},
Expand Down
25 changes: 24 additions & 1 deletion src/runtime/chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type hchan struct {
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
timer *timer // timer feeding this chan
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
Expand Down Expand Up @@ -426,12 +427,19 @@ func closechan(c *hchan) {
}

// empty reports whether a read from c would block (that is, the channel is
// empty). It uses a single atomic read of mutable state.
// empty). It is atomically correct and sequentially consistent at the moment
// it returns, but since the channel is unlocked, the channel may become
// non-empty immediately afterward.
func empty(c *hchan) bool {
// c.dataqsiz is immutable.
if c.dataqsiz == 0 {
return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
}
// c.timer is also immutable (it is set after make(chan) but before any channel operations).
// All timer channels have dataqsiz > 0.
if c.timer != nil {
c.timer.maybeRunChan()
}
return atomic.Loaduint(&c.qcount) == 0
}

Expand Down Expand Up @@ -470,6 +478,10 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
throw("unreachable")
}

if c.timer != nil {
c.timer.maybeRunChan()
}

// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
Expand Down Expand Up @@ -570,11 +582,16 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg

mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
if c.timer != nil {
blockTimerChan(c)
}

// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
Expand All @@ -586,6 +603,9 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
Expand Down Expand Up @@ -728,6 +748,9 @@ func chanlen(c *hchan) int {
if c == nil {
return 0
}
if c.timer != nil {
c.timer.maybeRunChan()
}
return int(c.qcount)
}

Expand Down
48 changes: 24 additions & 24 deletions src/runtime/lockrank.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/runtime/mgcscavenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (s *scavengerState) init() {
s.g = getg()

s.timer = new(timer)
f := func(s any, _ uintptr) {
f := func(s any, _ uintptr, _ int64) {
s.(*scavengerState).wake()
}
s.timer.init(f, s)
Expand Down
9 changes: 5 additions & 4 deletions src/runtime/mklockrank.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ assistQueue,
< SCHED
# Below SCHED is the scheduler implementation.
< allocmR,
execR
< sched;
execR;
allocmR, execR, hchan < sched;
sched < allg, allp;
hchan, pollDesc, wakeableSleep < timers;
timers < timer < netpollInit;
# Channels
NONE < notifyList;
hchan, notifyList < sudog;
hchan, pollDesc, wakeableSleep < timers;
timers < timer < netpollInit;
# Semaphores
NONE < root;
Expand Down
6 changes: 3 additions & 3 deletions src/runtime/netpoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -658,15 +658,15 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
netpollAdjustWaiters(delta)
}

func netpollDeadline(arg any, seq uintptr) {
func netpollDeadline(arg any, seq uintptr, delta int64) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, true)
}

func netpollReadDeadline(arg any, seq uintptr) {
func netpollReadDeadline(arg any, seq uintptr, delta int64) {
netpolldeadlineimpl(arg.(*pollDesc), seq, true, false)
}

func netpollWriteDeadline(arg any, seq uintptr) {
func netpollWriteDeadline(arg any, seq uintptr, delta int64) {
netpolldeadlineimpl(arg.(*pollDesc), seq, false, true)
}

Expand Down
25 changes: 18 additions & 7 deletions src/runtime/runtime1.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,12 +339,25 @@ var debug struct {
sbrk int32

panicnil atomic.Int32

// asynctimerchan controls whether timer channels
// behave asynchronously (as in Go 1.22 and earlier)
// instead of their Go 1.23+ synchronous behavior.
// The value can change at any time (in response to os.Setenv("GODEBUG"))
// and affects all extant timer channels immediately.
// Programs wouldn't normally change over an execution,
// but allowing it is convenient for testing and for programs
// that do an os.Setenv in main.init or main.main.
asynctimerchan atomic.Int32
}

var dbgvars = []*dbgVar{
{name: "adaptivestackstart", value: &debug.adaptivestackstart},
{name: "allocfreetrace", value: &debug.allocfreetrace},
{name: "clobberfree", value: &debug.clobberfree},
{name: "asyncpreemptoff", value: &debug.asyncpreemptoff},
{name: "asynctimerchan", atomic: &debug.asynctimerchan},
{name: "cgocheck", value: &debug.cgocheck},
{name: "clobberfree", value: &debug.clobberfree},
{name: "disablethp", value: &debug.disablethp},
{name: "dontfreezetheworld", value: &debug.dontfreezetheworld},
{name: "efence", value: &debug.efence},
Expand All @@ -353,21 +366,19 @@ var dbgvars = []*dbgVar{
{name: "gcshrinkstackoff", value: &debug.gcshrinkstackoff},
{name: "gcstoptheworld", value: &debug.gcstoptheworld},
{name: "gctrace", value: &debug.gctrace},
{name: "harddecommit", value: &debug.harddecommit},
{name: "inittrace", value: &debug.inittrace},
{name: "invalidptr", value: &debug.invalidptr},
{name: "madvdontneed", value: &debug.madvdontneed},
{name: "panicnil", atomic: &debug.panicnil},
{name: "runtimecontentionstacks", atomic: &debug.runtimeContentionStacks},
{name: "sbrk", value: &debug.sbrk},
{name: "scavtrace", value: &debug.scavtrace},
{name: "scheddetail", value: &debug.scheddetail},
{name: "schedtrace", value: &debug.schedtrace},
{name: "traceadvanceperiod", value: &debug.traceadvanceperiod},
{name: "tracebackancestors", value: &debug.tracebackancestors},
{name: "asyncpreemptoff", value: &debug.asyncpreemptoff},
{name: "inittrace", value: &debug.inittrace},
{name: "harddecommit", value: &debug.harddecommit},
{name: "adaptivestackstart", value: &debug.adaptivestackstart},
{name: "tracefpunwindoff", value: &debug.tracefpunwindoff},
{name: "panicnil", atomic: &debug.panicnil},
{name: "traceadvanceperiod", value: &debug.traceadvanceperiod},
}

func parsedebugvars() {
Expand Down
11 changes: 11 additions & 0 deletions src/runtime/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo
continue
}

if cas.c.timer != nil {
cas.c.timer.maybeRunChan()
}

j := cheaprandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
Expand Down Expand Up @@ -315,6 +319,10 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo
} else {
c.recvq.enqueue(sg)
}

if c.timer != nil {
blockTimerChan(c)
}
}

// wait for someone to wake us up
Expand Down Expand Up @@ -351,6 +359,9 @@ func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, blo

for _, casei := range lockorder {
k = &scases[casei]
if k.c.timer != nil {
unblockTimerChan(k.c)
}
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
Expand Down
Loading

0 comments on commit 508bb17

Please sign in to comment.