Skip to content

Commit 9015b2b

Browse files
authored
[AnomalyDetection] Add detectors and some necessary univariate trackers. (#34232)
* Add detectors and some necessary univariate trackers. * Make score_one return float or None. * Raise exception when more than one variable used in univariate detectors. * Fix lints.
1 parent d89ccc9 commit 9015b2b

14 files changed

+907
-12
lines changed

sdks/python/apache_beam/ml/anomaly/base.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,15 @@ def learn_one(self, x: beam.Row) -> None:
165165
raise NotImplementedError
166166

167167
@abc.abstractmethod
168-
def score_one(self, x: beam.Row) -> float:
168+
def score_one(self, x: beam.Row) -> Optional[float]:
169169
"""Scores a single data instance for anomalies.
170170
171171
Args:
172172
x: A `beam.Row` representing the data instance.
173173
174174
Returns:
175-
The outlier score as a float.
175+
The outlier score as a float. None if an exception occurs during scoring,
176+
and NaN if the model is not ready.
176177
"""
177178
raise NotImplementedError
178179

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from apache_beam.ml.anomaly.detectors.zscore import ZScore
19+
from apache_beam.ml.anomaly.detectors.robust_zscore import RobustZScore
20+
from apache_beam.ml.anomaly.detectors.iqr import IQR
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import math
19+
from typing import Optional
20+
21+
import apache_beam as beam
22+
from apache_beam.ml.anomaly.base import AnomalyDetector
23+
from apache_beam.ml.anomaly.specifiable import specifiable
24+
from apache_beam.ml.anomaly.thresholds import FixedThreshold
25+
from apache_beam.ml.anomaly.univariate.base import EPSILON
26+
from apache_beam.ml.anomaly.univariate.quantile import BufferedSlidingQuantileTracker # pylint: disable=line-too-long
27+
from apache_beam.ml.anomaly.univariate.quantile import QuantileTracker
28+
from apache_beam.ml.anomaly.univariate.quantile import SecondaryBufferedQuantileTracker # pylint: disable=line-too-long
29+
30+
# Window size handed to the BufferedSlidingQuantileTracker that IQR builds for
# Q1 when the caller does not supply a `q1_tracker`.
DEFAULT_WINDOW_SIZE = 1000
31+
32+
33+
@specifiable
34+
class IQR(AnomalyDetector):
35+
"""Interquartile Range (IQR) anomaly detector.
36+
37+
This class implements an anomaly detection algorithm based on the
38+
Interquartile Range (IQR) [#]_ . It calculates the IQR using quantile trackers
39+
for Q1 (25th percentile) and Q3 (75th percentile) and scores data points based
40+
on their deviation from these quartiles.
41+
42+
The score is calculated as follows:
43+
44+
* If a data point is above Q3, the score is (value - Q3) / IQR.
45+
* If a data point is below Q1, the score is (Q1 - value) / IQR.
46+
* If a data point is within the IQR (Q1 <= value <= Q3), the score is 0.
47+
Initializes the IQR anomaly detector.
48+
49+
Args:
50+
q1_tracker: Optional QuantileTracker for Q1 (25th percentile). If None, a
51+
BufferedSlidingQuantileTracker with a default window size is used.
52+
q3_tracker: Optional QuantileTracker for Q3 (75th percentile). If None, a
53+
SecondaryBufferedQuantileTracker based on q1_tracker is used.
54+
threshold_criterion: Optional ThresholdFn to apply on the score. Defaults
55+
to `FixedThreshold(1.5)` since outliers are commonly defined as data
56+
points that fall below Q1 - 1.5 IQR or above Q3 + 1.5 IQR.
57+
**kwargs: Additional keyword arguments.
58+
59+
.. [#] https://en.wikipedia.org/wiki/Interquartile_range
60+
"""
61+
def __init__(
62+
self,
63+
q1_tracker: Optional[QuantileTracker] = None,
64+
q3_tracker: Optional[QuantileTracker] = None,
65+
**kwargs):
66+
if "threshold_criterion" not in kwargs:
67+
kwargs["threshold_criterion"] = FixedThreshold(1.5)
68+
super().__init__(**kwargs)
69+
70+
self._q1_tracker = q1_tracker or \
71+
BufferedSlidingQuantileTracker(DEFAULT_WINDOW_SIZE, 0.25)
72+
assert self._q1_tracker._q == 0.25, \
73+
"q1_tracker must be initialized with q = 0.25"
74+
75+
self._q3_tracker = q3_tracker or \
76+
SecondaryBufferedQuantileTracker(self._q1_tracker, 0.75)
77+
assert self._q3_tracker._q == 0.75, \
78+
"q3_tracker must be initialized with q = 0.75"
79+
80+
def learn_one(self, x: beam.Row) -> None:
81+
"""Updates the quantile trackers with a new data point.
82+
83+
Args:
84+
x: A `beam.Row` containing a single numerical value.
85+
"""
86+
if len(x.__dict__) != 1:
87+
raise ValueError(
88+
"IQR.learn_one expected univariate input, but got %s", str(x))
89+
90+
v = next(iter(x))
91+
self._q1_tracker.push(v)
92+
self._q3_tracker.push(v)
93+
94+
def score_one(self, x: beam.Row) -> Optional[float]:
95+
"""Scores a data point based on its deviation from the IQR.
96+
97+
Args:
98+
x: A `beam.Row` containing a single numerical value.
99+
100+
Returns:
101+
float | None: The anomaly score.
102+
"""
103+
if len(x.__dict__) != 1:
104+
raise ValueError(
105+
"IQR.score_one expected univariate input, but got %s", str(x))
106+
107+
v = next(iter(x))
108+
if v is None or math.isnan(v):
109+
return None
110+
111+
q1 = self._q1_tracker.get()
112+
q3 = self._q3_tracker.get()
113+
114+
# not enough data points to compute median or median absolute deviation
115+
if math.isnan(q1) or math.isnan(q3):
116+
return float('NaN')
117+
118+
iqr = q3 - q1
119+
if abs(iqr) < EPSILON:
120+
return 0.0
121+
122+
if v > q3:
123+
return (v - q3) / iqr
124+
125+
if v < q1:
126+
return (q1 - v) / iqr
127+
128+
# q1 <= v <= q3, normal points
129+
return 0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
import math
20+
import unittest
21+
22+
import apache_beam as beam
23+
from apache_beam.ml.anomaly.detectors.iqr import IQR
24+
25+
26+
class IQRTest(unittest.TestCase):
  """Checks the scores produced by an `IQR` detector on a small stream."""

  # Univariate rows fed to the detector in order.
  input = [beam.Row(x=value) for value in (1, 1, 5, 9, 20, 10, 1)]

  def test_with_default_trackers(self):
    """Scores each row before learning from it, using the default trackers."""
    detector = IQR()

    observed = []
    for sample in IQRTest.input:
      # Score first so that a row never influences its own score.
      current = detector.score_one(sample)
      detector.learn_one(sample)
      observed.append(current)

    first, rest = observed[0], observed[1:]
    # Nothing has been learned when the first row is scored.
    self.assertTrue(math.isnan(first))
    self.assertEqual(rest, [0.0, 0.0, 3.0, 2.8, 0.125, 0.12903225806451613])
48+
49+
50+
if __name__ == '__main__':
  # Raise the root logger to INFO before handing control to unittest.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import math
19+
from typing import Optional
20+
21+
import apache_beam as beam
22+
from apache_beam.ml.anomaly.base import AnomalyDetector
23+
from apache_beam.ml.anomaly.specifiable import specifiable
24+
from apache_beam.ml.anomaly.thresholds import FixedThreshold
25+
from apache_beam.ml.anomaly.univariate.base import EPSILON
26+
from apache_beam.ml.anomaly.univariate.mad import MadTracker
27+
28+
29+
# pylint: disable=line-too-long
@specifiable
class RobustZScore(AnomalyDetector):
  """Robust Z-Score anomaly detector.

  This class implements an anomaly detection algorithm based on Robust Z-Score
  (also known as Modified Z-Score), which is a robust alternative to the
  traditional Z-score [#]_. It uses the median and Median Absolute Deviation
  (MAD) to compute a score that is less sensitive to outliers.

  The score is calculated as: `|0.6745 * (value - median) / MAD|`

  Important:
    In the streaming setting, we use the online version of median and MAD in the
    calculation. Therefore, the score computed here does not exactly match its
    batch counterpart.

  This implementation is adapted from the implementation within PySAD [#]_:
  https://github.com/selimfirat/pysad/blob/master/pysad/models/median_absolute_deviation.py

  The batch version can be seen at PyOD [#]_:
  https://github.com/yzhao062/pyod/blob/master/pyod/models/mad.py


  Args:
    mad_tracker: Optional `MadTracker` instance. If None, a default
      `MadTracker` is created.
    threshold_criterion: Optional `ThresholdFn` to apply on the score.
      Defaults to `FixedThreshold(3)` due to the commonly used 3-sigma rule.
    **kwargs: Additional keyword arguments.

  .. [#] Hoaglin, David C.. (2013). Volume 16: How to Detect and Handle Outliers.
  .. [#] Yilmaz, Selim & Kozat, Suleyman. (2020). PySAD: A Streaming Anomaly Detection Framework in Python. 10.48550/arXiv.2009.02572.
  .. [#] Zhao, Y., Nasrullah, Z. and Li, Z.. (2019). PyOD: A Python Toolbox for Scalable Outlier Detection. Journal of machine learning research (JMLR), 20(96), pp.1-7.
  """
  # pylint: enable=line-too-long
  # Constant multiplied into (value - median) / MAD when scoring.
  SCALE_FACTOR = 0.6745

  def __init__(self, mad_tracker: Optional[MadTracker] = None, **kwargs):
    # Only install the default threshold when the caller did not choose one.
    if "threshold_criterion" not in kwargs:
      kwargs["threshold_criterion"] = FixedThreshold(3)
    super().__init__(**kwargs)
    self._mad_tracker = mad_tracker or MadTracker()

  def learn_one(self, x: beam.Row) -> None:
    """Updates the `MadTracker` with a new data point.

    Args:
      x: A `beam.Row` containing a single numerical value.

    Raises:
      ValueError: If `x` does not hold exactly one field.
    """
    if len(x.__dict__) != 1:
      # Format the message explicitly: exception constructors do not apply
      # %-style arguments the way logging calls do.
      raise ValueError(
          "RobustZScore.learn_one expected univariate input, but got %s" %
          str(x))

    v = next(iter(x))
    self._mad_tracker.push(v)

  def score_one(self, x: beam.Row) -> Optional[float]:
    """Scores a data point using the Robust Z-Score.

    Args:
      x: A `beam.Row` containing a single numerical value.

    Returns:
      float | None: The Robust Z-Score. None if the value is None or NaN,
      NaN if the median or MAD is not available yet, and 0.0 if the MAD is
      smaller than EPSILON.

    Raises:
      ValueError: If `x` does not hold exactly one field.
    """
    if len(x.__dict__) != 1:
      raise ValueError(
          "RobustZScore.score_one expected univariate input, but got %s" %
          str(x))

    v = next(iter(x))
    if v is None or math.isnan(v):
      return None

    median = self._mad_tracker.get_median()
    mad = self._mad_tracker.get()

    # not enough data points to compute median or median absolute deviation
    if math.isnan(mad) or math.isnan(median):
      return float('NaN')

    # Avoid dividing by a (near) zero MAD.
    if abs(mad) < EPSILON:
      return 0.0

    return abs(RobustZScore.SCALE_FACTOR * (v - median) / mad)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
import math
20+
import unittest
21+
22+
import apache_beam as beam
23+
from apache_beam.ml.anomaly.detectors.robust_zscore import RobustZScore
24+
25+
26+
class RobustZScoreTest(unittest.TestCase):
  """Checks the scores produced by a `RobustZScore` detector on a stream."""

  # Univariate rows fed to the detector in order.
  input = [beam.Row(x=value) for value in (1, 1, 5, 7, 20, 6, 1)]

  def test_with_default_trackers(self):
    """Scores each row before learning from it, using the default tracker."""
    detector = RobustZScore()

    observed = []
    for sample in RobustZScoreTest.input:
      # Score first so that a row never influences its own score.
      current = detector.score_one(sample)
      detector.learn_one(sample)
      observed.append(current)

    first, rest = observed[0], observed[1:]
    # Nothing has been learned when the first row is scored.
    self.assertTrue(math.isnan(first))
    self.assertEqual(rest, [0.0, 0.0, 0.0, 5.73325, 0.168625, 1.349])
47+
48+
49+
if __name__ == '__main__':
  # Raise the root logger to INFO before handing control to unittest.
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  unittest.main()

0 commit comments

Comments
 (0)