-
Notifications
You must be signed in to change notification settings - Fork 167
/
Copy pathtimeseries_anomalies.py
500 lines (392 loc) · 17 KB
/
timeseries_anomalies.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
"""
Time Series anomaly detection functions.
Some of the implementation is inspired by the paper https://arxiv.org/pdf/1802.04431.pdf
"""
import numpy as np
import pandas as pd
from scipy.optimize import fmin
def deltas(errors, epsilon, mean, std):
"""Compute mean and std deltas.
delta_mean = mean(errors) - mean(all errors below epsilon)
delta_std = std(errors) - std(all errors below epsilon)
Args:
errors (ndarray):
Array of errors.
epsilon (ndarray):
Threshold value.
mean (float):
Mean of errors.
std (float):
Standard deviation of errors.
Returns:
float, float:
* delta_mean.
* delta_std.
"""
below = errors[errors <= epsilon]
if not len(below):
return 0, 0
return mean - below.mean(), std - below.std()
def count_above(errors, epsilon):
"""Count number of errors and continuous sequences above epsilon.
Continuous sequences are counted by shifting and counting the number
of positions where there was a change and the original value was true,
which means that a sequence started at that position.
Args:
errors (ndarray):
Array of errors.
epsilon (ndarray):
Threshold value.
Returns:
int, int:
* Number of errors above epsilon.
* Number of continuous sequences above epsilon.
"""
above = errors > epsilon
total_above = len(errors[above])
above = pd.Series(above)
shift = above.shift(1)
change = above != shift
total_consecutive = sum(above & change)
return total_above, total_consecutive
def z_cost(z, errors, mean, std):
"""Compute how bad a z value is.
The original formula is::
(delta_mean/mean) + (delta_std/std)
------------------------------------------------------
number of errors above + (number of sequences above)^2
which computes the "goodness" of `z`, meaning that the higher the value
the better the `z`.
In this case, we return this value inverted (we make it negative), to convert
it into a cost function, as later on we will use scipy.fmin to minimize it.
Args:
z (ndarray):
Value for which a cost score is calculated.
errors (ndarray):
Array of errors.
mean (float):
Mean of errors.
std (float):
Standard deviation of errors.
Returns:
float:
Cost of z.
"""
epsilon = mean + z * std
delta_mean, delta_std = deltas(errors, epsilon, mean, std)
above, consecutive = count_above(errors, epsilon)
numerator = -(delta_mean / mean + delta_std / std)
denominator = above + consecutive ** 2
if denominator == 0:
return np.inf
return numerator / denominator
def _find_threshold(errors, z_range):
"""Find the ideal threshold.
The ideal threshold is the one that minimizes the z_cost function. Scipy.fmin is used
to find the minimum, using the values from z_range as starting points.
Args:
errors (ndarray):
Array of errors.
z_range (list):
List of two values denoting the range out of which the start points for the
scipy.fmin function are chosen.
Returns:
float:
Calculated threshold value.
"""
mean = errors.mean()
std = errors.std()
min_z, max_z = z_range
best_z = min_z
best_cost = np.inf
for z in range(min_z, max_z):
best = fmin(z_cost, z, args=(errors, mean, std), full_output=True, disp=False)
z, cost = best[0:2]
if cost < best_cost:
best_z = z[0]
best_cost = cost
return mean + best_z * std
def _fixed_threshold(errors, k=4):
"""Calculate the threshold.
The fixed threshold is defined as k standard deviations away from the mean.
Args:
errors (ndarray):
Array of errors.
Returns:
float:
Calculated threshold value.
"""
mean = errors.mean()
std = errors.std()
return mean + k * std
def _find_sequences(errors, epsilon, anomaly_padding):
"""Find sequences of values that are above epsilon.
This is done following this steps:
* create a boolean mask that indicates which values are above epsilon.
* mark certain range of errors around True values with a True as well.
* shift this mask by one place, filing the empty gap with a False.
* compare the shifted mask with the original one to see if there are changes.
* Consider a sequence start any point which was true and has changed.
* Consider a sequence end any point which was false and has changed.
Args:
errors (ndarray):
Array of errors.
epsilon (float):
Threshold value. All errors above epsilon are considered an anomaly.
anomaly_padding (int):
Number of errors before and after a found anomaly that are added to the
anomalous sequence.
Returns:
ndarray, float:
* Array containing start, end of each found anomalous sequence.
* Maximum error value that was not considered an anomaly.
"""
above = pd.Series(errors > epsilon)
index_above = np.argwhere(above.values)
for idx in index_above.flatten():
above[max(0, idx - anomaly_padding):min(idx + anomaly_padding + 1, len(above))] = True
shift = above.shift(1).fillna(False)
change = above != shift
if above.all():
max_below = 0
else:
max_below = max(errors[~above])
index = above.index
starts = index[above & change].tolist()
ends = (index[~above & change] - 1).tolist()
if len(ends) == len(starts) - 1:
ends.append(len(above) - 1)
return np.array([starts, ends]).T, max_below
def _get_max_errors(errors, sequences, max_below):
"""Get the maximum error for each anomalous sequence.
Also add a row with the max error which was not considered anomalous.
Table containing a ``max_error`` column with the maximum error of each
sequence and the columns ``start`` and ``stop`` with the corresponding start and stop
indexes, sorted descendingly by the maximum error.
Args:
errors (ndarray):
Array of errors.
sequences (ndarray):
Array containing start, end of anomalous sequences
max_below (float):
Maximum error value that was not considered an anomaly.
Returns:
pandas.DataFrame:
DataFrame object containing columns ``start``, ``stop`` and ``max_error``.
"""
max_errors = [{
'max_error': max_below,
'start': -1,
'stop': -1
}]
for sequence in sequences:
start, stop = sequence
sequence_errors = errors[start: stop + 1]
max_errors.append({
'start': start,
'stop': stop,
'max_error': max(sequence_errors)
})
max_errors = pd.DataFrame(max_errors).sort_values('max_error', ascending=False)
return max_errors.reset_index(drop=True)
def _prune_anomalies(max_errors, min_percent):
"""Prune anomalies to mitigate false positives.
This is done by following these steps:
* Shift the errors 1 negative step to compare each value with the next one.
* Drop the last row, which we do not want to compare.
* Calculate the percentage increase for each row.
* Find rows which are below ``min_percent``.
* Find the index of the latest of such rows.
* Get the values of all the sequences above that index.
Args:
max_errors (pandas.DataFrame):
DataFrame object containing columns ``start``, ``stop`` and ``max_error``.
min_percent (float):
Percentage of separation the anomalies need to meet between themselves and the
highest non-anomalous error in the window sequence.
Returns:
ndarray:
Array containing start, end, max_error of the pruned anomalies.
"""
next_error = max_errors['max_error'].shift(-1).iloc[:-1]
max_error = max_errors['max_error'].iloc[:-1]
increase = (max_error - next_error) / max_error
too_small = increase < min_percent
if too_small.all():
last_index = -1
else:
last_index = max_error[~too_small].index[-1]
return max_errors[['start', 'stop', 'max_error']].iloc[0: last_index + 1].values
def _compute_scores(pruned_anomalies, errors, threshold, window_start):
"""Compute the score of the anomalies.
Calculate the score of the anomalies proportional to the maximum error in the sequence
and add window_start timestamp to make the index absolute.
Args:
pruned_anomalies (ndarray):
Array of anomalies containing the start, end and max_error for all anomalies in
the window.
errors (ndarray):
Array of errors.
threshold (float):
Threshold value.
window_start (int):
Index of the first error value in the window.
Returns:
list:
List of anomalies containing start-index, end-index, score for each anomaly.
"""
anomalies = list()
denominator = errors.mean() + errors.std()
for row in pruned_anomalies:
max_error = row[2]
score = (max_error - threshold) / denominator
anomalies.append([row[0] + window_start, row[1] + window_start, score])
return anomalies
def _merge_sequences(sequences):
"""Merge consecutive and overlapping sequences.
We iterate over a list of start, end, score triples and merge together
overlapping or consecutive sequences.
The score of a merged sequence is the average of the single scores,
weighted by the length of the corresponding sequences.
Args:
sequences (list):
List of anomalies, containing start-index, end-index, score for each anomaly.
Returns:
ndarray:
Array containing start-index, end-index, score for each anomaly after merging.
"""
if len(sequences) == 0:
return np.array([])
sorted_sequences = sorted(sequences, key=lambda entry: entry[0])
new_sequences = [sorted_sequences[0]]
score = [sorted_sequences[0][2]]
weights = [sorted_sequences[0][1] - sorted_sequences[0][0]]
for sequence in sorted_sequences[1:]:
prev_sequence = new_sequences[-1]
if sequence[0] <= prev_sequence[1] + 1:
score.append(sequence[2])
weights.append(sequence[1] - sequence[0])
weighted_average = np.average(score, weights=weights)
new_sequences[-1] = (prev_sequence[0], max(prev_sequence[1], sequence[1]),
weighted_average)
else:
score = [sequence[2]]
weights = [sequence[1] - sequence[0]]
new_sequences.append(sequence)
return np.array(new_sequences)
def _find_window_sequences(window, z_range, anomaly_padding, min_percent, window_start,
fixed_threshold):
"""Find sequences of values that are anomalous.
We first find the threshold for the window, then find all sequences above that threshold.
After that, we get the max errors of the sequences and prune the anomalies. Lastly, the
score of the anomalies is computed.
Args:
window (ndarray):
Array of errors in the window that is analyzed.
z_range (list):
List of two values denoting the range out of which the start points for the
dynamic find_threshold function are chosen.
anomaly_padding (int):
Number of errors before and after a found anomaly that are added to the anomalous
sequence.
min_percent (float):
Percentage of separation the anomalies need to meet between themselves and the
highest non-anomalous error in the window sequence.
window_start (int):
Index of the first error value in the window.
fixed_threshold (bool):
Indicates whether to use fixed threshold or dynamic threshold.
Returns:
ndarray:
Array containing the start-index, end-index, score for each anomalous sequence
that was found in the window.
"""
if fixed_threshold:
threshold = _fixed_threshold(window)
else:
threshold = _find_threshold(window, z_range)
window_sequences, max_below = _find_sequences(window, threshold, anomaly_padding)
max_errors = _get_max_errors(window, window_sequences, max_below)
pruned_anomalies = _prune_anomalies(max_errors, min_percent)
window_sequences = _compute_scores(pruned_anomalies, window, threshold, window_start)
return window_sequences
def find_anomalies(errors, index, z_range=(0, 10), window_size=None, window_size_portion=None,
window_step_size=None, window_step_size_portion=None, min_percent=0.1,
anomaly_padding=50, lower_threshold=False, fixed_threshold=None, inverse=False):
"""Find sequences of error values that are anomalous.
We first define the window of errors, that we want to analyze. We then find the anomalous
sequences in that window and store the start/stop index pairs that correspond to each
sequence, along with its score. Optionally, we can flip the error sequence around the mean
and apply the same procedure, allowing us to find unusually low error sequences.
We then move the window and repeat the procedure.
Lastly, we combine overlapping or consecutive sequences.
Args:
errors (ndarray):
Array of errors.
index (ndarray):
Array of indices of the errors.
z_range (list):
Optional. List of two values denoting the range out of which the start points for
the scipy.fmin function are chosen. If not given, (0, 10) is used.
window_size (int):
Optional. Size of the window for which a threshold is calculated. If not given,
`None` is used, which finds one threshold for the entire sequence of errors.
window_size_portion (float):
Optional. Specify the size of the window to be a portion of the sequence of errors.
If not given, `None` is used, and window size is used as is.
window_step_size (int):
Optional. Number of steps the window is moved before another threshold is
calculated for the new window.
window_step_size_portion (float):
Optional. Specify the number of steps to be a portion of the window size. If not given,
`None` is used, and window step size is used as is.
min_percent (float):
Optional. Percentage of separation the anomalies need to meet between themselves and
the highest non-anomalous error in the window sequence. It nof given, 0.1 is used.
anomaly_padding (int):
Optional. Number of errors before and after a found anomaly that are added to the
anomalous sequence. If not given, 50 is used.
lower_threshold (bool):
Optional. Indicates whether to apply a lower threshold to find unusually low errors.
If not given, `False` is used.
fixed_threshold (bool):
Optional. Indicates whether to use fixed threshold or dynamic threshold. If not
given, `False` is used.
inverse (bool):
Optional. Indicate whether to take the inverse of errors.
Returns:
ndarray:
Array containing start-index, end-index, score for each anomalous sequence that
was found.
"""
window_size = window_size or len(errors)
if window_size_portion:
window_size = np.ceil(len(errors) * window_size_portion).astype('int')
window_step_size = window_step_size or window_size
if window_step_size_portion:
window_step_size = np.ceil(window_size * window_step_size_portion).astype('int')
if inverse:
errors = max(errors) + (-1 * errors)
window_start = 0
window_end = 0
sequences = list()
while window_end < len(errors):
window_end = window_start + window_size
window = errors[window_start:window_end]
window_sequences = _find_window_sequences(window, z_range, anomaly_padding, min_percent,
window_start, fixed_threshold)
sequences.extend(window_sequences)
if lower_threshold:
# Flip errors sequence around mean
mean = window.mean()
inverted_window = mean - (window - mean)
inverted_window_sequences = _find_window_sequences(inverted_window, z_range,
anomaly_padding, min_percent,
window_start, fixed_threshold)
sequences.extend(inverted_window_sequences)
window_start = window_start + window_step_size
sequences = _merge_sequences(sequences)
anomalies = list()
for start, stop, score in sequences:
anomalies.append([index[int(start)], index[int(stop)], score])
return np.asarray(anomalies)