Skip to content
This repository has been archived by the owner on Aug 19, 2022. It is now read-only.

Commit

Permalink
include the current limits in memory allocation errors
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jun 7, 2022
1 parent cffc128 commit 59c71ed
Showing 1 changed file with 40 additions and 4 deletions.
44 changes: 40 additions & 4 deletions scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ func (e *errStreamOrConnLimitExceeded) AppendLogValues(v []interface{}) []interf
)
}

type errMemoryLimitExceeded struct {
current, attempted, limit int64
priority uint8
err error
}

func (e *errMemoryLimitExceeded) Error() string { return e.err.Error() }
func (e *errMemoryLimitExceeded) Unwrap() error { return e.err }
func (e *errMemoryLimitExceeded) AppendLogValues(v []interface{}) []interface{} {
return append(v,
"current", e.current,
"attempted", e.attempted,
"priority", e.priority,
"limit", e.limit,
)
}

// A resourceScope can be a DAG, where a downstream node is not allowed to outlive an upstream node
// (ie cannot call Done in the upstream node before the downstream node) and account for resources
// using a linearized parent set.
Expand Down Expand Up @@ -95,9 +112,14 @@ func (rc *resources) checkMemory(rsvp int64, prio uint8) error {
threshold := (1 + int64(prio)) * limit / 256

if newmem > threshold {
return network.ErrResourceLimitExceeded
return &errMemoryLimitExceeded{
current: rc.memory,
attempted: rsvp,
limit: limit,
priority: prio,
err: network.ErrResourceLimitExceeded,
}
}

return nil
}

Expand Down Expand Up @@ -307,7 +329,14 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
}

if err := s.rc.reserveMemory(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "stat", s.rc.stat(), "error", err)
logValues := make([]interface{}, 0, 7)
logValues = append(logValues, "scope", s.name)
var limitErr *errMemoryLimitExceeded
if errors.As(err, &limitErr) {
logValues = limitErr.AppendLogValues(logValues)
}
logValues = append(logValues, "stat", s.Stat(), "error", err)
log.Debugw("blocked memory reservation", logValues...)
s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory)
s.metrics.BlockMemory(size)
return s.wrapError(err)
Expand All @@ -333,7 +362,14 @@ func (s *resourceScope) reserveMemoryForEdges(size int, prio uint8) error {
var err error
for _, e := range s.edges {
if err = e.ReserveMemoryForChild(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation from constraining edge", "scope", s.name, "edge", e.name, "size", size, "priority", prio, "stat", e.Stat(), "error", err)
logValues := make([]interface{}, 0, 7)
logValues = append(logValues, "scope", s.name, "edge", e.name)
var limitErr *errMemoryLimitExceeded
if errors.As(err, &limitErr) {
logValues = limitErr.AppendLogValues(logValues)
}
logValues = append(logValues, "stat", e.Stat(), "error", err)
log.Debugw("blocked memory reservation from constraining edge", logValues...)
break
}

Expand Down

0 comments on commit 59c71ed

Please sign in to comment.