-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathlime_timeseries.py
187 lines (167 loc) · 8.17 KB
/
lime_timeseries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
import numpy as np
import sklearn
from fastdtw import fastdtw
from lime import explanation
from lime import lime_base
from dianna import utils
from dianna.utils.maskers import generate_time_series_masks
from dianna.utils.maskers import mask_data
from dianna.utils.predict import make_predictions
class LIMETimeseries:
    """LIME implementation for timeseries.

    This implementation is inspired by the paper:
    Validation of XAI explanations for multivariate time series classification in
    the maritime domain. (https://doi.org/10.1016/j.jocs.2021.101539)
    """

    def __init__(
        self,
        kernel_width=25,
        verbose=False,
        preprocess_function=None,
        feature_selection='auto',
    ):
        """Initializes Lime explainer for timeseries.

        Args:
            kernel_width (int): Width of the kernel used in LIME explainer.
            verbose (bool): Whether to print progress messages during explanation.
            preprocess_function (callable, optional): Function to preprocess the time series data before passing it
                                                      to the explainer. Defaults to None.
            feature_selection (str): Feature selection method forwarded to the
                                     underlying `lime_base` explainer.
        """

        def kernel(d):
            """Kernel function used in LIME explainer."""
            return np.sqrt(np.exp(-(d**2) / kernel_width**2))

        self.explainer = lime_base.LimeBase(kernel, verbose)
        self.feature_selection = feature_selection
        self.domain_mapper = explanation.DomainMapper()
        self.preprocess_function = preprocess_function
        self._is_multivariate = False

    def explain(
        self,
        model_or_function,
        input_timeseries,
        labels=(0, ),
        class_names=None,
        num_features=1,
        num_samples=1,
        num_slices=1,
        batch_size=1,
        mask_type='mean',
        distance_method='cosine',
    ):  # pylint: disable=too-many-arguments,too-many-locals
        """Run the LIME explainer for timeseries.

        Args:
            model_or_function (callable or str): The function that runs the model to be explained _or_
                                                 the path to a ONNX model on disk.
            input_timeseries (np.ndarray): The input time series data to be explained, with shape
                                           [batch_size, sequence_length, num_features].
                                           NOTE(review): the masked data is unpacked below as
                                           (samples, sequence, variables) - confirm the expected
                                           input shape against the maskers.
            labels (list): The list of labels for different classes.
            class_names (list): The list of class names.
            num_features (int): The number of features to include in the explanation.
            num_samples (int): The number of samples to generate for the LIME explainer.
            num_slices (int): The number of slices to divide the time series data into.
                              Currently not used by this method (see the TODO below).
            batch_size (int): The batch size to use for running the model.
            mask_type (str): The type of mask to apply to the time series data. Can be "mean" or "noise".
            distance_method (str): The distance metric to use for LIME. Can be "cosine",
                                   "euclidean" or "dtw".

        Returns:
            np.ndarray: An array with one saliency map per requested label. Each map has
                the shape (sequence, n_var) of a masked sample; features that were not
                selected by the explainer get a weight of 0.

        Raises:
            ValueError: If `distance_method` is not supported.
        """
        # TODO: p_keep does not exist in LIME. LIME will mask every point, which means the number
        # of steps masked is 1. We should update it after adapting the maskers function to LIME.
        # wrap up the input model or function using the runner
        runner = utils.get_function(
            model_or_function, preprocess_function=self.preprocess_function)
        masks = generate_time_series_masks(input_timeseries.shape,
                                           num_samples,
                                           p_keep=0.1)
        # NOTE: Required by `lime_base` explainer since the first instance must be the original data
        # For more details, check this link
        # https://github.com/marcotcr/lime/blob/fd7eb2e6f760619c29fca0187c07b82157601b32/lime/lime_base.py#L148
        masks[0, :, :] = 1.0
        masked = mask_data(input_timeseries, masks, mask_type=mask_type)
        # generate predictions using the masked data.
        predictions = make_predictions(masked, runner, batch_size)
        # need to reshape for the calculation of distance
        _, sequence, n_var = masked.shape
        masked = masked.reshape((-1, sequence * n_var))
        distance = self._calculate_distance(masked,
                                            distance_method=distance_method)
        exp = explanation.Explanation(domain_mapper=self.domain_mapper,
                                      class_names=class_names)
        # Expected shape of input:
        # masked[num_samples, channels * num_slices],
        # predictions[num_samples, labels],
        # distances[num_samples]
        for label in labels:
            (
                exp.intercept[int(label)],
                exp.local_exp[int(label)],
                exp.score,
                exp.local_pred,
            ) = self.explainer.explain_instance_with_data(
                masked,
                predictions,
                distance,
                label=label,
                num_features=num_features,
                # honour the selection method chosen at construction time
                feature_selection=self.feature_selection,
                model_regressor=None,
            )
        # Extract scores from the lime explainer. Each local explanation is a
        # list of (feature_index, weight) pairs; place every weight at its own
        # index so the map stays aligned with the flattened time series even
        # when only a subset of the features was selected.
        n_flat = sequence * n_var
        saliency = [
            self._scatter_weights(exp.local_exp[int(label)], n_flat)
            for label in labels
        ]
        return np.asarray(saliency).reshape(-1, sequence, n_var)

    @staticmethod
    def _scatter_weights(local_exp, n_features):
        """Place explanation weights at their feature indices.

        Args:
            local_exp (list): Pairs of (feature_index, weight).
            n_features (int): Length of the flattened feature vector.

        Returns:
            np.ndarray: Vector of length `n_features` holding each weight at its
                feature index and 0 everywhere else.
        """
        weights = np.zeros(n_features)
        for index, weight in local_exp:
            weights[index] = weight
        return weights

    def _calculate_distance(self, masked_data, distance_method='cosine'):
        """Calculate distance between perturbed data and the original samples.

        Args:
            masked_data (np.ndarray): The perturbed time series data.
                                      *Note: The first instance is the original timeseries
            distance_method (str): The distance metric to use. Defaults to "cosine".
                                   Supported options are:
                                   - 'cosine': Computes the cosine distance between the two vectors.
                                   - 'euclidean': Computes the Euclidean distance between the two vectors.
                                   - 'dtw': Uses Dynamic Time Warping to calculate the distance between
                                            the two time series.

        Returns:
            np.ndarray: A vector containing the distance between each instance and
                the first instance.

        Raises:
            ValueError: If the given `distance_method` is not supported.

        Notes:
            - The cosine similarity is a measure of the similarity between two non-zero vectors
              of an inner product space that measures the cosine of the angle between them.
            - The Euclidean distance is the straight-line distance between two points in
              Euclidean space.
            - Dynamic Time Warping is an algorithm for measuring similarity between two time
              series sequences that may vary in speed or timing.
        """
        if distance_method == 'dtw':
            return self._dtw_distance(masked_data)

        if distance_method not in ('cosine', 'euclidean'):
            raise ValueError(
                f'Given method {distance_method} is not supported. Please '
                "choose from 'dtw', 'cosine' and 'euclidean'.")

        # A bare `import sklearn` does not guarantee that the `metrics`
        # submodule is loaded, so import the function explicitly.
        from sklearn.metrics import pairwise_distances  # pylint: disable=import-outside-toplevel

        distance = pairwise_distances(masked_data,
                                      masked_data[0].reshape([1, -1]),
                                      metric=distance_method).ravel()
        if distance_method == 'cosine':
            distance *= 100  # make sure it has same scale as other methods
        return distance

    def _dtw_distance(self, masked_data):
        """Calculate distance based on dynamic time warping.

        Args:
            masked_data (np.ndarray): An array of time series with some segments masked out.
                                      *Note: The first instance is the original timeseries

        Returns:
            np.ndarray: DTW distances.
        """
        # fastdtw returns (distance, path); only the distance is kept
        distance = np.asarray([
            fastdtw(masked_data[0], one_masked_data)[0]
            for one_masked_data in masked_data
        ])
        return distance