Skip to content

Commit

Permalink
WIP rcmgr auto limit scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
marten-seemann committed Jul 3, 2022
1 parent 123f1f6 commit 462224d
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 266 deletions.
4 changes: 2 additions & 2 deletions config/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ type ConnMgr struct {
// <https://github.com/libp2p/go-libp2p-resource-manager#readme>
type ResourceMgr struct {
// Enables the Network Resource Manager feature, default to on.
Enabled Flag `json:",omitempty"`
Limits *rcmgr.BasicLimiterConfig `json:",omitempty"`
Enabled Flag `json:",omitempty"`
Limits *rcmgr.LimitConfig `json:",omitempty"`
}

const (
Expand Down
2 changes: 1 addition & 1 deletion core/commands/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel

// set scope limit to new values (when limit.json is passed as a second arg)
if req.Files != nil {
var newLimit rcmgr.BasicLimitConfig
var newLimit rcmgr.BaseLimit
it := req.Files.Entries()
if it.Next() {
file := files.FileFromEntry(it)
Expand Down
165 changes: 40 additions & 125 deletions core/node/libp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strings"

"github.com/benbjohnson/clock"
config "github.com/ipfs/go-ipfs/config"
"github.com/ipfs/go-ipfs/config"
"github.com/ipfs/go-ipfs/core/node/helpers"
"github.com/ipfs/go-ipfs/repo"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -33,7 +33,7 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {

enabled := cfg.ResourceMgr.Enabled.WithDefault(false)

/// ENV overrides Config (if present)
// ENV overrides Config (if present)
switch os.Getenv("LIBP2P_RCMGR") {
case "0", "false":
enabled = false
Expand All @@ -49,19 +49,17 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err)
}

defaultLimits := adjustedDefaultLimits(cfg)
limitCfg := adjustedDefaultLimits(cfg)
libp2p.SetDefaultServiceLimits(limitCfg)
limits := limitCfg.AutoScale()

var limits rcmgr.BasicLimiterConfig
if cfg.ResourceMgr.Limits != nil {
limits = *cfg.ResourceMgr.Limits
l := *cfg.ResourceMgr.Limits
l.Apply(limits)
limits = l
}

limiter, err := rcmgr.NewLimiter(limits, defaultLimits)
if err != nil {
return nil, opts, err
}

libp2p.SetDefaultServiceLimits(limiter)
limiter := rcmgr.NewFixedLimiter(limits)

ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics())}

Expand Down Expand Up @@ -194,39 +192,24 @@ func NetStat(mgr network.ResourceManager, scope string) (NetStatOut, error) {
}
}

func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig, error) {
var result rcmgr.BasicLimitConfig
func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BaseLimit, error) {
var result rcmgr.BaseLimit
getLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return NoResourceMgrError
}
limit := limiter.Limit()
switch l := limit.(type) {
case *rcmgr.StaticLimit:
result.Dynamic = false
case *rcmgr.BaseLimit:
result.Memory = l.Memory
result.Streams = l.BaseLimit.Streams
result.StreamsInbound = l.BaseLimit.StreamsInbound
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
result.Conns = l.BaseLimit.Conns
result.ConnsInbound = l.BaseLimit.ConnsInbound
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
result.FD = l.BaseLimit.FD

case *rcmgr.DynamicLimit:
result.Dynamic = true
result.MemoryFraction = l.MemoryLimit.MemoryFraction
result.MinMemory = l.MemoryLimit.MinMemory
result.MaxMemory = l.MemoryLimit.MaxMemory
result.Streams = l.BaseLimit.Streams
result.StreamsInbound = l.BaseLimit.StreamsInbound
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
result.Conns = l.BaseLimit.Conns
result.ConnsInbound = l.BaseLimit.ConnsInbound
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
result.FD = l.BaseLimit.FD

result.Streams = l.Streams
result.StreamsInbound = l.StreamsInbound
result.StreamsOutbound = l.StreamsOutbound
result.Conns = l.Conns
result.ConnsInbound = l.ConnsInbound
result.ConnsOutbound = l.ConnsOutbound
result.FD = l.FD
default:
return fmt.Errorf("unknown limit type %T", limit)
}
Expand All @@ -236,89 +219,36 @@ func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig

switch {
case scope == config.ResourceMgrSystemScope:
err := mgr.ViewSystem(func(s network.ResourceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewSystem(func(s network.ResourceScope) error { return getLimit(s) })
case scope == config.ResourceMgrTransientScope:
err := mgr.ViewTransient(func(s network.ResourceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewTransient(func(s network.ResourceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err := mgr.ViewService(svc, func(s network.ServiceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewService(svc, func(s network.ServiceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
pid, err := peer.Decode(p)
if err != nil {
return result, fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewPeer(pid, func(s network.PeerScope) error { return getLimit(s) })
default:
return result, fmt.Errorf("invalid scope %q", scope)
}
}

// NetSetLimit sets new ResourceManager limits for the given scope. The limits take effect immediately, and are also persisted to the repo config.
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BasicLimitConfig) error {
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BaseLimit) error {
setLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return NoResourceMgrError
}

var newLimit rcmgr.Limit
if limit.Dynamic {
newLimit = &rcmgr.DynamicLimit{
MemoryLimit: rcmgr.MemoryLimit{
MemoryFraction: limit.MemoryFraction,
MinMemory: limit.MinMemory,
MaxMemory: limit.MaxMemory,
},
BaseLimit: rcmgr.BaseLimit{
Streams: limit.Streams,
StreamsInbound: limit.StreamsInbound,
StreamsOutbound: limit.StreamsOutbound,
Conns: limit.Conns,
ConnsInbound: limit.ConnsInbound,
ConnsOutbound: limit.ConnsOutbound,
FD: limit.FD,
},
}
} else {
newLimit = &rcmgr.StaticLimit{
Memory: limit.Memory,
BaseLimit: rcmgr.BaseLimit{
Streams: limit.Streams,
StreamsInbound: limit.StreamsInbound,
StreamsOutbound: limit.StreamsOutbound,
Conns: limit.Conns,
ConnsInbound: limit.ConnsInbound,
ConnsOutbound: limit.ConnsOutbound,
FD: limit.FD,
},
}
}

limiter.SetLimit(newLimit)
limiter.SetLimit(&limit)
return nil
}

Expand All @@ -328,65 +258,50 @@ func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limi
}

if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
cfg.Swarm.ResourceMgr.Limits = &rcmgr.LimitConfig{}
}
configLimits := cfg.Swarm.ResourceMgr.Limits

var setConfigFunc func()
switch {
case scope == config.ResourceMgrSystemScope:
err = mgr.ViewSystem(func(s network.ResourceScope) error {
return setLimit(s)
})
setConfigFunc = func() { configLimits.System = &limit }

err = mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.System = limit }
case scope == config.ResourceMgrTransientScope:
err = mgr.ViewTransient(func(s network.ResourceScope) error {
return setLimit(s)
})
setConfigFunc = func() { configLimits.Transient = &limit }

err = mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.Transient = limit }
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err = mgr.ViewService(svc, func(s network.ServiceScope) error {
return setLimit(s)
})
err = mgr.ViewService(svc, func(s network.ServiceScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Service == nil {
configLimits.Service = map[string]rcmgr.BasicLimitConfig{}
configLimits.Service = map[string]rcmgr.BaseLimit{}
}
configLimits.Service[svc] = limit
}

case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
return setLimit(s)
})
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Protocol == nil {
configLimits.Protocol = map[string]rcmgr.BasicLimitConfig{}
configLimits.Protocol = map[protocol.ID]rcmgr.BaseLimit{}
}
configLimits.Protocol[proto] = limit
configLimits.Protocol[protocol.ID(proto)] = limit
}

case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
var pid peer.ID
pid, err = peer.Decode(p)
if err != nil {
return fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
return setLimit(s)
})
err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Peer == nil {
configLimits.Peer = map[string]rcmgr.BasicLimitConfig{}
configLimits.Peer = map[peer.ID]rcmgr.BaseLimit{}
}
configLimits.Peer[p] = limit
configLimits.Peer[pid] = limit
}

default:
return fmt.Errorf("invalid scope %q", scope)
}
Expand All @@ -396,7 +311,7 @@ func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limi
}

if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
cfg.Swarm.ResourceMgr.Limits = &rcmgr.LimitConfig{}
}
setConfigFunc()

Expand Down
Loading

0 comments on commit 462224d

Please sign in to comment.