1import {
2 executionAsyncId as _getExecutionAsyncId,
3 executionAsyncResource as _getExecutionAsyncResource,
4 createHook,
5} from 'node:async_hooks';
6import { AsyncResourceKind } from './constants';
7import { StackFrame, getStackFrame, promiseWithReject } from './utils';
8import { FiberError } from './errors';
9
10export const enum AsyncResourceFlags {
11 INIT = 0,
12 /** The node's `before` trigger was invoked. Work must have started */
13 PRE_EXECUTION = 1 << 0,
14 /** The node's `after` trigger was invoked. Work must have been completed */
15 POST_EXECUTION = 1 << 1,
16 /** The node (which is a promise) has been resolved and triggered subsequent promises */
17 RESOLVED = 1 << 2,
18 /** The node has been created in a context by a cancelled fiber */
19 ABORTED = 1 << 3,
20
21 /** If these flags are set it indicates that the resource is no longer blocking work it's a trigger for */
22 FINALIZED = AsyncResourceFlags.POST_EXECUTION | AsyncResourceFlags.RESOLVED,
23}
24
25type AsyncResourceObserverFn = (
26 event: AsyncResourceFlags,
27 node: AsyncResourceNode
28) => void;
29
30class AsyncResourceObserver {
31 fiberId: number;
32 callback: AsyncResourceObserverFn;
33 constructor(fiber: AsyncResourceFiber, callback: AsyncResourceObserverFn) {
34 this.fiberId = fiber.fiberId;
35 this.callback = callback;
36 }
37
38 _onInit(node: AsyncResourceNode) {
39 this.callback(AsyncResourceFlags.INIT, node);
40 if (node.fiberId === this.fiberId) {
41 this.observe(node);
42 }
43 }
44
45 _onBefore(node: AsyncResourceNode) {
46 this.callback(AsyncResourceFlags.PRE_EXECUTION, node);
47 }
48
49 _onAfter(node: AsyncResourceNode) {
50 this.callback(AsyncResourceFlags.POST_EXECUTION, node);
51 node.notifyObserver = null;
52 }
53
54 _onPromiseResolve(node: AsyncResourceNode) {
55 this.callback(AsyncResourceFlags.RESOLVED, node);
56 node.notifyObserver = null;
57 }
58
59 isObserved(node: AsyncResourceNode): boolean {
60 return node.notifyObserver === this;
61 }
62
63 observe(node: AsyncResourceNode): void {
64 if (node.notifyObserver !== null && node.notifyObserver !== this) {
65 throw new TypeError(
66 'Only one observer can be attached to a node at a time.\n' +
67 `Node (${node.asyncId}) is already being observed`
68 );
69 }
70 node.notifyObserver = this;
71 }
72}
73
74type RawAsyncResource = object & { [K in typeof fiberRef]?: AsyncResourceNode };
75
76const fiberRef = Symbol('async_resource_node');
77const fiberStack: AsyncResourceFiber[] = [];
78
79const getExecutionAsyncResource: () => RawAsyncResource =
80 _getExecutionAsyncResource;
81const getExecutionAsyncId = _getExecutionAsyncId;
82
83let _fiberIdx = 1;
84
85export class AsyncResourceFiber {
86 readonly fiberId = _fiberIdx++;
87 readonly root: AsyncResourceNode;
88
89 active = false;
90 parent: AsyncResourceFiber | null;
91 frame: StackFrame | null;
92
93 constructor(frame: StackFrame | null) {
94 // Fibers are created as children of the current fiber,
95 // but their root resource will be the current execution context,
96 // even if it's deeper into the parent fiber's async graph
97 const executionAsyncResource = getExecutionAsyncResource();
98 let root = executionAsyncResource[fiberRef];
99 if (root === undefined) {
100 // WARN: This must imply `!this.parent`
101 // If no parent fiber exists, this is the root fiber, and
102 // empty state is created for its resource node
103 root = new AsyncResourceNode(
104 this.fiberId,
105 getExecutionAsyncId(),
106 null,
107 frame
108 );
109 executionAsyncResource[fiberRef] = root;
110 }
111 this.root = root;
112 this.parent = getFiber();
113 this.frame = frame;
114 }
115
116 enable() {
117 if (!this.active) {
118 // Set the current execution context's fiber to be the current fiber
119 this.root.fiberId = this.fiberId;
120 this.active = true;
121 fiberStack.push(this);
122 asyncResourceGraphHook.enable();
123 }
124 return this;
125 }
126
127 disable() {
128 fiberStack.splice(fiberStack.indexOf(this), 1);
129 if (!fiberStack.length) asyncResourceGraphHook.disable();
130 // Reset the context's fiber. This must use the stack since fibers
131 // may be created out of order and `this.parent` may not be
132 // the same as the stack's fiber
133 this.root.fiberId = getFiber()?.fiberId || 0;
134 this.active = false;
135 return this;
136 }
137
138 get parentFiberIds(): readonly number[] {
139 const parentFiberIds: number[] = [];
140 let fiber: AsyncResourceFiber | null = this;
141 while ((fiber = fiber.parent) !== null) parentFiberIds.push(fiber.fiberId);
142 return parentFiberIds;
143 }
144
145 /** Returns amount of pending tasks in the fiber */
146 get pending() {
147 const countExecutionTargets = (node: AsyncResourceNode) => {
148 let count = 0;
149 // To count all pending tasks, we check all resources that
150 // have been created on the fiber recursively and count all
151 // non-finalized ones
152 for (const target of node.executionTargets.values()) {
153 if (target.fiberId === this.fiberId) {
154 count += countExecutionTargets(target);
155 count += target.flags & AsyncResourceFlags.FINALIZED ? 0 : 1;
156 }
157 }
158 return count;
159 };
160 // Pending tasks may be counted excluding the top-level resource,
161 // as it presumably will still be in progress, but shouldn't be
162 // considered to be blocking.
163 return countExecutionTargets(this.root);
164 }
165
166 get executionTargets(): AsyncResourceNode[] {
167 return [...this.root.executionTargets.values()].filter(node => {
168 return node.fiberId === this.fiberId;
169 });
170 }
171
172 toString() {
173 return `[Fiber: ${this.fiberId}]`;
174 }
175}
176
177/** Descriptors for an async resource (akin to IO tasks) */
178export class AsyncResourceNode {
179 asyncId: number;
180 fiberId: number;
181 type: AsyncResourceKind | null;
182 active: boolean;
183
184 /** If available, a stacktrace frame pointing to the resource's initialization */
185 frame: StackFrame | null;
186
187 /** An execution context's descriptor this async resource was created in */
188 executionOrigin: AsyncResourceNode | null;
189 /** An async resource's descriptor which will, upon completion, trigger this async resource
190 * @remarks
191 * This will be identical to `executionOrigin` if the resource was triggerd synchronously
192 */
193 triggerOrigin: AsyncResourceNode | null;
194
195 /** Async resources that this resource has created (sub-tasks) */
196 executionTargets: Map<number, AsyncResourceNode>;
197 /** Async resources that this resource will trigger upon completion (follow-up tasks) */
198 triggerTargets: Map<number, AsyncResourceNode>;
199
200 /** Descriptions and completion state of this node */
201 flags: AsyncResourceFlags;
202
203 /** Observer that should be notified about changes */
204 notifyObserver: AsyncResourceObserver | null;
205
206 constructor(
207 fiberId: number,
208 asyncId: number,
209 type: string | null,
210 frame: StackFrame | null
211 ) {
212 this.active = true;
213 this.asyncId = asyncId;
214 this.fiberId = fiberId;
215 this.type = type as AsyncResourceKind;
216 this.frame = frame;
217 this.executionOrigin = null;
218 this.triggerOrigin = null;
219 this.executionTargets = new Map();
220 this.triggerTargets = new Map();
221 this.flags = AsyncResourceFlags.INIT;
222 this.notifyObserver = null;
223 }
224
225 _onExecute(
226 asyncId: number,
227 type: string,
228 triggerAsyncId: number,
229 resource: RawAsyncResource
230 ) {
231 if (!this.active) {
232 return;
233 }
234 // This method is called on an execution context's descriptor that created a new async resource.
235 // Hence, we create a new child async resource here
236 const frame = getStackFrame(1);
237 const node = new AsyncResourceNode(this.fiberId, asyncId, type, frame);
238 node.executionOrigin = this;
239 resource[fiberRef] = node;
240 this.executionTargets.set(asyncId, node);
241 const triggerNode =
242 asyncId !== triggerAsyncId
243 ? getAsyncResourceNode(triggerAsyncId)
244 : undefined;
245 if (triggerNode) {
246 node.triggerOrigin = triggerNode;
247 triggerNode.triggerTargets.set(asyncId, node);
248 }
249 if (this.notifyObserver) {
250 this.notifyObserver._onInit(node);
251 }
252 }
253
254 _onBefore() {
255 this.flags |= AsyncResourceFlags.PRE_EXECUTION;
256 if (this.active && this.notifyObserver) {
257 this.notifyObserver._onBefore(this);
258 }
259 }
260
261 _onAfter() {
262 this.flags |= AsyncResourceFlags.POST_EXECUTION;
263 if (this.active && this.notifyObserver) {
264 this.notifyObserver._onAfter(this);
265 }
266 }
267
268 _onPromiseResolve() {
269 this.flags |= AsyncResourceFlags.RESOLVED;
270 if (this.active && this.notifyObserver) {
271 this.notifyObserver._onPromiseResolve(this);
272 }
273 }
274
275 toString() {
276 const name = this.type
277 ? `${this.type}(${this.asyncId})`
278 : `Fiber: ${this.fiberId}`;
279 return `[async ${name}]`;
280 }
281}
282
283const taintAsyncResourceGraph = (
284 node: AsyncResourceNode,
285 mask: AsyncResourceFlags,
286 flags: AsyncResourceFlags
287) => {
288 if ((node.flags & mask & flags) === 0) {
289 node.flags |= flags;
290 for (const target of node.executionTargets.values())
291 taintAsyncResourceGraph(target, mask, flags);
292 for (const target of node.triggerTargets.values())
293 taintAsyncResourceGraph(target, mask, flags);
294 }
295};
296
297const getAsyncResourceNode = (
298 asyncId: number
299): AsyncResourceNode | undefined => {
300 let executionNode = getExecutionAsyncResource()[fiberRef] ?? null;
301 if (executionNode) {
302 // The `asyncResourceGraphHook`'s callbacks execute inside the fiber's execution context stack.
303 // This means we can find any node by checking all nodes in the current and any parent
304 // execution contexts.
305 let node: AsyncResourceNode | undefined;
306 do {
307 if (executionNode.asyncId === asyncId) return executionNode;
308 if ((node = executionNode.executionTargets.get(asyncId))) return node;
309 } while ((executionNode = executionNode.executionOrigin));
310 }
311};
312
313let _asyncResourceGraphHookActive = false;
314
315const asyncResourceGraphHook = createHook({
316 init(
317 asyncId: number,
318 type: string,
319 triggerAsyncId: number,
320 resource: RawAsyncResource
321 ) {
322 if (!_asyncResourceGraphHookActive) {
323 try {
324 _asyncResourceGraphHookActive = true;
325 const executionNode = getExecutionAsyncResource()[fiberRef];
326 executionNode?._onExecute(asyncId, type, triggerAsyncId, resource);
327 } finally {
328 _asyncResourceGraphHookActive = false;
329 }
330 }
331 },
332 before(asyncId: number) {
333 try {
334 _asyncResourceGraphHookActive = true;
335 getAsyncResourceNode(asyncId)?._onBefore();
336 } finally {
337 _asyncResourceGraphHookActive = false;
338 }
339 },
340 after(asyncId: number) {
341 try {
342 _asyncResourceGraphHookActive = true;
343 getAsyncResourceNode(asyncId)?._onAfter();
344 } finally {
345 _asyncResourceGraphHookActive = false;
346 }
347 },
348 promiseResolve(asyncId: number) {
349 try {
350 _asyncResourceGraphHookActive = true;
351 getAsyncResourceNode(asyncId)?._onPromiseResolve();
352 } finally {
353 _asyncResourceGraphHookActive = false;
354 }
355 },
356 // NOTE: While it's nice for cleanups we leave out the `destroy`
357 // hook since it has performance implications according to the docs
358});
359
360function fiberWatchdog<T>(
361 fiber: AsyncResourceFiber,
362 params: FiberParams,
363 promise: Promise<T>
364): Promise<T> {
365 try {
366 let watchdogImmediate: NodeJS.Immediate | void;
367 fiber.root.active = false;
368
369 const { abort } = params;
370 const { parentFiberIds } = fiber;
371
372 const pendingExecutionTargets = new Set<AsyncResourceNode>();
373 const watchdogResult = promiseWithReject(promise, () => {
374 if (watchdogImmediate)
375 watchdogImmediate = clearImmediate(watchdogImmediate);
376 });
377
378 function lastExecutionTarget(): AsyncResourceNode {
379 if (pendingExecutionTargets.size) {
380 const targets = [...pendingExecutionTargets.values()];
381 return targets[targets.length - 1];
382 } else {
383 const targets = fiber.executionTargets;
384 return targets[targets.length - 1] || fiber.root;
385 }
386 }
387
388 function assertFiberAbort(node: AsyncResourceNode) {
389 if (node.flags & AsyncResourceFlags.ABORTED) {
390 assertFiberError(new FiberError('FIBER_ABORTED', fiber, node));
391 } else if (
392 node.triggerOrigin &&
393 node.triggerOrigin.flags & AsyncResourceFlags.ABORTED
394 ) {
395 if (node.triggerOrigin.fiberId === fiber.fiberId) {
396 assertFiberError(
397 new FiberError('FIBER_ABORTED', fiber, node.triggerOrigin)
398 );
399 } else {
400 assertFiberError(
401 new FiberError('FOREIGN_ASYNC_ABORTED', fiber, node.triggerOrigin)
402 );
403 }
404 } else if (abort?.aborted) {
405 assertFiberError(abort.reason);
406 if (_asyncResourceGraphHookActive) abort.throwIfAborted();
407 }
408 }
409
410 function assertFiberError(error: Error) {
411 watchdogResult.reject(error);
412 if (_asyncResourceGraphHookActive) throw error;
413 }
414
415 function assertFiberOwnership(node: AsyncResourceNode) {
416 const { triggerOrigin } = node;
417 if (node.fiberId !== fiber.fiberId || !triggerOrigin) {
418 // We only check triggers for nodes owned by the current fiber
419 } else if (triggerOrigin === fiber.root) {
420 // Immediately invoked async IO on the fiber root are ignored
421 } else if (triggerOrigin.fiberId === node.fiberId) {
422 // If the trigger is a node in the same fiber, this is allowed
423 } else {
424 const isParentFiberTrigger = parentFiberIds.includes(
425 triggerOrigin.fiberId
426 );
427 if (!isParentFiberTrigger) {
428 assertFiberError(
429 new FiberError('FOREIGN_ASYNC_TRIGGER', fiber, node)
430 );
431 } else {
432 // TODO: Selectively allow resolved promises
433 assertFiberError(new FiberError('PARENT_ASYNC_TRIGGER', fiber, node));
434 }
435 }
436 }
437
438 function stallWatchdog() {
439 watchdogImmediate = undefined;
440 if (abort?.aborted) {
441 return;
442 }
443 let hasAsyncIO = false;
444 for (const asyncNode of pendingExecutionTargets) {
445 if (
446 (asyncNode.flags & AsyncResourceFlags.FINALIZED) === 0 &&
447 asyncNode.type !== 'PROMISE'
448 ) {
449 hasAsyncIO = true;
450 break;
451 }
452 }
453 if (!hasAsyncIO) {
454 watchdogResult.reject(
455 new FiberError('FIBER_STALL', fiber, lastExecutionTarget())
456 );
457 }
458 }
459
460 const scheduleCheck = () =>
461 watchdogImmediate || (watchdogImmediate = setImmediate(stallWatchdog));
462 watchdogImmediate = setImmediate(stallWatchdog);
463
464 abort?.addEventListener('abort', () => {
465 taintAsyncResourceGraph(
466 fiber.root,
467 AsyncResourceFlags.FINALIZED,
468 AsyncResourceFlags.ABORTED
469 );
470 });
471
472 const observer = new AsyncResourceObserver(fiber, (event, node) => {
473 scheduleCheck();
474 switch (event) {
475 case AsyncResourceFlags.INIT:
476 assertFiberOwnership(node);
477 assertFiberAbort(node);
478 pendingExecutionTargets.add(node);
479 return;
480 case AsyncResourceFlags.POST_EXECUTION:
481 pendingExecutionTargets.delete(node);
482 return;
483 case AsyncResourceFlags.RESOLVED:
484 assertFiberAbort(node);
485 pendingExecutionTargets.delete(node);
486 return;
487 default:
488 return;
489 }
490 });
491
492 const stack: AsyncResourceNode[] = [];
493 for (const target of fiber.root.executionTargets.values()) {
494 if (target.fiberId === fiber.fiberId) {
495 stack.push(target);
496 observer.observe(target);
497 if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
498 assertFiberOwnership(target);
499 pendingExecutionTargets.add(target);
500 }
501 }
502 }
503 let pointer: AsyncResourceNode | undefined;
504 while ((pointer = stack.pop())) {
505 for (const target of pointer.executionTargets.values()) {
506 if (!observer.isObserved(target)) {
507 stack.push(target);
508 observer.observe(target);
509 if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
510 assertFiberOwnership(target);
511 pendingExecutionTargets.add(target);
512 }
513 }
514 }
515 }
516
517 return watchdogResult.promise;
518 } finally {
519 fiber.root.active = true;
520 }
521}
522
523/** Enable async resource graph tracking and initialize root fiber if needed.
524 * @remarks
525 * Call this as early as async resources are created that other fibers depend
526 * on. No async resources outside of the root fiber will be tracked!
527 */
528export function enable(): AsyncResourceFiber {
529 return getFiber() || new AsyncResourceFiber(getStackFrame(0)).enable();
530}
531
532/** Disable top-level fiber
533 * @remarks
534 * If a root fiber has been created with `enable()`, calling this function
535 * allows all fiber data to be garbage collected.
536 */
537export function disable(): AsyncResourceFiber | null {
538 return getFiber()?.disable() || null;
539}
540
541/** Get currently active fiber */
542export function getFiber(): AsyncResourceFiber | null {
543 for (let idx = fiberStack.length - 1; idx >= 0; idx--)
544 if (fiberStack[idx].active) return fiberStack[idx];
545 return null;
546}
547
548/** Returns an arbitrary async resource's descriptor
549 * @remarks
550 * WARN: This will only work for async resource objects that have been
551 * created in a fiber.
552 */
553export function getFiberNode(resource: object): AsyncResourceNode | undefined {
554 return resource[fiberRef];
555}
556
557export interface FiberParams {
558 abort?: AbortSignal;
559}
560
561/** Create a fiber and execute it, returning both the result of `fn` and its fiber
562 * @remarks
563 * While this function returns synchronously, it will track all async resources
564 * that the passed function has created.
565 */
566export function fiber<T>(
567 fn: () => Promise<T>,
568 params: FiberParams = {}
569): { return: Promise<T>; fiber: AsyncResourceFiber } {
570 const fiber = new AsyncResourceFiber(getStackFrame(0));
571 try {
572 fiber.enable();
573 return { return: fiberWatchdog(fiber, params, fn()), fiber };
574 } finally {
575 fiber.disable();
576 }
577}