-
Notifications
You must be signed in to change notification settings - Fork 867
/
Copy pathbayes_changept_detector.py
182 lines (143 loc) · 7.34 KB
/
bayes_changept_detector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright 2016 Numenta Inc.
#
# Copyright may exist in Contributors' modifications
# and/or contributions to the work.
#
# Use of this source code is governed by the MIT
# license that can be found in the LICENSE file or at
# https://opensource.org/licenses/MIT.
import numpy
from scipy import stats
from nab.detectors.base import AnomalyDetector
class BayesChangePtDetector(AnomalyDetector):
""" Implementation of the online Bayesian changepoint detection algorithm as
described in Ryan P. Adams, David J.C. MacKay, "Bayesian Online Changepoint
Detection", arXiv 0710.3742 (2007).
The algorithm computes, for each record at step x in a data stream, the
probability that the current record is part of a stream of length n for all
n <= x. For a given record, if the maximimum of all the probabilities
corresponds to a stream length of zero, the record represents a changepoint in
the data stream. These probabilities are used to calculate anomaly scores for
NAB results.
The algorithm implemented here is a port from MATLAB code posted by R. Adams
(http://hips.seas.harvard.edu/content/bayesian-online-changepoint-detection).
It has been modified from the author's code to run online: rather than
initializing a run-length matrix with the size of the dataset, we use an array
that recursively overwrites old data (that is no longer needed by the
algorithm). Calculating anomaly scores for this changepoint algorithm is not
in the author's original code -- this is our own contribution.
We started with the parameters specified in the publication above, as well as
those found in the author's MATLAB implementation. Attempts at tuning these
parameters showed insignificant change in the overall NAB score. The
maxRunLength parameter (for this online implementation) was tuned to yield
the best NAB score. We tried a variety of methods for calculating anomaly
scores from run length probabilities, and implemented the best performing
option -- discussed in the comments at the end of handleRecord().
"""
def __init__(self, *args, **kwargs):
super(BayesChangePtDetector, self).__init__(*args, **kwargs)
# Setup the matrix that will hold our beliefs about the current
# run lengths. We'll initialize it all to zero at first. For efficiency
# we preallocate a data structure to hold only the info we need to detect
# change points: columns for the current and next recordNumber, and a
# sufficient number of rows (where each row represents probabilites of a
# run of that length).
self.maxRunLength = 500
self.runLengthProbs = numpy.zeros((self.maxRunLength + 2, 2))
# Record 0 is a boundary condition, where we know the run length is 0.
self.runLengthProbs[0, 0] = 1.0
# Init variables for state.
self.recordNumber = 0
self.previousMaxRun = 1
# Define algorithm's helpers.
self.observationLikelihoood = StudentTDistribution(alpha=0.1,
beta=0.001,
kappa=1.0,
mu=0.0)
self.lambdaConst = 250
self.hazardFunction = constantHazard
def handleRecord(self, inputData):
""" Returns a list [anomalyScore]. Algorithm details are in the comments."""
# To accomodate this next record, shift the columns of the run length
# probabilities matrix.
if self.recordNumber > 0:
self.runLengthProbs[:,0] = self.runLengthProbs[:,1]
self.runLengthProbs[:,1] = 0
# Evaluate the predictive distribution for the new datum under each of
# the parameters. This is standard Bayesian inference.
predProbs = self.observationLikelihoood.pdf(inputData["value"])
# Evaluate the hazard function for this interval
hazard = self.hazardFunction(self.recordNumber+1, self.lambdaConst)
# We only care about the probabilites up to maxRunLength.
runLengthIndex = min(self.recordNumber, self.maxRunLength)
# Evaluate the growth probabilities -- shift the probabilities down and to
# the right, scaled by the hazard function and the predictive probabilities.
self.runLengthProbs[1:runLengthIndex+2, 1] = (
self.runLengthProbs[:runLengthIndex+1, 0] *
predProbs[:runLengthIndex+1] *
(1-hazard)[:runLengthIndex+1]
)
# Evaluate the probability that there *was* a changepoint and we're
# accumulating the probability mass back down at run length = 0.
self.runLengthProbs[0, 1] = numpy.sum(
self.runLengthProbs[:runLengthIndex+1, 0] *
predProbs[:runLengthIndex+1] *
hazard[:runLengthIndex+1]
)
# Renormalize the run length probabilities for improved numerical stability.
self.runLengthProbs[:, 1] = (self.runLengthProbs[:, 1] /
self.runLengthProbs[:, 1].sum())
# Update the parameter sets for each possible run length.
self.observationLikelihoood.updateTheta(inputData["value"])
# Get the current run length with the highest probability.
maxRecursiveRunLength = self.runLengthProbs[:, 1].argmax()
# To calculate anomaly scores from run length probabilites we have several
# options, implemented below:
# 1. If the max probability for any run length is the run length of 0, we
# have a changepoint, thus anomaly score = 1.0.
# 2. The anomaly score is the probability of run length 0.
# 3. Compute a score by assuming a change in sequence from a previously
# long run is more anomalous than a change from a short run.
# Option 3 results in the best anomaly detections (by far):
if maxRecursiveRunLength < self.previousMaxRun:
anomalyScore = 1 - (float(maxRecursiveRunLength) / self.previousMaxRun)
else:
anomalyScore = 0.0
# Update state vars.
self.recordNumber += 1
self.previousMaxRun = maxRecursiveRunLength
return [anomalyScore]
def constantHazard(arraySize, lambdaConst):
""" The hazard function helps estimate the changepoint prior. Parameter
lambdaConst is the timescale on the prior distribution of the changepoint.
"""
return numpy.ones(arraySize) / float(lambdaConst)
class StudentTDistribution:
def __init__(self, alpha, beta, kappa, mu):
self.alpha0 = self.alpha = numpy.array([alpha])
self.beta0 = self.beta = numpy.array([beta])
self.kappa0 = self.kappa = numpy.array([kappa])
self.mu0 = self.mu = numpy.array([mu])
def pdf(self, data):
""" Probability density function for the Student's T continuous random
variable. More details here:
http://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.t.html
"""
return stats.t.pdf(x=data,
df=2*self.alpha,
loc=self.mu,
scale=numpy.sqrt( (self.beta * (self.kappa+1)) /
(self.alpha * self.kappa) )
)
def updateTheta(self, data):
""" Update parameters of the distribution."""
muT0 = numpy.concatenate(
(self.mu0, (self.kappa * self.mu + data) / (self.kappa + 1)))
kappaT0 = numpy.concatenate((self.kappa0, self.kappa + 1.))
alphaT0 = numpy.concatenate((self.alpha0, self.alpha + 0.5))
betaT0 = numpy.concatenate((self.beta0, self.beta + (self.kappa * (data -
self.mu)**2) / (2. * (self.kappa + 1.))))
self.mu = muT0
self.kappa = kappaT0
self.alpha = alphaT0
self.beta = betaT0