Skip to content

Commit bf2f617

Browse files
AsyncLocalStorage based ContextManager (#1210)
Co-authored-by: Daniel Dyla <dyladan@users.noreply.github.com>
1 parent 643b97f commit bf2f617

File tree

5 files changed

+534
-426
lines changed

5 files changed

+534
-426
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { ContextManager, Context } from '@opentelemetry/context-base';
18+
import { EventEmitter } from 'events';
19+
20+
type Func<T> = (...args: unknown[]) => T;
21+
22+
type PatchedEventEmitter = {
23+
/**
24+
* Store a map for each event of all original listener and their "patched"
25+
* version so when the listener is removed by the user, we remove the
26+
* corresponding "patched" function.
27+
*/
28+
__ot_listeners?: { [name: string]: WeakMap<Func<void>, Func<void>> };
29+
} & EventEmitter;
30+
31+
const ADD_LISTENER_METHODS = [
32+
'addListener' as 'addListener',
33+
'on' as 'on',
34+
'once' as 'once',
35+
'prependListener' as 'prependListener',
36+
'prependOnceListener' as 'prependOnceListener',
37+
];
38+
39+
export abstract class AbstractAsyncHooksContextManager
40+
implements ContextManager {
41+
abstract active(): Context;
42+
43+
abstract with<T extends (...args: unknown[]) => ReturnType<T>>(
44+
context: Context,
45+
fn: T
46+
): ReturnType<T>;
47+
48+
abstract enable(): this;
49+
50+
abstract disable(): this;
51+
52+
bind<T>(target: T, context: Context = this.active()): T {
53+
if (target instanceof EventEmitter) {
54+
return this._bindEventEmitter(target, context);
55+
}
56+
57+
if (typeof target === 'function') {
58+
return this._bindFunction(target, context);
59+
}
60+
return target;
61+
}
62+
63+
private _bindFunction<T extends Function>(target: T, context: Context): T {
64+
const manager = this;
65+
const contextWrapper = function (this: {}, ...args: unknown[]) {
66+
return manager.with(context, () => target.apply(this, args));
67+
};
68+
Object.defineProperty(contextWrapper, 'length', {
69+
enumerable: false,
70+
configurable: true,
71+
writable: false,
72+
value: target.length,
73+
});
74+
/**
75+
* It isn't possible to tell Typescript that contextWrapper is the same as T
76+
* so we forced to cast as any here.
77+
*/
78+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
79+
return contextWrapper as any;
80+
}
81+
82+
/**
83+
* By default, EventEmitter call their callback with their context, which we do
84+
* not want, instead we will bind a specific context to all callbacks that
85+
* go through it.
86+
* @param target EventEmitter a instance of EventEmitter to patch
87+
* @param context the context we want to bind
88+
*/
89+
private _bindEventEmitter<T extends EventEmitter>(
90+
target: T,
91+
context: Context
92+
): T {
93+
const ee = (target as unknown) as PatchedEventEmitter;
94+
if (ee.__ot_listeners !== undefined) return target;
95+
ee.__ot_listeners = {};
96+
97+
// patch methods that add a listener to propagate context
98+
ADD_LISTENER_METHODS.forEach(methodName => {
99+
if (ee[methodName] === undefined) return;
100+
ee[methodName] = this._patchAddListener(ee, ee[methodName], context);
101+
});
102+
// patch methods that remove a listener
103+
if (typeof ee.removeListener === 'function') {
104+
ee.removeListener = this._patchRemoveListener(ee, ee.removeListener);
105+
}
106+
if (typeof ee.off === 'function') {
107+
ee.off = this._patchRemoveListener(ee, ee.off);
108+
}
109+
// patch method that remove all listeners
110+
if (typeof ee.removeAllListeners === 'function') {
111+
ee.removeAllListeners = this._patchRemoveAllListeners(
112+
ee,
113+
ee.removeAllListeners
114+
);
115+
}
116+
return target;
117+
}
118+
119+
/**
120+
* Patch methods that remove a given listener so that we match the "patched"
121+
* version of that listener (the one that propagate context).
122+
* @param ee EventEmitter instance
123+
* @param original reference to the patched method
124+
*/
125+
private _patchRemoveListener(ee: PatchedEventEmitter, original: Function) {
126+
return function (this: {}, event: string, listener: Func<void>) {
127+
if (
128+
ee.__ot_listeners === undefined ||
129+
ee.__ot_listeners[event] === undefined
130+
) {
131+
return original.call(this, event, listener);
132+
}
133+
const events = ee.__ot_listeners[event];
134+
const patchedListener = events.get(listener);
135+
return original.call(this, event, patchedListener || listener);
136+
};
137+
}
138+
139+
/**
140+
* Patch methods that remove all listeners so we remove our
141+
* internal references for a given event.
142+
* @param ee EventEmitter instance
143+
* @param original reference to the patched method
144+
*/
145+
private _patchRemoveAllListeners(
146+
ee: PatchedEventEmitter,
147+
original: Function
148+
) {
149+
return function (this: {}, event: string) {
150+
if (
151+
ee.__ot_listeners === undefined ||
152+
ee.__ot_listeners[event] === undefined
153+
) {
154+
return original.call(this, event);
155+
}
156+
delete ee.__ot_listeners[event];
157+
return original.call(this, event);
158+
};
159+
}
160+
161+
/**
162+
* Patch methods on an event emitter instance that can add listeners so we
163+
* can force them to propagate a given context.
164+
* @param ee EventEmitter instance
165+
* @param original reference to the patched method
166+
* @param [context] context to propagate when calling listeners
167+
*/
168+
private _patchAddListener(
169+
ee: PatchedEventEmitter,
170+
original: Function,
171+
context: Context
172+
) {
173+
const contextManager = this;
174+
return function (this: {}, event: string, listener: Func<void>) {
175+
if (ee.__ot_listeners === undefined) ee.__ot_listeners = {};
176+
let listeners = ee.__ot_listeners[event];
177+
if (listeners === undefined) {
178+
listeners = new WeakMap();
179+
ee.__ot_listeners[event] = listeners;
180+
}
181+
const patchedListener = contextManager.bind(listener, context);
182+
// store a weak reference of the user listener to ours
183+
listeners.set(listener, patchedListener);
184+
return original.call(this, event, patchedListener);
185+
};
186+
}
187+
}

packages/opentelemetry-context-async-hooks/src/AsyncHooksContextManager.ts

+4-160
Original file line numberDiff line numberDiff line change
@@ -14,35 +14,17 @@
1414
* limitations under the License.
1515
*/
1616

17-
import { ContextManager, Context } from '@opentelemetry/context-base';
17+
import { Context } from '@opentelemetry/context-base';
1818
import * as asyncHooks from 'async_hooks';
19-
import { EventEmitter } from 'events';
19+
import { AbstractAsyncHooksContextManager } from './AbstractAsyncHooksContextManager';
2020

21-
type Func<T> = (...args: unknown[]) => T;
22-
23-
type PatchedEventEmitter = {
24-
/**
25-
* Store a map for each event of all original listener and their "patched"
26-
* version so when the listener is removed by the user, we remove the
27-
* corresponding "patched" function.
28-
*/
29-
__ot_listeners?: { [name: string]: WeakMap<Func<void>, Func<void>> };
30-
} & EventEmitter;
31-
32-
const ADD_LISTENER_METHODS = [
33-
'addListener' as 'addListener',
34-
'on' as 'on',
35-
'once' as 'once',
36-
'prependListener' as 'prependListener',
37-
'prependOnceListener' as 'prependOnceListener',
38-
];
39-
40-
export class AsyncHooksContextManager implements ContextManager {
21+
export class AsyncHooksContextManager extends AbstractAsyncHooksContextManager {
4122
private _asyncHook: asyncHooks.AsyncHook;
4223
private _contexts: Map<number, Context | undefined> = new Map();
4324
private _stack: Array<Context | undefined> = [];
4425

4526
constructor() {
27+
super();
4628
this._asyncHook = asyncHooks.createHook({
4729
init: this._init.bind(this),
4830
before: this._before.bind(this),
@@ -68,19 +50,6 @@ export class AsyncHooksContextManager implements ContextManager {
6850
}
6951
}
7052

71-
bind<T>(target: T, context?: Context): T {
72-
// if no specific context to propagate is given, we use the current one
73-
if (context === undefined) {
74-
context = this.active();
75-
}
76-
if (target instanceof EventEmitter) {
77-
return this._bindEventEmitter(target, context);
78-
} else if (typeof target === 'function') {
79-
return this._bindFunction(target, context);
80-
}
81-
return target;
82-
}
83-
8453
enable(): this {
8554
this._asyncHook.enable();
8655
return this;
@@ -93,131 +62,6 @@ export class AsyncHooksContextManager implements ContextManager {
9362
return this;
9463
}
9564

96-
private _bindFunction<T extends Function>(target: T, context: Context): T {
97-
const manager = this;
98-
const contextWrapper = function (this: {}, ...args: unknown[]) {
99-
return manager.with(context, () => target.apply(this, args));
100-
};
101-
Object.defineProperty(contextWrapper, 'length', {
102-
enumerable: false,
103-
configurable: true,
104-
writable: false,
105-
value: target.length,
106-
});
107-
/**
108-
* It isn't possible to tell Typescript that contextWrapper is the same as T
109-
* so we forced to cast as any here.
110-
*/
111-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
112-
return contextWrapper as any;
113-
}
114-
115-
/**
116-
* By default, EventEmitter call their callback with their context, which we do
117-
* not want, instead we will bind a specific context to all callbacks that
118-
* go through it.
119-
* @param target EventEmitter a instance of EventEmitter to patch
120-
* @param context the context we want to bind
121-
*/
122-
private _bindEventEmitter<T extends EventEmitter>(
123-
target: T,
124-
context: Context
125-
): T {
126-
const ee = (target as unknown) as PatchedEventEmitter;
127-
if (ee.__ot_listeners !== undefined) return target;
128-
ee.__ot_listeners = {};
129-
130-
// patch methods that add a listener to propagate context
131-
ADD_LISTENER_METHODS.forEach(methodName => {
132-
if (ee[methodName] === undefined) return;
133-
ee[methodName] = this._patchAddListener(ee, ee[methodName], context);
134-
});
135-
// patch methods that remove a listener
136-
if (typeof ee.removeListener === 'function') {
137-
ee.removeListener = this._patchRemoveListener(ee, ee.removeListener);
138-
}
139-
if (typeof ee.off === 'function') {
140-
ee.off = this._patchRemoveListener(ee, ee.off);
141-
}
142-
// patch method that remove all listeners
143-
if (typeof ee.removeAllListeners === 'function') {
144-
ee.removeAllListeners = this._patchRemoveAllListeners(
145-
ee,
146-
ee.removeAllListeners
147-
);
148-
}
149-
return target;
150-
}
151-
152-
/**
153-
* Patch methods that remove a given listener so that we match the "patched"
154-
* version of that listener (the one that propagate context).
155-
* @param ee EventEmitter instance
156-
* @param original reference to the patched method
157-
*/
158-
private _patchRemoveListener(ee: PatchedEventEmitter, original: Function) {
159-
return function (this: {}, event: string, listener: Func<void>) {
160-
if (
161-
ee.__ot_listeners === undefined ||
162-
ee.__ot_listeners[event] === undefined
163-
) {
164-
return original.call(this, event, listener);
165-
}
166-
const events = ee.__ot_listeners[event];
167-
const patchedListener = events.get(listener);
168-
return original.call(this, event, patchedListener || listener);
169-
};
170-
}
171-
172-
/**
173-
* Patch methods that remove all listeners so we remove our
174-
* internal references for a given event.
175-
* @param ee EventEmitter instance
176-
* @param original reference to the patched method
177-
*/
178-
private _patchRemoveAllListeners(
179-
ee: PatchedEventEmitter,
180-
original: Function
181-
) {
182-
return function (this: {}, event: string) {
183-
if (
184-
ee.__ot_listeners === undefined ||
185-
ee.__ot_listeners[event] === undefined
186-
) {
187-
return original.call(this, event);
188-
}
189-
delete ee.__ot_listeners[event];
190-
return original.call(this, event);
191-
};
192-
}
193-
194-
/**
195-
* Patch methods on an event emitter instance that can add listeners so we
196-
* can force them to propagate a given context.
197-
* @param ee EventEmitter instance
198-
* @param original reference to the patched method
199-
* @param [context] context to propagate when calling listeners
200-
*/
201-
private _patchAddListener(
202-
ee: PatchedEventEmitter,
203-
original: Function,
204-
context: Context
205-
) {
206-
const contextManager = this;
207-
return function (this: {}, event: string, listener: Func<void>) {
208-
if (ee.__ot_listeners === undefined) ee.__ot_listeners = {};
209-
let listeners = ee.__ot_listeners[event];
210-
if (listeners === undefined) {
211-
listeners = new WeakMap();
212-
ee.__ot_listeners[event] = listeners;
213-
}
214-
const patchedListener = contextManager.bind(listener, context);
215-
// store a weak reference of the user listener to ours
216-
listeners.set(listener, patchedListener);
217-
return original.call(this, event, patchedListener);
218-
};
219-
}
220-
22165
/**
22266
* Init hook will be called when userland create a async context, setting the
22367
* context as the current one if it exist.

0 commit comments

Comments
 (0)