Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: go-libp2p v0.21 (rcmgr auto scaling) #9074

Merged
merged 27 commits into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c3589a1
WIP rcmgr auto limit scaling
marten-seemann Jul 2, 2022
679d3d2
Add obs to resource manager
MarcoPolo Jun 21, 2022
2b2efc6
Add allowlist config option
MarcoPolo Jul 5, 2022
68d9073
Use RC version
MarcoPolo Jul 5, 2022
62a9829
Reintroduce connmgr hi watermark logic
MarcoPolo Jul 6, 2022
66b974d
Add sharness test for allowlist
MarcoPolo Jul 6, 2022
550663f
Fix lint
MarcoPolo Jul 25, 2022
1ff4e48
Fix err
MarcoPolo Jul 25, 2022
f4f4128
Update rcmgr dep
MarcoPolo Jul 26, 2022
dffdd35
Update go-libp2p to latest master
MarcoPolo Jul 26, 2022
0b3c7bc
mod tidy
MarcoPolo Jul 26, 2022
30fb10a
Update pinned defaults
MarcoPolo Jul 26, 2022
7245e8d
Update checkImplicitDefaults
MarcoPolo Jul 26, 2022
0c58d6e
Use released version of go-libp2p v0.21;
MarcoPolo Aug 11, 2022
629245a
Use released version of rcmgr, use specific commit for kad-dht
MarcoPolo Aug 11, 2022
3f42d96
Use newer rcmgr
MarcoPolo Aug 11, 2022
b0adea3
Update go.mod/sum in examples/kubo-as-a-library
MarcoPolo Aug 11, 2022
4cd437d
Use replace in kubo example as a library
MarcoPolo Aug 11, 2022
27b046f
Move register exporter to metrics file
MarcoPolo Aug 12, 2022
3ed83e4
Update go-libp2p-resource-manager to v0.5.3
MarcoPolo Aug 12, 2022
da54d5c
Check err for view.Register
MarcoPolo Aug 12, 2022
194eb2f
Go mod tidy
MarcoPolo Aug 12, 2022
831135c
Update go-libp2p-kad-dht to v0.17.0
MarcoPolo Aug 15, 2022
5c858d5
Go mod tidy kubo-as-a-library
MarcoPolo Aug 15, 2022
5ca74c7
docs(config): Swarm.ResourceMgr.Allowlist
lidel Aug 16, 2022
a42848a
chore: switch back to goleveldb v1.0.0
lidel Aug 16, 2022
77251b6
test(rcmgr): regression test prometherus metrics
lidel Aug 16, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ func serveHTTPApi(req *cmds.Request, cctx *oldcmds.Context) (<-chan error, error
var opts = []corehttp.ServeOption{
corehttp.MetricsCollectionOption("api"),
corehttp.MetricsOpenCensusCollectionOption(),
corehttp.MetricsOpenCensusDefaultPrometheusRegistry(),
corehttp.CheckVersionOption(),
corehttp.CommandsOption(*cctx),
corehttp.WebUIOption,
Expand Down
8 changes: 6 additions & 2 deletions config/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,12 @@ type ConnMgr struct {
// <https://github.com/libp2p/go-libp2p-resource-manager#readme>
type ResourceMgr struct {
// Enables the Network Resource Manager feature, default to on.
Enabled Flag `json:",omitempty"`
Limits *rcmgr.BasicLimiterConfig `json:",omitempty"`
Enabled Flag `json:",omitempty"`
Limits *rcmgr.LimitConfig `json:",omitempty"`
// A list of multiaddrs that can bypass normal system limits (but are still
// limited by the allowlist scope). Convenience config around
// https://pkg.go.dev/github.com/libp2p/go-libp2p-resource-manager#Allowlist.Add
Allowlist []string `json:",omitempty"`
}

const (
Expand Down
2 changes: 1 addition & 1 deletion core/commands/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ Changes made via command line are persisted in the Swarm.ResourceMgr.Limits fiel

// set scope limit to new values (when limit.json is passed as a second arg)
if req.Files != nil {
var newLimit rcmgr.BasicLimitConfig
var newLimit rcmgr.BaseLimit
it := req.Files.Entries()
if it.Next() {
file := files.FileFromEntry(it)
Expand Down
24 changes: 24 additions & 0 deletions core/corehttp/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,30 @@ func MetricsOpenCensusCollectionOption() ServeOption {
}
}

// MetricsOpenCensusDefaultPrometheusRegistry registers the default prometheus
// registry as an exporter to OpenCensus metrics. This means that OpenCensus
// metrics will show up in the prometheus metrics endpoint
func MetricsOpenCensusDefaultPrometheusRegistry() ServeOption {
return func(_ *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
log.Info("Init OpenCensus with default prometheus registry")

pe, err := ocprom.NewExporter(ocprom.Options{
Registry: prometheus.DefaultRegisterer.(*prometheus.Registry),
OnError: func(err error) {
log.Errorw("OC default registry ERROR", "error", err)
},
})
if err != nil {
return nil, err
}

// register prometheus with opencensus
view.RegisterExporter(pe)

return mux, nil
}
}

// MetricsCollectionOption adds collection of net/http-related metrics.
func MetricsCollectionOption(handlerName string) ServeOption {
return func(_ *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
Expand Down
182 changes: 61 additions & 121 deletions core/node/libp2p/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
rcmgrObs "github.com/libp2p/go-libp2p-resource-manager/obs"
"github.com/multiformats/go-multiaddr"
"go.opencensus.io/stats/view"

"go.uber.org/fx"
)
Expand All @@ -34,7 +37,7 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {

enabled := cfg.ResourceMgr.Enabled.WithDefault(false)

/// ENV overrides Config (if present)
// ENV overrides Config (if present)
switch os.Getenv("LIBP2P_RCMGR") {
case "0", "false":
enabled = false
Expand All @@ -50,21 +53,41 @@ func ResourceManager(cfg config.SwarmConfig) interface{} {
return nil, opts, fmt.Errorf("opening IPFS_PATH: %w", err)
}

defaultLimits := adjustedDefaultLimits(cfg)
limits := adjustedDefaultLimits(cfg)

var limits rcmgr.BasicLimiterConfig
if cfg.ResourceMgr.Limits != nil {
limits = *cfg.ResourceMgr.Limits
l := *cfg.ResourceMgr.Limits
l.Apply(limits)
limits = l
}

limiter, err := rcmgr.NewLimiter(limits, defaultLimits)
limiter := rcmgr.NewFixedLimiter(limits)

str, err := rcmgrObs.NewStatsTraceReporter()
if err != nil {
return nil, opts, err
}

libp2p.SetDefaultServiceLimits(limiter)
ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics()), rcmgr.WithTraceReporter(str)}

if len(cfg.ResourceMgr.Allowlist) > 0 {
var mas []multiaddr.Multiaddr
for _, maStr := range cfg.ResourceMgr.Allowlist {
ma, err := multiaddr.NewMultiaddr(maStr)
if err != nil {
log.Errorf("failed to parse multiaddr=%v for allowlist, skipping. err=%v", maStr, err)
continue
}
mas = append(mas, ma)
}
ropts = append(ropts, rcmgr.WithAllowlistedMultiaddrs(mas))
log.Infof("Setting allowlist to: %v", mas)
}

ropts := []rcmgr.Option{rcmgr.WithMetrics(createRcmgrMetrics())}
err = view.Register(rcmgrObs.DefaultViews...)
if err != nil {
return nil, opts, fmt.Errorf("registering rcmgr obs views: %w", err)
}

if os.Getenv("LIBP2P_DEBUG_RCMGR") != "" {
traceFilePath := filepath.Join(repoPath, NetLimitTraceFilename)
Expand Down Expand Up @@ -195,39 +218,24 @@ func NetStat(mgr network.ResourceManager, scope string) (NetStatOut, error) {
}
}

func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig, error) {
var result rcmgr.BasicLimitConfig
func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BaseLimit, error) {
var result rcmgr.BaseLimit
getLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return NoResourceMgrError
}
limit := limiter.Limit()
switch l := limit.(type) {
case *rcmgr.StaticLimit:
result.Dynamic = false
case *rcmgr.BaseLimit:
result.Memory = l.Memory
result.Streams = l.BaseLimit.Streams
result.StreamsInbound = l.BaseLimit.StreamsInbound
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
result.Conns = l.BaseLimit.Conns
result.ConnsInbound = l.BaseLimit.ConnsInbound
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
result.FD = l.BaseLimit.FD

case *rcmgr.DynamicLimit:
result.Dynamic = true
result.MemoryFraction = l.MemoryLimit.MemoryFraction
result.MinMemory = l.MemoryLimit.MinMemory
result.MaxMemory = l.MemoryLimit.MaxMemory
result.Streams = l.BaseLimit.Streams
result.StreamsInbound = l.BaseLimit.StreamsInbound
result.StreamsOutbound = l.BaseLimit.StreamsOutbound
result.Conns = l.BaseLimit.Conns
result.ConnsInbound = l.BaseLimit.ConnsInbound
result.ConnsOutbound = l.BaseLimit.ConnsOutbound
result.FD = l.BaseLimit.FD

result.Streams = l.Streams
result.StreamsInbound = l.StreamsInbound
result.StreamsOutbound = l.StreamsOutbound
result.Conns = l.Conns
result.ConnsInbound = l.ConnsInbound
result.ConnsOutbound = l.ConnsOutbound
result.FD = l.FD
default:
return fmt.Errorf("unknown limit type %T", limit)
}
Expand All @@ -237,89 +245,36 @@ func NetLimit(mgr network.ResourceManager, scope string) (rcmgr.BasicLimitConfig

switch {
case scope == config.ResourceMgrSystemScope:
err := mgr.ViewSystem(func(s network.ResourceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewSystem(func(s network.ResourceScope) error { return getLimit(s) })
case scope == config.ResourceMgrTransientScope:
err := mgr.ViewTransient(func(s network.ResourceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewTransient(func(s network.ResourceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to worry about spans here?

svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err := mgr.ViewService(svc, func(s network.ServiceScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewService(svc, func(s network.ServiceScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err := mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return getLimit(s) })
case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
pid, err := peer.Decode(p)
if err != nil {
return result, fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
return getLimit(s)
})
return result, err

return result, mgr.ViewPeer(pid, func(s network.PeerScope) error { return getLimit(s) })
default:
return result, fmt.Errorf("invalid scope %q", scope)
}
}

// NetSetLimit sets new ResourceManager limits for the given scope. The limits take effect immediately, and are also persisted to the repo config.
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BasicLimitConfig) error {
func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limit rcmgr.BaseLimit) error {
setLimit := func(s network.ResourceScope) error {
limiter, ok := s.(rcmgr.ResourceScopeLimiter)
if !ok { // NullResourceManager
return NoResourceMgrError
}

var newLimit rcmgr.Limit
if limit.Dynamic {
newLimit = &rcmgr.DynamicLimit{
MemoryLimit: rcmgr.MemoryLimit{
MemoryFraction: limit.MemoryFraction,
MinMemory: limit.MinMemory,
MaxMemory: limit.MaxMemory,
},
BaseLimit: rcmgr.BaseLimit{
Streams: limit.Streams,
StreamsInbound: limit.StreamsInbound,
StreamsOutbound: limit.StreamsOutbound,
Conns: limit.Conns,
ConnsInbound: limit.ConnsInbound,
ConnsOutbound: limit.ConnsOutbound,
FD: limit.FD,
},
}
} else {
newLimit = &rcmgr.StaticLimit{
Memory: limit.Memory,
BaseLimit: rcmgr.BaseLimit{
Streams: limit.Streams,
StreamsInbound: limit.StreamsInbound,
StreamsOutbound: limit.StreamsOutbound,
Conns: limit.Conns,
ConnsInbound: limit.ConnsInbound,
ConnsOutbound: limit.ConnsOutbound,
FD: limit.FD,
},
}
}

limiter.SetLimit(newLimit)
limiter.SetLimit(&limit)
return nil
}

Expand All @@ -329,65 +284,50 @@ func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limi
}

if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
cfg.Swarm.ResourceMgr.Limits = &rcmgr.LimitConfig{}
}
configLimits := cfg.Swarm.ResourceMgr.Limits

var setConfigFunc func()
switch {
case scope == config.ResourceMgrSystemScope:
err = mgr.ViewSystem(func(s network.ResourceScope) error {
return setLimit(s)
})
setConfigFunc = func() { configLimits.System = &limit }

err = mgr.ViewSystem(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.System = limit }
case scope == config.ResourceMgrTransientScope:
err = mgr.ViewTransient(func(s network.ResourceScope) error {
return setLimit(s)
})
setConfigFunc = func() { configLimits.Transient = &limit }

err = mgr.ViewTransient(func(s network.ResourceScope) error { return setLimit(s) })
setConfigFunc = func() { configLimits.Transient = limit }
case strings.HasPrefix(scope, config.ResourceMgrServiceScopePrefix):
svc := strings.TrimPrefix(scope, config.ResourceMgrServiceScopePrefix)
err = mgr.ViewService(svc, func(s network.ServiceScope) error {
return setLimit(s)
})
err = mgr.ViewService(svc, func(s network.ServiceScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Service == nil {
configLimits.Service = map[string]rcmgr.BasicLimitConfig{}
configLimits.Service = map[string]rcmgr.BaseLimit{}
}
configLimits.Service[svc] = limit
}

case strings.HasPrefix(scope, config.ResourceMgrProtocolScopePrefix):
proto := strings.TrimPrefix(scope, config.ResourceMgrProtocolScopePrefix)
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error {
return setLimit(s)
})
err = mgr.ViewProtocol(protocol.ID(proto), func(s network.ProtocolScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Protocol == nil {
configLimits.Protocol = map[string]rcmgr.BasicLimitConfig{}
configLimits.Protocol = map[protocol.ID]rcmgr.BaseLimit{}
}
configLimits.Protocol[proto] = limit
configLimits.Protocol[protocol.ID(proto)] = limit
}

case strings.HasPrefix(scope, config.ResourceMgrPeerScopePrefix):
p := strings.TrimPrefix(scope, config.ResourceMgrPeerScopePrefix)
var pid peer.ID
pid, err = peer.Decode(p)
if err != nil {
return fmt.Errorf("invalid peer ID: %q: %w", p, err)
}
err = mgr.ViewPeer(pid, func(s network.PeerScope) error {
return setLimit(s)
})
err = mgr.ViewPeer(pid, func(s network.PeerScope) error { return setLimit(s) })
setConfigFunc = func() {
if configLimits.Peer == nil {
configLimits.Peer = map[string]rcmgr.BasicLimitConfig{}
configLimits.Peer = map[peer.ID]rcmgr.BaseLimit{}
}
configLimits.Peer[p] = limit
configLimits.Peer[pid] = limit
}

default:
return fmt.Errorf("invalid scope %q", scope)
}
Expand All @@ -397,7 +337,7 @@ func NetSetLimit(mgr network.ResourceManager, repo repo.Repo, scope string, limi
}

if cfg.Swarm.ResourceMgr.Limits == nil {
cfg.Swarm.ResourceMgr.Limits = &rcmgr.BasicLimiterConfig{}
cfg.Swarm.ResourceMgr.Limits = &rcmgr.LimitConfig{}
}
setConfigFunc()

Expand Down
Loading