Skip to content

Commit f3ecab4

Browse files
feat: improve ContextManager performance using AsyncLocalStorage
Use AsyncLocalStorage in ContentManager to improve performance
1 parent 744c871 commit f3ecab4

File tree

5 files changed

+535
-428
lines changed

5 files changed

+535
-428
lines changed

packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts

+4-160
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,17 @@
1414
* limitations under the License.
1515
*/
1616

17-
import { ContextManager, Context } from '@opentelemetry/context-base';
17+
import { Context } from '@opentelemetry/context-base';
1818
import * as asyncHooks from 'async_hooks';
19-
import { EventEmitter } from 'events';
19+
import { BaseContextManager } from './BaseContextManager';
2020

21-
type Func<T> = (...args: unknown[]) => T;
22-
23-
type PatchedEventEmitter = {
24-
/**
25-
* Store a map for each event of all original listener and their "patched"
26-
* version so when the listener is removed by the user, we remove the
27-
* corresponding "patched" function.
28-
*/
29-
__ot_listeners?: { [name: string]: WeakMap<Func<void>, Func<void>> };
30-
} & EventEmitter;
31-
32-
const ADD_LISTENER_METHODS = [
33-
'addListener' as 'addListener',
34-
'on' as 'on',
35-
'once' as 'once',
36-
'prependListener' as 'prependListener',
37-
'prependOnceListener' as 'prependOnceListener',
38-
];
39-
40-
export class AsyncHooksContextManager implements ContextManager {
21+
export class AsyncHooksContextManager extends BaseContextManager {
4122
private _asyncHook: asyncHooks.AsyncHook;
4223
private _contexts: Map<number, Context | undefined> = new Map();
4324
private _stack: Array<Context | undefined> = [];
4425

4526
constructor() {
27+
super();
4628
this._asyncHook = asyncHooks.createHook({
4729
init: this._init.bind(this),
4830
before: this._before.bind(this),
@@ -68,19 +50,6 @@ export class AsyncHooksContextManager implements ContextManager {
6850
}
6951
}
7052

71-
bind<T>(target: T, context?: Context): T {
72-
// if no specific context to propagate is given, we use the current one
73-
if (context === undefined) {
74-
context = this.active();
75-
}
76-
if (target instanceof EventEmitter) {
77-
return this._bindEventEmitter(target, context);
78-
} else if (typeof target === 'function') {
79-
return this._bindFunction(target, context);
80-
}
81-
return target;
82-
}
83-
8453
enable(): this {
8554
this._asyncHook.enable();
8655
return this;
@@ -93,131 +62,6 @@ export class AsyncHooksContextManager implements ContextManager {
9362
return this;
9463
}
9564

96-
private _bindFunction<T extends Function>(target: T, context: Context): T {
97-
const manager = this;
98-
const contextWrapper = function (this: {}, ...args: unknown[]) {
99-
return manager.with(context, () => target.apply(this, args));
100-
};
101-
Object.defineProperty(contextWrapper, 'length', {
102-
enumerable: false,
103-
configurable: true,
104-
writable: false,
105-
value: target.length,
106-
});
107-
/**
108-
* It isn't possible to tell Typescript that contextWrapper is the same as T
109-
* so we forced to cast as any here.
110-
*/
111-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
112-
return contextWrapper as any;
113-
}
114-
115-
/**
116-
* By default, EventEmitter call their callback with their context, which we do
117-
* not want, instead we will bind a specific context to all callbacks that
118-
* go through it.
119-
* @param target EventEmitter a instance of EventEmitter to patch
120-
* @param context the context we want to bind
121-
*/
122-
private _bindEventEmitter<T extends EventEmitter>(
123-
target: T,
124-
context: Context
125-
): T {
126-
const ee = (target as unknown) as PatchedEventEmitter;
127-
if (ee.__ot_listeners !== undefined) return target;
128-
ee.__ot_listeners = {};
129-
130-
// patch methods that add a listener to propagate context
131-
ADD_LISTENER_METHODS.forEach(methodName => {
132-
if (ee[methodName] === undefined) return;
133-
ee[methodName] = this._patchAddListener(ee, ee[methodName], context);
134-
});
135-
// patch methods that remove a listener
136-
if (typeof ee.removeListener === 'function') {
137-
ee.removeListener = this._patchRemoveListener(ee, ee.removeListener);
138-
}
139-
if (typeof ee.off === 'function') {
140-
ee.off = this._patchRemoveListener(ee, ee.off);
141-
}
142-
// patch method that remove all listeners
143-
if (typeof ee.removeAllListeners === 'function') {
144-
ee.removeAllListeners = this._patchRemoveAllListeners(
145-
ee,
146-
ee.removeAllListeners
147-
);
148-
}
149-
return target;
150-
}
151-
152-
/**
153-
* Patch methods that remove a given listener so that we match the "patched"
154-
* version of that listener (the one that propagate context).
155-
* @param ee EventEmitter instance
156-
* @param original reference to the patched method
157-
*/
158-
private _patchRemoveListener(ee: PatchedEventEmitter, original: Function) {
159-
return function (this: {}, event: string, listener: Func<void>) {
160-
if (
161-
ee.__ot_listeners === undefined ||
162-
ee.__ot_listeners[event] === undefined
163-
) {
164-
return original.call(this, event, listener);
165-
}
166-
const events = ee.__ot_listeners[event];
167-
const patchedListener = events.get(listener);
168-
return original.call(this, event, patchedListener || listener);
169-
};
170-
}
171-
172-
/**
173-
* Patch methods that remove all listeners so we remove our
174-
* internal references for a given event.
175-
* @param ee EventEmitter instance
176-
* @param original reference to the patched method
177-
*/
178-
private _patchRemoveAllListeners(
179-
ee: PatchedEventEmitter,
180-
original: Function
181-
) {
182-
return function (this: {}, event: string) {
183-
if (
184-
ee.__ot_listeners === undefined ||
185-
ee.__ot_listeners[event] === undefined
186-
) {
187-
return original.call(this, event);
188-
}
189-
delete ee.__ot_listeners[event];
190-
return original.call(this, event);
191-
};
192-
}
193-
194-
/**
195-
* Patch methods on an event emitter instance that can add listeners so we
196-
* can force them to propagate a given context.
197-
* @param ee EventEmitter instance
198-
* @param original reference to the patched method
199-
* @param [context] context to propagate when calling listeners
200-
*/
201-
private _patchAddListener(
202-
ee: PatchedEventEmitter,
203-
original: Function,
204-
context: Context
205-
) {
206-
const contextManager = this;
207-
return function (this: {}, event: string, listener: Func<void>) {
208-
if (ee.__ot_listeners === undefined) ee.__ot_listeners = {};
209-
let listeners = ee.__ot_listeners[event];
210-
if (listeners === undefined) {
211-
listeners = new WeakMap();
212-
ee.__ot_listeners[event] = listeners;
213-
}
214-
const patchedListener = contextManager.bind(listener, context);
215-
// store a weak reference of the user listener to ours
216-
listeners.set(listener, patchedListener);
217-
return original.call(this, event, patchedListener);
218-
};
219-
}
220-
22165
/**
22266
* Init hook will be called when userland create a async context, setting the
22367
* context as the current one if it exist.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { Context } from '@opentelemetry/context-base';
18+
import { AsyncLocalStorage } from 'async_hooks';
19+
import { BaseContextManager } from './BaseContextManager';
20+
21+
export class AsyncLocalStorageContextManager extends BaseContextManager {
22+
private _asyncLocalStorage: AsyncLocalStorage<Context>;
23+
24+
constructor() {
25+
super();
26+
this._asyncLocalStorage = new AsyncLocalStorage();
27+
}
28+
29+
active(): Context {
30+
return this._asyncLocalStorage.getStore() ?? Context.ROOT_CONTEXT;
31+
}
32+
33+
with<T extends (...args: unknown[]) => ReturnType<T>>(
34+
context: Context,
35+
fn: T
36+
): ReturnType<T> {
37+
return this._asyncLocalStorage.run(context, fn) as ReturnType<T>;
38+
}
39+
40+
enable(): this {
41+
return this;
42+
}
43+
44+
disable(): this {
45+
this._asyncLocalStorage.disable();
46+
return this;
47+
}
48+
}

0 commit comments

Comments
 (0)