[ML] APM Correlations: Fix usage in load balancing/HA setups. #115145
@@ -0,0 +1,252 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { useCallback, useEffect, useReducer, useRef } from 'react';
import { chunk } from 'lodash';

import { IHttpFetchError } from 'src/core/public';

import { EVENT_OUTCOME } from '../../../../common/elasticsearch_fieldnames';
import { EventOutcome } from '../../../../common/event_outcome';
import { DEFAULT_PERCENTILE_THRESHOLD } from '../../../../common/search_strategies/constants';
import type { RawResponseBase } from '../../../../common/search_strategies/types';
import type {
  FailedTransactionsCorrelation,
  FailedTransactionsCorrelationsRawResponse,
} from '../../../../common/search_strategies/failed_transactions_correlations/types';

import { useApmServiceContext } from '../../../context/apm_service/use_apm_service_context';
import { useUrlParams } from '../../../context/url_params_context/use_url_params';

import { useApmParams } from '../../../hooks/use_apm_params';
import { useTimeRange } from '../../../hooks/use_time_range';
import { callApmApi } from '../../../services/rest/createCallApmApi';

type Response = FailedTransactionsCorrelationsRawResponse & RawResponseBase;

interface SearchStrategyProgress {
  error?: Error | IHttpFetchError;
  isRunning: boolean;
  loaded: number;
  total: number;
}

const getInitialRawResponse = (): Response =>
  ({
    ccsWarning: false,
    took: 0,
  } as Response);

const getInitialProgress = (): SearchStrategyProgress => ({
  isRunning: false,
  loaded: 0,
  total: 100,
});

const getReducer =
  <T>() =>
  (prev: T, update: Partial<T>): T => ({
    ...prev,
    ...update,
  });

export function useFailedTransactionsCorrelations() {
  const { serviceName, transactionType } = useApmServiceContext();

  const { urlParams } = useUrlParams();
  const { transactionName } = urlParams;

  const {
    query: { kuery, environment, rangeFrom, rangeTo },
  } = useApmParams('/services/{serviceName}/transactions/view');

  const { start, end } = useTimeRange({ rangeFrom, rangeTo });

  const [rawResponse, setRawResponse] = useReducer(
    getReducer<Response>(),
    getInitialRawResponse()
  );

  const [fetchState, setFetchState] = useReducer(
    getReducer<SearchStrategyProgress>(),
    getInitialProgress()
  );

  const isCancelledRef = useRef(false);

  const startFetch = useCallback(async () => {
    isCancelledRef.current = false;

    setFetchState({
      ...getInitialProgress(),
      isRunning: true,
      error: undefined,
    });

    const query = {
      serviceName,
      transactionName,
      transactionType,
      kuery,
      environment,
      start,
      end,
    };

    try {
      const rawResponseUpdate = (await callApmApi({
        endpoint: 'POST /internal/apm/latency/overall_distribution',
        signal: null,
        params: {
          body: {
            ...query,
            percentileThreshold: DEFAULT_PERCENTILE_THRESHOLD,
          },
        },
      })) as Response;

      const { overallHistogram: errorHistogram } = (await callApmApi({
        endpoint: 'POST /internal/apm/latency/overall_distribution',
        signal: null,
        params: {
          body: {
            ...query,
            percentileThreshold: DEFAULT_PERCENTILE_THRESHOLD,
            termFilters: [
              { fieldName: EVENT_OUTCOME, fieldValue: EventOutcome.failure },
            ],
          },
        },
      })) as Response;

      if (isCancelledRef.current) {
        return;
      }

      setRawResponse({
        ...rawResponseUpdate,
        errorHistogram,
      });
      setFetchState({
        loaded: 5,
      });

      const { fieldCandidates: candidates } = await callApmApi({
        endpoint: 'GET /internal/apm/correlations/field_candidates',
        signal: null,
        params: {
          query,
        },
      });

      if (isCancelledRef.current) {
        return;
      }

Review comment: Why is this necessary? Wouldn't the same result be achieved by re-initializing the abort controller? And what happens if you pass an aborted signal to a fetch call? Will it execute or be immediately aborted?
Reply: Yes, you can use controller.signal.aborted: https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal. But you can also simply pass in the signal, and the request will immediately be aborted, which will then automatically be handled by your catch clause.
Reply: Great, that let me get rid of […]
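As a small aside on the thread above, here is a minimal standalone sketch (plain browser fetch, not callApmApi, with an illustrative URL) of the behaviour the reply describes: a request started with an already-aborted signal rejects immediately with an AbortError and lands in the surrounding catch clause.

// Minimal sketch, separate from this PR's code: an already-aborted signal.
const controller = new AbortController();
controller.abort();

(async () => {
  try {
    // The request is never issued; the promise rejects right away.
    await fetch('/internal/apm/correlations/field_candidates', {
      signal: controller.signal,
    });
  } catch (e) {
    console.log((e as Error).name); // "AbortError"
  }
})();
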
      const fieldCandidates = candidates.filter((t) => !(t === EVENT_OUTCOME));

      setFetchState({
        loaded: 10,
      });

      const failedTransactionsCorrelations: FailedTransactionsCorrelation[] =
        [];
      const fieldsToSample = new Set<string>();
      const chunkSize = 10;
      let loadCounter = 0;

      const fieldCandidatesChunks = chunk(fieldCandidates, chunkSize);

      for (const fieldCandidatesChunk of fieldCandidatesChunks) {

Review comment: This too?
Reply: This splits all field candidates into chunks; the chunks are called in sequence here on the client side, but all field candidates of a chunk are then queried in parallel on the Kibana server side.
Reply: Yes, but why call these in sequence? How many blocking calls can we expect here?
Reply: This was made in the spirit of "make it slow". I'm sure this can be further optimized; in this PR we started to parallelize the server-side calls and play it safe on the client side. Since the field candidates and field value pairs are generated dynamically, we don't want to allow an unlimited number of queries to run in parallel. Field candidates are usually in the dozens, field value pairs can be in the hundreds.
Reply: I don't see how running dozens of requests sequentially is a better experience. We can use pLimit here to limit the number of concurrent requests. Something like 5 sounds like a good start.
Reply: +1 for using […]
Reply: Yes, that's fine 👍
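A rough sketch of the p-limit approach suggested above (not what this commit implements): it assumes the p-limit package is available on the client, is meant as a fragment inside the async startFetch callback, and caps the number of in-flight p_values requests at five while reusing the same chunked candidate list.

import pLimit from 'p-limit';

// Hypothetical concurrency cap of 5, as suggested in the review.
const limit = pLimit(5);

const chunkResponses = await Promise.all(
  fieldCandidatesChunks.map((fieldCandidatesChunk) =>
    limit(() =>
      callApmApi({
        endpoint: 'POST /internal/apm/correlations/p_values',
        signal: null,
        params: {
          body: { ...query, fieldCandidates: fieldCandidatesChunk },
        },
      })
    )
  )
);
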
        const pValues = await callApmApi({
          endpoint: 'POST /internal/apm/correlations/p_values',
          signal: null,
          params: {
            body: { ...query, fieldCandidates: fieldCandidatesChunk },
          },
        });

        if (pValues.failedTransactionsCorrelations.length > 0) {
          pValues.failedTransactionsCorrelations.forEach((d) => {
            fieldsToSample.add(d.fieldName);
          });
          failedTransactionsCorrelations.push(
            ...pValues.failedTransactionsCorrelations
          );
          rawResponseUpdate.failedTransactionsCorrelations =
            failedTransactionsCorrelations;
          setRawResponse(rawResponseUpdate);
        }

        if (isCancelledRef.current) {
          return;
        }

        loadCounter += chunkSize;
        setFetchState({
          loaded: 20 + Math.round((loadCounter / fieldCandidates.length) * 80),
        });
      }

      const fieldStats = await callApmApi({
        endpoint: 'POST /internal/apm/correlations/field_stats',
        signal: null,
        params: {
          body: {
            ...query,
            fieldsToSample: [...fieldsToSample],
          },
        },
      });

      rawResponseUpdate.fieldStats = fieldStats.stats;
      setRawResponse(rawResponseUpdate);

      setFetchState({
        loaded: 100,
      });
    } catch (e) {
      // const err = e as Error | IHttpFetchError;
      // const message = error.body?.message ?? error.response?.statusText;
      setFetchState({
        error: e as Error,
      });
    }

    setFetchState({
      isRunning: false,
    });
  }, [
    environment,
    serviceName,
    transactionName,
    transactionType,
    kuery,
    start,
    end,
  ]);

  const cancelFetch = useCallback(() => {
    isCancelledRef.current = true;
    setFetchState({
      isRunning: false,
    });
  }, []);

  // auto-update
  useEffect(() => {
    startFetch();
    return cancelFetch;
  }, [startFetch, cancelFetch]);

  return {
    progress: fetchState,
    response: rawResponse,
    startFetch,
    cancelFetch,
  };
}
@@ -5,7 +5,8 @@
 * 2.0.
 */

import { useCallback, useEffect, useReducer } from 'react';
import { useCallback, useEffect, useReducer, useRef } from 'react';
import { chunk } from 'lodash';

import { IHttpFetchError } from 'src/core/public';

@@ -73,8 +74,11 @@ export function useLatencyCorrelations() {
    getInitialProgress()
  );

  const isCancelledRef = useRef(false);

  const startFetch = useCallback(async () => {
    // TODO re-implemented cancelling
    isCancelledRef.current = false;

    setFetchState({
      ...getInitialProgress(),
      isRunning: true,

@@ -93,16 +97,20 @@ export function useLatencyCorrelations() {

    try {
      const rawResponseUpdate = (await callApmApi({
        endpoint: 'GET /internal/apm/latency/overall_distribution',
        endpoint: 'POST /internal/apm/latency/overall_distribution',
Review comment: Does this need to be […]
Reply: I had problems using […]
Reply: Okay, makes sense.

        signal: null,
        params: {
          query: {
          body: {
            ...query,
            percentileThreshold: DEFAULT_PERCENTILE_THRESHOLD + '',

Review comment: Why does this need to be converted to a string?
Reply: Fixed in a0992ef; this was an oversight and the API already allows sending a number now.

          },
        },
      })) as Response;

Review comment: Why the explicit type annotation? The returned type should already be correct.
Reply: It used the casting because […]

      if (isCancelledRef.current) {
        return;
      }

      setRawResponse(rawResponseUpdate);
      setFetchState({
        loaded: 5,

@@ -116,6 +124,10 @@ export function useLatencyCorrelations() {
        },
      });

      if (isCancelledRef.current) {
        return;
      }

      setFetchState({
        loaded: 10,
      });

@@ -130,36 +142,47 @@ export function useLatencyCorrelations() {
          },
        },
      });

      if (isCancelledRef.current) {
        return;
      }

      setFetchState({
        loaded: 20,
      });

      const fieldsToSample = new Set<string>();
      const latencyCorrelations: LatencyCorrelation[] = [];
      const chunkSize = 10;
      let loadCounter = 0;
      for (const { fieldName, fieldValue } of fieldValuePairs) {

      const fieldValuePairChunks = chunk(fieldValuePairs, chunkSize);

      for (const fieldValuePairChunk of fieldValuePairChunks) {
        const significantCorrelations = await callApmApi({
          endpoint: 'GET /internal/apm/correlations/significant_correlations',
          endpoint: 'POST /internal/apm/correlations/significant_correlations',
          signal: null,
          params: {
            query: {
              ...query,
              fieldName,
              fieldValue: fieldValue + '',
            },
            body: { ...query, fieldValuePairs: fieldValuePairChunk },
          },
        });

        if (significantCorrelations.latencyCorrelations.length > 0) {
          fieldsToSample.add(fieldName);
          significantCorrelations.latencyCorrelations.forEach((d) => {
            fieldsToSample.add(d.fieldName);
          });
          latencyCorrelations.push(
            ...significantCorrelations.latencyCorrelations
          );
          rawResponseUpdate.latencyCorrelations = latencyCorrelations;
          setRawResponse(rawResponseUpdate);
        }

        loadCounter++;
        if (isCancelledRef.current) {
          return;
        }

        loadCounter += chunkSize;
        setFetchState({
          loaded: 20 + Math.round((loadCounter / fieldValuePairs.length) * 80),
        });

@@ -188,7 +211,6 @@ export function useLatencyCorrelations() {
      setFetchState({
        error: e as Error,
      });
      return;
    }

    setFetchState({

@@ -205,6 +227,7 @@ export function useLatencyCorrelations() {
  ]);

  const cancelFetch = useCallback(() => {
    isCancelledRef.current = true;
    setFetchState({
      isRunning: false,
    });

This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
@@ -0,0 +1,63 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import type { ElasticsearchClient } from 'src/core/server';

import type { SearchStrategyParams } from '../../../../common/search_strategies/types';
import type { FailedTransactionsCorrelation } from '../../../../common/search_strategies/failed_transactions_correlations/types';

import { ERROR_CORRELATION_THRESHOLD } from '../constants';

import {
  fetchFailedTransactionsCorrelationPValues,
  fetchTransactionDurationHistogramRangeSteps,
} from './index';

export const fetchPValues = async (
  esClient: ElasticsearchClient,
  paramsWithIndex: SearchStrategyParams,
  fieldCandidates: string[]
) => {
  const failedTransactionsCorrelations: FailedTransactionsCorrelation[] = [];

  const histogramRangeSteps = await fetchTransactionDurationHistogramRangeSteps(
    esClient,
    paramsWithIndex
  );

  const results = await Promise.allSettled(
    fieldCandidates.map((fieldName) =>
      fetchFailedTransactionsCorrelationPValues(
        esClient,
        paramsWithIndex,
        histogramRangeSteps,
        fieldName
      )
    )
  );

  results.forEach((result, idx) => {
    if (result.status === 'fulfilled') {
      failedTransactionsCorrelations.push(
        ...result.value.filter(
          (record) =>
            record &&
            typeof record.pValue === 'number' &&
            record.pValue < ERROR_CORRELATION_THRESHOLD
        )
      );
    } else {
      // If one of the fields in the batch had an error
      // addLogMessage(
      //   `Error getting error correlation for field ${fieldCandidates[idx]}: ${result.reason}.`
      // );
    }
  });

  // TODO Fix CCS warning
  return { failedTransactionsCorrelations, ccsWarning: false };
};
@@ -0,0 +1,74 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

import { range } from 'lodash';

import type { ElasticsearchClient } from 'src/core/server';

import type {
  FieldValuePair,
  SearchStrategyParams,
} from '../../../../common/search_strategies/types';
import type { LatencyCorrelation } from '../../../../common/search_strategies/latency_correlations/types';

import {
  fetchTransactionDurationFractions,
  fetchTransactionDurationHistogramRangeSteps,
  fetchTransactionDurationHistograms,
  fetchTransactionDurationPercentiles,
} from './index';
import { computeExpectationsAndRanges } from '../utils';

export const fetchSignificantCorrelations = async (
  esClient: ElasticsearchClient,
  paramsWithIndex: SearchStrategyParams,
  fieldValuePairs: FieldValuePair[]
) => {
  // Create an array of ranges [2, 4, 6, ..., 98]
  const percentileAggregationPercents = range(2, 100, 2);
  const { percentiles: percentilesRecords } =
    await fetchTransactionDurationPercentiles(
      esClient,
      paramsWithIndex,
      percentileAggregationPercents
    );
  const percentiles = Object.values(percentilesRecords);

  const { expectations, ranges } = computeExpectationsAndRanges(percentiles);

  const { fractions, totalDocCount } = await fetchTransactionDurationFractions(
    esClient,
    paramsWithIndex,
    ranges
  );

  const histogramRangeSteps = await fetchTransactionDurationHistogramRangeSteps(
    esClient,
    paramsWithIndex
  );

  const latencyCorrelations: LatencyCorrelation[] = [];

  for await (const item of fetchTransactionDurationHistograms(
    esClient,
    () => {},
    paramsWithIndex,
    expectations,
    ranges,
    fractions,
    histogramRangeSteps,
    totalDocCount,
    fieldValuePairs
  )) {
    if (item !== undefined) {
      latencyCorrelations.push(item);
    }
  }

  // TODO Fix CCS warning
  return { latencyCorrelations, ccsWarning: false };
};
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
Review comment: It looks like we are not aborting requests when the component unmounts (signal is null). Is that correct?
Reply: I added abort signals in 45da6c5. (Previously we aborted via the isCancelled ref in between queries; this adds support to cancel already running queries too.)
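For illustration only, here is a sketch of roughly how AbortController-based cancellation can be wired into such a hook; this is not necessarily what 45da6c5 does, the identifiers (query, Response, callApmApi, setFetchState) are the ones from the hooks above, and the fragments belong inside the hook body. The controller lives in a ref, is re-created at the start of each fetch, its signal is passed to every request, and cancelFetch aborts it, which the existing useEffect cleanup already invokes on unmount.

const abortCtrl = useRef(new AbortController());

const startFetch = useCallback(async () => {
  // A fresh controller per run so a previous abort does not affect new requests.
  abortCtrl.current = new AbortController();

  try {
    const rawResponseUpdate = (await callApmApi({
      endpoint: 'POST /internal/apm/latency/overall_distribution',
      signal: abortCtrl.current.signal, // instead of signal: null
      params: {
        body: { ...query, percentileThreshold: DEFAULT_PERCENTILE_THRESHOLD },
      },
    })) as Response;
    // ... the remaining requests pass the same signal ...
  } catch (e) {
    // Aborted requests reject and are handled like any other error.
    setFetchState({ error: e as Error });
  }
}, [environment, serviceName, transactionName, transactionType, kuery, start, end]);

const cancelFetch = useCallback(() => {
  // Aborting rejects any in-flight request immediately.
  abortCtrl.current.abort();
  setFetchState({ isRunning: false });
}, []);

// The auto-update effect returns cancelFetch as its cleanup,
// so unmounting the component aborts running queries.
useEffect(() => {
  startFetch();
  return cancelFetch;
}, [startFetch, cancelFetch]);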