@@ -6,11 +6,14 @@
  */

 import { mapValues, first, last, isNaN } from 'lodash';
+import moment from 'moment';
 import { ElasticsearchClient } from 'kibana/server';
 import {
   isTooManyBucketsPreviewException,
   TOO_MANY_BUCKETS_PREVIEW_EXCEPTION,
 } from '../../../../../common/alerting/metrics';
+import { getIntervalInSeconds } from '../../../../utils/get_interval_in_seconds';
+import { roundTimestamp } from '../../../../utils/round_timestamp';
 import { InfraSource } from '../../../../../common/source_configuration/source_configuration';
 import { InfraDatabaseSearchResponse } from '../../../adapters/framework/adapter_types';
 import { createAfterKeyHandler } from '../../../../utils/create_afterkey_handler';
@@ -26,6 +29,7 @@ interface Aggregation {
     aggregatedValue: { value: number; values?: Array<{ key: number; value: number }> };
     doc_count: number;
     to_as_string: string;
+    from_as_string: string;
     key_as_string: string;
   }>;
 };
@@ -109,14 +113,34 @@ const getMetric: (
   filterQuery,
   timeframe
 ) {
-  const { aggType } = params;
+  const { aggType, timeSize, timeUnit } = params;
   const hasGroupBy = groupBy && groupBy.length;
+
+  const interval = `${timeSize}${timeUnit}`;
+  const intervalAsSeconds = getIntervalInSeconds(interval);
+  const intervalAsMS = intervalAsSeconds * 1000;
+
+  const to = moment(timeframe ? timeframe.end : Date.now())
+    .add(1, timeUnit)
+    .startOf(timeUnit)
+    .valueOf();
+
+  // Rate aggregations need 5 buckets worth of data
+  const minimumBuckets = aggType === Aggregators.RATE ? 5 : 1;
+
+  const minimumFrom = to - intervalAsMS * minimumBuckets;
+
+  const from = roundTimestamp(
+    timeframe && timeframe.start <= minimumFrom ? timeframe.start : minimumFrom,
+    timeUnit
+  );
+
   const searchBody = getElasticsearchMetricQuery(
     params,
     timefield,
+    { start: from, end: to },
     hasGroupBy ? groupBy : undefined,
-    filterQuery,
-    timeframe
+    filterQuery
   );

   try {
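The hunk above widens the query window so Elasticsearch always returns whole buckets: `to` snaps forward to the next `timeUnit` boundary, and `from` reaches back at least `minimumBuckets` intervals before being aligned by `roundTimestamp`. A minimal sketch of that arithmetic, with simplified stand-ins for the repo's `getIntervalInSeconds` and `roundTimestamp` utils (their exact behavior here is an assumption based on how the diff uses them):

```ts
// Sketch of the window math, with assumed stand-ins for the repo's utils.
import moment from 'moment';

type TimeUnit = 's' | 'm' | 'h' | 'd';
const SECONDS: Record<TimeUnit, number> = { s: 1, m: 60, h: 3600, d: 86400 };

// Assumed behavior: parses an interval string like '5m' into 300 seconds.
const getIntervalInSeconds = (interval: string): number =>
  parseFloat(interval) * SECONDS[interval.slice(-1) as TimeUnit];

// Assumed behavior: aligns a timestamp down to the start of its time unit.
const roundTimestamp = (timestamp: number, unit: TimeUnit): number =>
  moment(timestamp).startOf(unit).valueOf();

// For a RATE alert with timeSize = 5, timeUnit = 'm', end = 12:03:27Z:
//   to          = start of the minute after `end`  -> 12:04:00
//   minimumFrom = to - 5 buckets * 5 minutes       -> 11:39:00
// so the query always covers five whole buckets to derive a rate from.
const timeUnit: TimeUnit = 'm';
const intervalAsMS = getIntervalInSeconds('5m') * 1000;
const to = moment(Date.UTC(2021, 0, 1, 12, 3, 27))
  .add(1, timeUnit)
  .startOf(timeUnit)
  .valueOf();
const minimumFrom = to - intervalAsMS * 5;
const from = roundTimestamp(minimumFrom, timeUnit);
console.log(new Date(from).toISOString()); // 2021-01-01T11:39:00.000Z
```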
@@ -140,7 +164,11 @@ const getMetric: (
           ...result,
           [Object.values(bucket.key)
             .map((value) => value)
-            .join(', ')]: getValuesFromAggregations(bucket, aggType),
+            .join(', ')]: getValuesFromAggregations(bucket, aggType, {
+              from,
+              to,
+              bucketSizeInMillis: intervalAsMS,
+            }),
         }),
         {}
       );
@@ -153,7 +181,8 @@ const getMetric: (
     return {
       [UNGROUPED_FACTORY_KEY]: getValuesFromAggregations(
         (result.aggregations! as unknown) as Aggregation,
-        aggType
+        aggType,
+        { from, to, bucketSizeInMillis: intervalAsMS }
       ),
     };
   } catch (e) {
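Both call sites now thread the same `{ from, to, bucketSizeInMillis }` options into `getValuesFromAggregations`. As a reference for how the two branches key their results, a small sketch (field names and values are illustrative; `UNGROUPED_FACTORY_KEY` is assumed to be `'*'` here):

```ts
// Grouped: each composite bucket's key object is flattened into a string.
const bucket = { key: { 'host.name': 'host-01', 'cloud.az': 'us-east-1a' } };
const groupKey = Object.values(bucket.key)
  .map((value) => value)
  .join(', '); // -> 'host-01, us-east-1a'

// Ungrouped: a single entry under UNGROUPED_FACTORY_KEY (assumed '*').
const UNGROUPED_FACTORY_KEY = '*';
const result = {
  [UNGROUPED_FACTORY_KEY]: [{ key: '2021-01-01T11:55:00.000Z', value: 0.27 }],
};
```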
@@ -173,17 +202,35 @@ const getMetric: (
   }
 };

+interface DropPartialBucketOptions {
+  from: number;
+  to: number;
+  bucketSizeInMillis: number;
+}
+
+const dropPartialBuckets = ({ from, to, bucketSizeInMillis }: DropPartialBucketOptions) => (
+  row: {
+    key: string;
+    value: number;
+  } | null
+) => {
+  if (row == null) return null;
+  const timestamp = new Date(row.key).valueOf();
+  return timestamp >= from && timestamp + bucketSizeInMillis <= to;
+};
+
 const getValuesFromAggregations = (
   aggregations: Aggregation,
-  aggType: MetricExpressionParams['aggType']
+  aggType: MetricExpressionParams['aggType'],
+  dropPartialBucketsOptions: DropPartialBucketOptions
 ) => {
   try {
     const { buckets } = aggregations.aggregatedIntervals;
     if (!buckets.length) return null; // No Data state

     if (aggType === Aggregators.COUNT) {
       return buckets.map((bucket) => ({
-        key: bucket.to_as_string,
+        key: bucket.from_as_string,
         value: bucket.doc_count,
       }));
     }
@@ -192,11 +239,28 @@ const getValuesFromAggregations = (
         const values = bucket.aggregatedValue?.values || [];
         const firstValue = first(values);
         if (!firstValue) return null;
-        return { key: bucket.to_as_string, value: firstValue.value };
+        return { key: bucket.from_as_string, value: firstValue.value };
       });
     }
+
+    if (aggType === Aggregators.AVERAGE) {
+      return buckets.map((bucket) => ({
+        key: bucket.key_as_string ?? bucket.from_as_string,
+        value: bucket.aggregatedValue?.value ?? null,
+      }));
+    }
+
+    if (aggType === Aggregators.RATE) {
+      return buckets
+        .map((bucket) => ({
+          key: bucket.key_as_string ?? bucket.from_as_string,
+          value: bucket.aggregatedValue?.value ?? null,
+        }))
+        .filter(dropPartialBuckets(dropPartialBucketsOptions));
+    }
+
     return buckets.map((bucket) => ({
-      key: bucket.key_as_string ?? bucket.to_as_string,
+      key: bucket.key_as_string ?? bucket.from_as_string,
       value: bucket.aggregatedValue?.value ?? null,
     }));
   } catch (e) {
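The new `dropPartialBuckets` predicate keeps a row only when its whole bucket, `[timestamp, timestamp + bucketSizeInMillis]`, lies inside `[from, to]`, so a rate computed over a half-filled trailing bucket never reaches the alert condition. A self-contained sketch of its behavior (the diff returns `null` for empty rows, which `Array.prototype.filter` treats the same as `false`; timestamps below are illustrative):

```ts
// Standalone version of the dropPartialBuckets predicate added above.
interface DropPartialBucketOptions {
  from: number;
  to: number;
  bucketSizeInMillis: number;
}

const dropPartialBuckets = ({ from, to, bucketSizeInMillis }: DropPartialBucketOptions) => (
  row: { key: string; value: number } | null
) => {
  if (row == null) return false; // the diff returns null; both are falsy in filter()
  const timestamp = new Date(row.key).valueOf();
  // Keep a bucket only if it starts at/after `from` AND ends at/before `to`.
  return timestamp >= from && timestamp + bucketSizeInMillis <= to;
};

const opts = {
  from: Date.parse('2021-01-01T11:39:00Z'),
  to: Date.parse('2021-01-01T12:04:00Z'),
  bucketSizeInMillis: 5 * 60 * 1000,
};

const rows = [
  { key: '2021-01-01T11:34:00Z', value: 1 }, // starts before `from` -> dropped
  { key: '2021-01-01T11:39:00Z', value: 2 }, // whole bucket inside  -> kept
  { key: '2021-01-01T12:00:00Z', value: 3 }, // ends after `to`      -> dropped
];

console.log(rows.filter(dropPartialBuckets(opts))); // -> only the 11:39 row
```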