-
Notifications
You must be signed in to change notification settings - Fork 89
/
Copy pathlimiter.go
101 lines (90 loc) · 2.54 KB
/
limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// Copyright 2011, 2012, 2013 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.
package utils
import (
"fmt"
"math/rand"
"time"
"github.com/juju/clock"
)
type empty struct{}
type limiter struct {
wait chan empty
minPause time.Duration
maxPause time.Duration
clock clock.Clock
}
// Limiter represents a limited resource (eg a semaphore).
type Limiter interface {
// Acquire another unit of the resource.
// Acquire returns false to indicate there is no more availability,
// until another entity calls Release.
Acquire() bool
// AcquireWait requests a unit of resource, but blocks until one is
// available.
AcquireWait()
// Release returns a unit of the resource. Calling Release when there
// are no units Acquired is an error.
Release() error
}
// NewLimiter creates a limiter.
func NewLimiter(maxAllowed int) Limiter {
return NewLimiterWithPause(maxAllowed, 0, 0, nil)
}
// NewLimiterWithPause creates a limiter. If minpause and maxPause is > 0,
// there will be a random delay in that duration range before attempting an Acquire.
func NewLimiterWithPause(maxAllowed int, minPause, maxPause time.Duration, clk clock.Clock) Limiter {
rand.Seed(time.Now().UTC().UnixNano())
if clk == nil {
clk = clock.WallClock
}
return limiter{
wait: make(chan empty, maxAllowed),
minPause: minPause,
maxPause: maxPause,
clock: clk,
}
}
// Acquire requests some resources that you can return later
// It returns 'true' if there are resources available, but false if they are
// not. Callers are responsible for calling Release if this returns true, but
// should not release if this returns false.
func (l limiter) Acquire() bool {
// Pause before attempting to grab a slot.
// This is optional depending on what was used to
// construct this limiter, and is used to throttle
// incoming connections.
l.pause()
e := empty{}
select {
case l.wait <- e:
return true
default:
return false
}
}
// AcquireWait waits for the resource to become available before returning.
func (l limiter) AcquireWait() {
e := empty{}
l.wait <- e
}
// Release returns the resource to the available pool.
func (l limiter) Release() error {
select {
case <-l.wait:
return nil
default:
return fmt.Errorf("Release without an associated Acquire")
}
}
func (l limiter) pause() {
if l.minPause <= 0 || l.maxPause <= 0 {
return
}
pauseRange := int((l.maxPause - l.minPause) / time.Millisecond)
pauseTime := time.Duration(rand.Intn(pauseRange)) * time.Millisecond
pauseTime += l.minPause
select {
case <-l.clock.After(pauseTime):
}
}