1import {
2 executionAsyncId as _getExecutionAsyncId,
3 executionAsyncResource as _getExecutionAsyncResource,
4 createHook,
5} from 'node:async_hooks';
6import { AsyncResourceKind } from './constants';
7import { StackFrame, getStackFrame, promiseWithReject } from './utils';
8import { FiberError } from './errors';
9
/**
 * Bit flags describing the lifecycle state of an async resource node.
 * Flags are combined with bitwise OR on `AsyncResourceNode.flags`.
 */
export const enum AsyncResourceFlags {
  /** Initial value: no lifecycle bit has been set yet */
  INIT = 0b0000,
  /** The node's `before` trigger was invoked. Work must have started */
  PRE_EXECUTION = 0b0001,
  /** The node's `after` trigger was invoked. Work must have been completed */
  POST_EXECUTION = 0b0010,
  /** The node (which is a promise) has been resolved and triggered subsequent promises */
  RESOLVED = 0b0100,
  /** The node has been created in a context by a cancelled fiber */
  ABORTED = 0b1000,

  /**
   * Mask of the "done" bits. If any of these flags is set, the resource is
   * no longer blocking the work that it is a trigger for.
   */
  FINALIZED = POST_EXECUTION | RESOLVED,
}
24
/**
 * Callback invoked by an `AsyncResourceObserver` for every lifecycle event.
 * `event` is the single flag describing what happened and `node` is the
 * descriptor the event is reported for.
 */
type AsyncResourceObserverFn = (
  event: AsyncResourceFlags,
  node: AsyncResourceNode
) => void;
29
/**
 * Forwards lifecycle events of async resource nodes to a single callback.
 *
 * A node refers to at most one observer via `node.notifyObserver`. Nodes that
 * are created by an observed node and belong to the observer's fiber are
 * attached automatically, and a node is detached again once its `after` or
 * `promiseResolve` event has been reported.
 */
class AsyncResourceObserver {
  /** Id of the fiber whose nodes are followed automatically */
  fiberId: number;
  /** Receiver of all reported events */
  callback: AsyncResourceObserverFn;

  constructor(fiber: AsyncResourceFiber, callback: AsyncResourceObserverFn) {
    this.fiberId = fiber.fiberId;
    this.callback = callback;
  }

  /**
   * Called when an observed execution context created a new async resource.
   *
   * @param _executionNode - descriptor of the context that created the resource
   *   (kept for call-site compatibility; it is not reported)
   * @param node - descriptor of the newly created resource
   */
  _onExecute(_executionNode: AsyncResourceNode, node: AsyncResourceNode) {
    // Report the NEW node. Reporting the creating context instead would mean
    // freshly created resources are never handed to the callback, so they
    // could be neither validated nor counted as pending work.
    this.callback(AsyncResourceFlags.INIT, node);
    // Only follow resources owned by this observer's fiber
    if (node.fiberId === this.fiberId) {
      this.observe(node);
    }
  }

  /** Reports that the node's `before` hook fired */
  _onBefore(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.PRE_EXECUTION, node);
  }

  /** Reports that the node's `after` hook fired and detaches from the node */
  _onAfter(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.POST_EXECUTION, node);
    node.notifyObserver = null;
  }

  /** Reports that the node's promise was resolved and detaches from the node */
  _onPromiseResolve(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.RESOLVED, node);
    node.notifyObserver = null;
  }

  /** Returns `true` if this observer is currently attached to `node` */
  isObserved(node: AsyncResourceNode): boolean {
    return node.notifyObserver === this;
  }

  /**
   * Attaches this observer to `node`. Attaching twice is a no-op.
   *
   * @throws TypeError if a different observer is already attached to the node
   */
  observe(node: AsyncResourceNode): void {
    if (node.notifyObserver !== null && node.notifyObserver !== this) {
      throw new TypeError(
        'Only one observer can be attached to a node at a time.\n' +
          `Node (${node.asyncId}) is already being observed`
      );
    }
    node.notifyObserver = this;
  }
}
73
/** Symbol key under which a raw async resource object carries its node descriptor */
const fiberRef = Symbol('async_resource_node');

/** A raw async resource object that may have been tagged with a node descriptor */
type RawAsyncResource = object & { [fiberRef]?: AsyncResourceNode };

/** Fibers that are currently enabled; `enable()` pushes and `disable()` removes */
const fiberStack: AsyncResourceFiber[] = [];

// Local aliases of the `node:async_hooks` accessors. The resource accessor is
// re-typed so that the `fiberRef` tag can be read and written without casts.
const getExecutionAsyncId = _getExecutionAsyncId;
const getExecutionAsyncResource: () => RawAsyncResource =
  _getExecutionAsyncResource;

/** Id that will be assigned to the next constructed fiber */
let _fiberIdx = 1;
84
/**
 * A fiber labels the async resources created while it is enabled with its
 * `fiberId`, rooted at the execution context the fiber was constructed in.
 */
export class AsyncResourceFiber {
  /** Unique id taken from the module-level counter */
  readonly fiberId = _fiberIdx++;
  /** Descriptor of the execution context this fiber was constructed in */
  readonly root: AsyncResourceNode;

  /** Whether the fiber is currently enabled (i.e. present on the fiber stack) */
  active = false;
  /** The fiber that was active when this fiber was constructed, if any */
  parent: AsyncResourceFiber | null;
  /** Stack frame passed in by the creator of the fiber */
  frame: StackFrame | null;

  /**
   * @param frame - stack frame to associate with the fiber (and with a newly
   *   created root descriptor), or `null`
   */
  constructor(frame: StackFrame | null) {
    // Fibers are created as children of the current fiber,
    // but their root resource will be the current execution context,
    // even if it's deeper into the parent fiber's async graph
    const executionAsyncResource = getExecutionAsyncResource();
    let root = executionAsyncResource[fiberRef];
    if (root === undefined) {
      // WARN: This must imply `!this.parent`
      // If no parent fiber exists, this is the root fiber, and
      // empty state is created for its resource node
      root = new AsyncResourceNode(
        this.fiberId,
        getExecutionAsyncId(),
        null,
        frame
      );
      executionAsyncResource[fiberRef] = root;
    }
    this.root = root;
    this.parent = getFiber();
    this.frame = frame;
  }

  /**
   * Makes this fiber the current one and turns on the async hook.
   * Calling it on an already enabled fiber is a no-op.
   *
   * @returns this fiber, for chaining
   */
  enable() {
    if (!this.active) {
      // Set the current execution context's fiber to be the current fiber
      this.root.fiberId = this.fiberId;
      this.active = true;
      fiberStack.push(this);
      asyncResourceGraphHook.enable();
    }
    return this;
  }

  /**
   * Removes this fiber from the fiber stack and hands the root context back
   * to the next active fiber. Calling it on a fiber that is not enabled is a
   * no-op.
   *
   * @returns this fiber, for chaining
   */
  disable() {
    const stackIndex = fiberStack.indexOf(this);
    // `indexOf` yields -1 for a fiber that is not on the stack, and
    // `splice(-1, 1)` would then remove the most recently enabled fiber
    // instead of this one. Bail out before touching any shared state.
    if (stackIndex === -1) return this;
    fiberStack.splice(stackIndex, 1);
    if (!fiberStack.length) asyncResourceGraphHook.disable();
    // Reset the context's fiber. This must use the stack since fibers
    // may be created out of order and `this.parent` may not be
    // the same as the stack's fiber
    this.root.fiberId = getFiber()?.fiberId || 0;
    this.active = false;
    return this;
  }

  /** Ids of all ancestor fibers, nearest parent first */
  get parentFiberIds(): readonly number[] {
    const parentFiberIds: number[] = [];
    let fiber: AsyncResourceFiber | null = this;
    while ((fiber = fiber.parent) !== null) parentFiberIds.push(fiber.fiberId);
    return parentFiberIds;
  }

  /** Returns amount of pending tasks in the fiber */
  get pending() {
    const countExecutionTargets = (node: AsyncResourceNode) => {
      let count = 0;
      // To count all pending tasks, we check all resources that
      // have been created on the fiber recursively and count all
      // non-finalized ones
      for (const target of node.executionTargets.values()) {
        if (target.fiberId === this.fiberId) {
          count += countExecutionTargets(target);
          count += target.flags & AsyncResourceFlags.FINALIZED ? 0 : 1;
        }
      }
      return count;
    };
    // Pending tasks may be counted excluding the top-level resource,
    // as it presumably will still be in progress, but shouldn't be
    // considered to be blocking.
    return countExecutionTargets(this.root);
  }

  /** Resources created directly in the root context that carry this fiber's id */
  get executionTargets(): AsyncResourceNode[] {
    return [...this.root.executionTargets.values()].filter(node => {
      return node.fiberId === this.fiberId;
    });
  }

  toString() {
    return `[Fiber: ${this.fiberId}]`;
  }
}
176
/** Descriptors for an async resource (akin to IO tasks) */
export class AsyncResourceNode {
  /** Id of the described async resource */
  asyncId: number;
  /** Id of the fiber this resource is attributed to */
  fiberId: number;
  /** Resource type as reported to the `init` hook; `null` for a manually created root */
  type: AsyncResourceKind | null;
  /** While `false`, resources created in this context are not recorded and observers are not notified */
  active: boolean;

  /** If available, a stacktrace frame pointing to the resource's initialization */
  frame: StackFrame | null;

  /** An execution context's descriptor this async resource was created in */
  executionOrigin: AsyncResourceNode | null;
  /** An async resource's descriptor which will, upon completion, trigger this async resource
   * @remarks
   * This will be identical to `executionOrigin` if the resource was triggered synchronously
   */
  triggerOrigin: AsyncResourceNode | null;

  /** Async resources that this resource has created (sub-tasks) */
  executionTargets: Map<number, AsyncResourceNode>;
  /** Async resources that this resource will trigger upon completion (follow-up tasks) */
  triggerTargets: Map<number, AsyncResourceNode>;

  /** Descriptions and completion state of this node */
  flags: AsyncResourceFlags;

  /** Observer that should be notified about changes */
  notifyObserver: AsyncResourceObserver | null;

  constructor(
    fiberId: number,
    asyncId: number,
    type: string | null,
    frame: StackFrame | null
  ) {
    this.active = true;
    this.asyncId = asyncId;
    this.fiberId = fiberId;
    this.type = type as AsyncResourceKind;
    this.frame = frame;
    this.executionOrigin = null;
    this.triggerOrigin = null;
    this.executionTargets = new Map();
    this.triggerTargets = new Map();
    this.flags = AsyncResourceFlags.INIT;
    this.notifyObserver = null;
  }

  /**
   * Records a new async resource that was created while this node was the
   * execution context. Does nothing while the node is inactive.
   *
   * @param asyncId - id of the new resource
   * @param type - type of the new resource
   * @param triggerAsyncId - id of the resource that triggers the new one
   * @param resource - raw resource object; it gets tagged with the new descriptor
   */
  _onExecute(
    asyncId: number,
    type: string,
    triggerAsyncId: number,
    resource: RawAsyncResource
  ) {
    if (this.active) {
      // The child inherits this context's fiber id
      const child = new AsyncResourceNode(
        this.fiberId,
        asyncId,
        type,
        getStackFrame(1)
      );
      child.executionOrigin = this;
      resource[fiberRef] = child;
      this.executionTargets.set(asyncId, child);

      // Link the trigger relationship when the trigger is a different,
      // known resource
      if (asyncId !== triggerAsyncId) {
        const trigger = getAsyncResourceNode(triggerAsyncId);
        if (trigger) {
          child.triggerOrigin = trigger;
          trigger.triggerTargets.set(asyncId, child);
        }
      }

      this.notifyObserver?._onExecute(this, child);
    }
  }

  /** Marks the node as started and notifies the observer if the node is active */
  _onBefore() {
    this.flags |= AsyncResourceFlags.PRE_EXECUTION;
    if (!this.active) return;
    this.notifyObserver?._onBefore(this);
  }

  /** Marks the node as completed and notifies the observer if the node is active */
  _onAfter() {
    this.flags |= AsyncResourceFlags.POST_EXECUTION;
    if (!this.active) return;
    this.notifyObserver?._onAfter(this);
  }

  /** Marks the node as resolved and notifies the observer if the node is active */
  _onPromiseResolve() {
    this.flags |= AsyncResourceFlags.RESOLVED;
    if (!this.active) return;
    this.notifyObserver?._onPromiseResolve(this);
  }

  toString() {
    const label = this.type || `Fiber: ${this.fiberId}`;
    return `[async ${label}]`;
  }
}
279
/**
 * Applies `flags` to every node reachable from `node` (through both execution
 * and trigger targets) that has none of the `mask` bits set.
 *
 * The previous guard, `(node.flags & mask & flags) === 0`, is always true when
 * `mask` and `flags` share no bits, so `mask` had no effect and nodes matching
 * it were flagged as well. Nodes matching `mask` are now left untouched, but
 * the walk still continues through them so that their unfinished descendants
 * are reached. Each node is visited once, even if it is reachable via several
 * paths.
 *
 * @param node - node to start from
 * @param mask - nodes with any of these bits set are not modified
 * @param flags - bits to set on all other reachable nodes
 */
const taintAsyncResourceGraph = (
  node: AsyncResourceNode,
  mask: AsyncResourceFlags,
  flags: AsyncResourceFlags
) => {
  const visited = new Set<AsyncResourceNode>();
  const queue: AsyncResourceNode[] = [node];
  let current: AsyncResourceNode | undefined;
  while ((current = queue.pop())) {
    if (visited.has(current)) continue;
    visited.add(current);
    if ((current.flags & mask) === 0) current.flags |= flags;
    for (const target of current.executionTargets.values()) queue.push(target);
    for (const target of current.triggerTargets.values()) queue.push(target);
  }
};
293
/**
 * Looks up the descriptor for `asyncId`, starting at the current execution
 * context's descriptor and walking up its chain of execution origins. On each
 * level, the context itself and its direct execution targets are checked.
 *
 * @returns the matching descriptor, or `undefined` if the current execution
 *   resource carries no descriptor or no level of the chain knows the id
 */
const getAsyncResourceNode = (
  asyncId: number
): AsyncResourceNode | undefined => {
  // The `asyncResourceGraphHook`'s callbacks execute inside the fiber's
  // execution context stack, so candidates are the current context and
  // every parent execution context.
  for (
    let context: AsyncResourceNode | null =
      getExecutionAsyncResource()[fiberRef] ?? null;
    context;
    context = context.executionOrigin
  ) {
    if (context.asyncId === asyncId) return context;
    const target = context.executionTargets.get(asyncId);
    if (target) return target;
  }
  return undefined;
};
309
/**
 * Set while one of the hook callbacks below is running. The `init` hook
 * ignores resources that are created while the flag is set.
 */
let _asyncResourceGraphHookActive = false;

/** Names of the node methods that the lifecycle hooks dispatch to */
type AsyncResourceNodeLifecycle = '_onBefore' | '_onAfter' | '_onPromiseResolve';

/**
 * Looks up the descriptor for `asyncId` and invokes the given lifecycle
 * method on it (if a descriptor exists), with the hook flag set for the
 * duration of the call.
 */
const notifyAsyncResourceNode = (
  asyncId: number,
  method: AsyncResourceNodeLifecycle
): void => {
  try {
    _asyncResourceGraphHookActive = true;
    getAsyncResourceNode(asyncId)?.[method]();
  } finally {
    // NOTE(review): the flag is cleared unconditionally rather than restored
    // to its previous value. If a lifecycle hook can fire while another hook
    // callback is still running, the outer callback continues with the flag
    // cleared. Confirm whether that is intended.
    _asyncResourceGraphHookActive = false;
  }
};

/** Async hook that feeds resource lifecycle events into the node graph */
const asyncResourceGraphHook = createHook({
  init(
    asyncId: number,
    type: string,
    triggerAsyncId: number,
    resource: RawAsyncResource
  ) {
    // Resources created from within a hook callback are not recorded
    if (_asyncResourceGraphHookActive) return;
    try {
      _asyncResourceGraphHookActive = true;
      // Only contexts that already carry a descriptor record new resources
      const executionNode = getExecutionAsyncResource()[fiberRef];
      executionNode?._onExecute(asyncId, type, triggerAsyncId, resource);
    } finally {
      _asyncResourceGraphHookActive = false;
    }
  },
  before(asyncId: number) {
    notifyAsyncResourceNode(asyncId, '_onBefore');
  },
  after(asyncId: number) {
    notifyAsyncResourceNode(asyncId, '_onAfter');
  },
  promiseResolve(asyncId: number) {
    notifyAsyncResourceNode(asyncId, '_onPromiseResolve');
  },
  // NOTE: While it's nice for cleanups we leave out the `destroy`
  // hook since it has performance implications according to the docs
});
356
/**
 * Attaches a watchdog to a fiber whose function has just been started.
 *
 * The watchdog observes the fiber's async resources and rejects the returned
 * promise with a `FiberError` (or the abort reason) when:
 * - a resource owned by the fiber is triggered by a resource of another fiber,
 * - the fiber or a trigger was marked as aborted, or
 * - no unfinished non-promise resource is pending when the stall check runs.
 *
 * @param fiber - fiber whose resources are observed
 * @param params - fiber options; `abort` marks the resource graph as aborted
 * @param promise - result promise of the fiber's function
 * @returns the promise produced by `promiseWithReject` for `promise`
 */
function fiberWatchdog<T>(
  fiber: AsyncResourceFiber,
  params: FiberParams,
  promise: Promise<T>
): Promise<T> {
  // Remember the root's tracking state so it can be restored afterwards.
  // Leaving the root inactive would stop this execution context from ever
  // recording new resources again (e.g. for a second fiber started here).
  const wasRootActive = fiber.root.active;
  try {
    let watchdogImmediate: NodeJS.Immediate | void;
    // While the watchdog is being set up, the root node must not record new
    // resources (an inactive node ignores `_onExecute`)
    fiber.root.active = false;

    const { abort } = params;
    const { parentFiberIds } = fiber;

    const pendingExecutionTargets = new Set<AsyncResourceNode>();
    const watchdogResult = promiseWithReject(promise, () => {
      if (watchdogImmediate)
        watchdogImmediate = clearImmediate(watchdogImmediate);
    });

    /** Most recently added pending node, else the fiber's last direct target, else its root */
    function lastExecutionTarget(): AsyncResourceNode {
      if (pendingExecutionTargets.size) {
        const targets = [...pendingExecutionTargets.values()];
        return targets[targets.length - 1];
      } else {
        const targets = fiber.executionTargets;
        return targets[targets.length - 1] || fiber.root;
      }
    }

    /** Fails the fiber if `node`, its trigger, or the abort signal is aborted */
    function assertFiberAbort(node: AsyncResourceNode) {
      if (node.flags & AsyncResourceFlags.ABORTED) {
        assertFiberError(new FiberError('FIBER_ABORTED', fiber, node));
      } else if (
        node.triggerOrigin &&
        node.triggerOrigin.flags & AsyncResourceFlags.ABORTED
      ) {
        if (node.triggerOrigin.fiberId === fiber.fiberId) {
          assertFiberError(
            new FiberError('FIBER_ABORTED', fiber, node.triggerOrigin)
          );
        } else {
          assertFiberError(
            new FiberError('FOREIGN_ASYNC_ABORTED', fiber, node.triggerOrigin)
          );
        }
      } else if (abort?.aborted) {
        assertFiberError(abort.reason);
        if (_asyncResourceGraphHookActive) abort.throwIfAborted();
      }
    }

    /** Rejects the watchdog promise; additionally throws when called from within a hook */
    function assertFiberError(error: Error) {
      watchdogResult.reject(error);
      if (_asyncResourceGraphHookActive) throw error;
    }

    /** Fails the fiber if `node` belongs to it but is triggered from another fiber */
    function assertFiberOwnership(node: AsyncResourceNode) {
      const { triggerOrigin } = node;
      if (node.fiberId !== fiber.fiberId || !triggerOrigin) {
        // We only check triggers for nodes owned by the current fiber
      } else if (triggerOrigin === fiber.root) {
        // Immediately invoked async IO on the fiber root are ignored
      } else if (triggerOrigin.fiberId === node.fiberId) {
        // If the trigger is a node in the same fiber, this is allowed
      } else {
        const isParentFiberTrigger = parentFiberIds.includes(
          triggerOrigin.fiberId
        );
        if (!isParentFiberTrigger) {
          assertFiberError(
            new FiberError('FOREIGN_ASYNC_TRIGGER', fiber, node)
          );
        } else {
          // TODO: Selectively allow resolved promises
          assertFiberError(new FiberError('PARENT_ASYNC_TRIGGER', fiber, node));
        }
      }
    }

    /** Stall check: rejects unless an unfinished non-promise resource is pending */
    function stallWatchdog() {
      watchdogImmediate = undefined;
      if (abort?.aborted) {
        return;
      }
      let hasAsyncIO = false;
      for (const asyncNode of pendingExecutionTargets) {
        if (
          (asyncNode.flags & AsyncResourceFlags.FINALIZED) === 0 &&
          asyncNode.type !== 'PROMISE'
        ) {
          hasAsyncIO = true;
          break;
        }
      }
      if (!hasAsyncIO) {
        watchdogResult.reject(
          new FiberError('FIBER_STALL', fiber, lastExecutionTarget())
        );
      }
    }

    // At most one stall check is scheduled at any time
    const scheduleCheck = () =>
      watchdogImmediate || (watchdogImmediate = setImmediate(stallWatchdog));
    watchdogImmediate = setImmediate(stallWatchdog);

    abort?.addEventListener('abort', () => {
      taintAsyncResourceGraph(
        fiber.root,
        AsyncResourceFlags.FINALIZED,
        AsyncResourceFlags.ABORTED
      );
    });

    const observer = new AsyncResourceObserver(fiber, (event, node) => {
      // Every observed event (re-)arms the stall check
      scheduleCheck();
      switch (event) {
        case AsyncResourceFlags.INIT:
          assertFiberOwnership(node);
          assertFiberAbort(node);
          pendingExecutionTargets.add(node);
          return;
        case AsyncResourceFlags.POST_EXECUTION:
          pendingExecutionTargets.delete(node);
          return;
        case AsyncResourceFlags.RESOLVED:
          assertFiberAbort(node);
          pendingExecutionTargets.delete(node);
          return;
        default:
          return;
      }
    });

    // Attach the observer to everything that already exists: first the root's
    // direct targets that belong to this fiber, then all of their descendants
    const stack: AsyncResourceNode[] = [];
    for (const target of fiber.root.executionTargets.values()) {
      if (target.fiberId === fiber.fiberId) {
        stack.push(target);
        observer.observe(target);
        if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
          assertFiberOwnership(target);
          pendingExecutionTargets.add(target);
        }
      }
    }
    let pointer: AsyncResourceNode | undefined;
    while ((pointer = stack.pop())) {
      for (const target of pointer.executionTargets.values()) {
        if (!observer.isObserved(target)) {
          stack.push(target);
          observer.observe(target);
          if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
            assertFiberOwnership(target);
            pendingExecutionTargets.add(target);
          }
        }
      }
    }

    return watchdogResult.promise;
  } finally {
    fiber.root.active = wasRootActive;
  }
}
519
/** Enable async resource graph tracking and initialize root fiber if needed.
 * @remarks
 * Call this as early as async resources are created that other fibers depend
 * on. No async resources outside of the root fiber will be tracked!
 *
 * @returns the currently active fiber if there is one, otherwise a newly
 *   created and enabled fiber
 */
export function enable(): AsyncResourceFiber {
  const current = getFiber();
  if (current) return current;
  const created = new AsyncResourceFiber(getStackFrame(0));
  return created.enable();
}
528
/** Disable top-level fiber
 * @remarks
 * If a root fiber has been created with `enable()`, calling this function
 * allows all fiber data to be garbage collected.
 *
 * @returns the fiber that was disabled, or `null` if no fiber was active
 */
export function disable(): AsyncResourceFiber | null {
  const current = getFiber();
  if (!current) return null;
  return current.disable() || null;
}
537
/** Get currently active fiber
 * @returns the most recently enabled fiber on the stack that is still
 *   flagged as active, or `null` if there is none
 */
export function getFiber(): AsyncResourceFiber | null {
  let position = fiberStack.length;
  // Scan from the top of the stack downwards
  while (position-- > 0) {
    const candidate = fiberStack[position];
    if (candidate.active) return candidate;
  }
  return null;
}
544
/** Returns an arbitrary async resource's descriptor
 * @remarks
 * WARN: This will only work for async resource objects that have been
 * created in a fiber.
 *
 * @param resource - a raw async resource object
 * @returns the descriptor stored on the resource, or `undefined` if the
 *   resource has not been tagged with one
 */
export function getFiberNode(resource: object): AsyncResourceNode | undefined {
  // A plain `object` has no known symbol-keyed members, so indexing it
  // directly is an implicit-any element access. View it as a (possibly
  // tagged) raw resource to keep the lookup type-checked.
  return (resource as RawAsyncResource)[fiberRef];
}
553
/** Options accepted by `fiber()` */
export interface FiberParams {
  /**
   * Optional abort signal. When its `abort` event fires, the fiber's async
   * resource graph is marked as aborted; while the signal is aborted, newly
   * created or resolved resources fail the fiber with the signal's reason.
   */
  abort?: AbortSignal;
}
557
/** Create a fiber and execute it, returning both the result of `fn` and its fiber
 * @remarks
 * While this function returns synchronously, it will track all async resources
 * that the passed function has created.
 *
 * @param fn - function to run while the new fiber is enabled
 * @param params - fiber options, see `FiberParams`
 * @returns `return`: the watchdog-guarded result promise of `fn`;
 *   `fiber`: the fiber that was created for the call
 */
export function fiber<T>(
  fn: () => Promise<T>,
  params: FiberParams = {}
): { return: Promise<T>; fiber: AsyncResourceFiber } {
  const created = new AsyncResourceFiber(getStackFrame(0));
  try {
    created.enable();
    // `fn` runs synchronously up to its first suspension point while the
    // fiber is enabled; the watchdog is attached to whatever it created
    const guarded = fiberWatchdog(created, params, fn());
    return { return: guarded, fiber: created };
  } finally {
    // The fiber is always taken off the stack again, even if `fn` throws
    created.disable();
  }
}