Skip to content

Commit

Permalink
fix: Fix for cache of ES transport clients that can result in too man…
Browse files Browse the repository at this point in the history
…y open files
  • Loading branch information
driskell committed Mar 6, 2023
1 parent cb1a196 commit bf45f4e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 15 deletions.
32 changes: 18 additions & 14 deletions lc-lib/transports/es/transportes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type payload struct {
events []*event.Event
}

type clientCache struct {
type clientCacheItem struct {
client *http.Client
expires time.Time
}
Expand All @@ -62,7 +62,7 @@ type transportES struct {
config *TransportESFactory
netConfig *transports.Config
poolEntry *addresspool.PoolEntry
clients map[*addresspool.Address]*clientCache
clientCache map[*addresspool.Address]*clientCacheItem
eventChan chan<- transports.Event

// Internal
Expand All @@ -88,8 +88,11 @@ func (t *transportES) startController() {
// controllerRoutine is the master routine which handles submission
func (t *transportES) controllerRoutine() {
defer func() {
// Wait for all routines to close
// Wait for all routines to close and close all connections
t.wait.Wait()
for _, cacheItem := range t.clientCache {
cacheItem.client.CloseIdleConnections()
}
t.eventChan <- transports.NewStatusEvent(t.ctx, transports.Finished, nil)
}()

Expand Down Expand Up @@ -486,16 +489,17 @@ func (t *transportES) getClient(addr *addresspool.Address) *http.Client {
defer t.poolMutex.Unlock()

now := time.Now()
cacheItem, ok := t.clients[addr]
expires := time.Now().Add(time.Second * 300)
cacheItem, ok := t.clientCache[addr]
if ok {
if cacheItem.expires.After(now) {
return cacheItem.client
}
delete(t.clients, addr)
for key, cacheItem := range t.clients {
if cacheItem.expires.Before(now) {
delete(t.clients, key)
}
cacheItem.expires = expires
return cacheItem.client
}

for key, cacheItem := range t.clientCache {
if cacheItem.expires.Before(now) {
cacheItem.client.CloseIdleConnections()
delete(t.clientCache, key)
}
}

Expand All @@ -516,7 +520,7 @@ func (t *transportES) getClient(addr *addresspool.Address) *http.Client {
},
Timeout: t.netConfig.Timeout,
}
expires := time.Now().Add(time.Second * 300)
t.clients[addr] = &clientCache{client, expires}

t.clientCache[addr] = &clientCacheItem{client, expires}
return client
}
2 changes: 1 addition & 1 deletion lc-lib/transports/es/transportesfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (f *TransportESFactory) NewTransport(ctx context.Context, poolEntry *addres
netConfig: transports.FetchConfig(f.config),
poolEntry: poolEntry,
eventChan: eventChan,
clients: make(map[*addresspool.Address]*clientCache),
clientCache: make(map[*addresspool.Address]*clientCacheItem),
}

ret.startController()
Expand Down

0 comments on commit bf45f4e

Please sign in to comment.