WIP: Node.js isolation primitive to run asynchronous worker-like operations without leaking async IO
import {
  executionAsyncId as _getExecutionAsyncId,
  executionAsyncResource as _getExecutionAsyncResource,
  createHook,
} from 'node:async_hooks';
import { AsyncResourceKind } from './constants';
import { StackFrame, getStackFrame, promiseWithReject } from './utils';
import { FiberError } from './errors';

/** Bit flags describing the lifecycle state of an {@link AsyncResourceNode}. */
export const enum AsyncResourceFlags {
  INIT = 0,
  /** The node's `before` trigger was invoked. Work must have started */
  PRE_EXECUTION = 1 << 0,
  /** The node's `after` trigger was invoked. Work must have been completed */
  POST_EXECUTION = 1 << 1,
  /** The node (which is a promise) has been resolved and triggered subsequent promises */
  RESOLVED = 1 << 2,
  /** The node has been created in a context by a cancelled fiber */
  ABORTED = 1 << 3,

  /** If these flags are set it indicates that the resource is no longer blocking work it's a trigger for */
  FINALIZED = AsyncResourceFlags.POST_EXECUTION | AsyncResourceFlags.RESOLVED,
}

/** Callback invoked by an observer with the lifecycle event and the node it was raised for. */
type AsyncResourceObserverFn = (
  event: AsyncResourceFlags,
  node: AsyncResourceNode
) => void;

/**
 * Forwards lifecycle events of observed nodes to a single callback.
 * A node holds at most one observer at a time (see `observe`).
 */
class AsyncResourceObserver {
  fiberId: number;
  callback: AsyncResourceObserverFn;

  constructor(fiber: AsyncResourceFiber, callback: AsyncResourceObserverFn) {
    this.fiberId = fiber.fiberId;
    this.callback = callback;
  }

  /**
   * Called when `executionNode` created the new child `node`.
   * The new child is observed as well if it belongs to this observer's fiber.
   *
   * NOTE(review): the callback is handed `executionNode` (the creating context),
   * not the newly created `node` — confirm this is intended.
   */
  _onExecute(executionNode: AsyncResourceNode, node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.INIT, executionNode);
    if (node.fiberId === this.fiberId) {
      this.observe(node);
    }
  }

  _onBefore(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.PRE_EXECUTION, node);
  }

  /** Reports completion and detaches the observer from the node. */
  _onAfter(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.POST_EXECUTION, node);
    node.notifyObserver = null;
  }

  /** Reports promise resolution and detaches the observer from the node. */
  _onPromiseResolve(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.RESOLVED, node);
    node.notifyObserver = null;
  }

  /** Whether `node` currently reports to this observer. */
  isObserved(node: AsyncResourceNode): boolean {
    return node.notifyObserver === this;
  }

  /**
   * Attaches this observer to `node`.
   * @throws TypeError if a different observer is already attached.
   */
  observe(node: AsyncResourceNode): void {
    if (node.notifyObserver !== null && node.notifyObserver !== this) {
      throw new TypeError(
        'Only one observer can be attached to a node at a time.\n' +
          `Node (${node.asyncId}) is already being observed`
      );
    }
    node.notifyObserver = this;
  }
}

/** Symbol under which a resource object stores its descriptor node. */
const fiberRef = Symbol('async_resource_node');

/** An async resource object that may carry an attached descriptor node. */
type RawAsyncResource = object & { [K in typeof fiberRef]?: AsyncResourceNode };

/** Enabled fibers, in the order they were enabled. */
const fiberStack: AsyncResourceFiber[] = [];

const getExecutionAsyncResource: () => RawAsyncResource =
  _getExecutionAsyncResource;
const getExecutionAsyncId = _getExecutionAsyncId;

let _fiberIdx = 1;

/** A group of async resources tracked under a shared fiber id. */
export class AsyncResourceFiber {
  readonly fiberId = _fiberIdx++;
  readonly root: AsyncResourceNode;

  active = false;
  parent: AsyncResourceFiber | null;
  frame: StackFrame | null;

  constructor(frame: StackFrame | null) {
    // Fibers are created as children of the current fiber,
    // but their root resource will be the current execution context,
    // even if it's deeper into the parent fiber's async graph
    const executionAsyncResource = getExecutionAsyncResource();
    let root = executionAsyncResource[fiberRef];
    if (root === undefined) {
      // WARN: This must imply `!this.parent`
      // If no parent fiber exists, this is the root fiber, and
      // empty state is created for its resource node
      root = new AsyncResourceNode(
        this.fiberId,
        getExecutionAsyncId(),
        null,
        frame
      );
      executionAsyncResource[fiberRef] = root;
    }
    this.root = root;
    this.parent = getFiber();
    this.frame = frame;
  }

  /** Marks this fiber as the active one and turns on the async hook. No-op if already active. */
  enable() {
    if (!this.active) {
      // Set the current execution context's fiber to be the current fiber
      this.root.fiberId = this.fiberId;
      this.active = true;
      fiberStack.push(this);
      asyncResourceGraphHook.enable();
    }
    return this;
  }

  /**
   * Removes this fiber from the stack and hands the execution context back
   * to whichever fiber is now on top. No-op if the fiber is not enabled.
   */
  disable() {
    const stackIndex = fiberStack.indexOf(this);
    if (stackIndex === -1) {
      // `splice(-1, 1)` would drop the *last* stacked fiber, i.e. somebody
      // else's; a fiber that isn't on the stack has nothing to undo.
      this.active = false;
      return this;
    }
    fiberStack.splice(stackIndex, 1);
    if (!fiberStack.length) asyncResourceGraphHook.disable();
    // Reset the context's fiber. This must use the stack since fibers
    // may be created out of order and `this.parent` may not be
    // the same as the stack's fiber
    this.root.fiberId = getFiber()?.fiberId || 0;
    this.active = false;
    return this;
  }

  /** Ids of all ancestor fibers, nearest parent first. */
  get parentFiberIds(): readonly number[] {
    const ids: number[] = [];
    for (
      let ancestor: AsyncResourceFiber | null = this.parent;
      ancestor !== null;
      ancestor = ancestor.parent
    ) {
      ids.push(ancestor.fiberId);
    }
    return ids;
  }

  /** Returns amount of pending tasks in the fiber */
  get pending() {
    const countExecutionTargets = (node: AsyncResourceNode) => {
      let count = 0;
      // To count all pending tasks, we check all resources that
      // have been created on the fiber recursively and count all
      // non-finalized ones
      for (const target of node.executionTargets.values()) {
        if (target.fiberId === this.fiberId) {
          count += countExecutionTargets(target);
          count += target.flags & AsyncResourceFlags.FINALIZED ? 0 : 1;
        }
      }
      return count;
    };
    // Pending tasks may be counted excluding the top-level resource,
    // as it presumably will still be in progress, but shouldn't be
    // considered to be blocking.
    return countExecutionTargets(this.root);
  }

  /** Direct children of the root node that belong to this fiber. */
  get executionTargets(): AsyncResourceNode[] {
    return [...this.root.executionTargets.values()].filter(
      node => node.fiberId === this.fiberId
    );
  }

  toString() {
    return `[Fiber: ${this.fiberId}]`;
  }
}

/** Descriptors for an async resource (akin to IO tasks) */
export class AsyncResourceNode {
  asyncId: number;
  fiberId: number;
  type: AsyncResourceKind | null;
  /** When `false`, the node neither registers new children nor notifies its observer */
  active: boolean;

  /** If available, a stacktrace frame pointing to the resource's initialization */
  frame: StackFrame | null;

  /** An execution context's descriptor this async resource was created in */
  executionOrigin: AsyncResourceNode | null;
  /** An async resource's descriptor which will, upon completion, trigger this async resource
   * @remarks
   * This will be identical to `executionOrigin` if the resource was triggered synchronously
   */
  triggerOrigin: AsyncResourceNode | null;

  /** Async resources that this resource has created (sub-tasks) */
  executionTargets: Map<number, AsyncResourceNode>;
  /** Async resources that this resource will trigger upon completion (follow-up tasks) */
  triggerTargets: Map<number, AsyncResourceNode>;

  /** Descriptions and completion state of this node */
  flags: AsyncResourceFlags;

  /** Observer that should be notified about changes */
  notifyObserver: AsyncResourceObserver | null;

  constructor(
    fiberId: number,
    asyncId: number,
    type: string | null,
    frame: StackFrame | null
  ) {
    this.active = true;
    this.asyncId = asyncId;
    this.fiberId = fiberId;
    this.type = type as AsyncResourceKind;
    this.frame = frame;
    this.executionOrigin = null;
    this.triggerOrigin = null;
    this.executionTargets = new Map();
    this.triggerTargets = new Map();
    this.flags = AsyncResourceFlags.INIT;
    this.notifyObserver = null;
  }

  /**
   * Registers a new async resource created while this node was the
   * execution context. Does nothing when the node is inactive.
   */
  _onExecute(
    asyncId: number,
    type: string,
    triggerAsyncId: number,
    resource: RawAsyncResource
  ) {
    if (!this.active) {
      return;
    }
    // This method is called on an execution context's descriptor that created a new async resource.
    // Hence, we create a new child async resource here
    const frame = getStackFrame(1);
    const node = new AsyncResourceNode(this.fiberId, asyncId, type, frame);
    node.executionOrigin = this;
    resource[fiberRef] = node;
    this.executionTargets.set(asyncId, node);
    // Only look up a trigger when it differs from the resource itself
    const triggerNode =
      asyncId !== triggerAsyncId
        ? getAsyncResourceNode(triggerAsyncId)
        : undefined;
    if (triggerNode) {
      node.triggerOrigin = triggerNode;
      triggerNode.triggerTargets.set(asyncId, node);
    }
    if (this.notifyObserver) {
      this.notifyObserver._onExecute(this, node);
    }
  }

  _onBefore() {
    this.flags |= AsyncResourceFlags.PRE_EXECUTION;
    if (this.active && this.notifyObserver) {
      this.notifyObserver._onBefore(this);
    }
  }

  _onAfter() {
    this.flags |= AsyncResourceFlags.POST_EXECUTION;
    if (this.active && this.notifyObserver) {
      this.notifyObserver._onAfter(this);
    }
  }

  _onPromiseResolve() {
    this.flags |= AsyncResourceFlags.RESOLVED;
    if (this.active && this.notifyObserver) {
      this.notifyObserver._onPromiseResolve(this);
    }
  }

  toString() {
    return `[async ${this.type || `Fiber: ${this.fiberId}`}]`;
  }
}

/**
 * Sets `flags` on `node` and, recursively, on all of its execution and
 * trigger targets.
 *
 * NOTE(review): the guard tests `node.flags & mask & flags`. At the only call
 * site `mask` is FINALIZED and `flags` is ABORTED, which share no bits, so the
 * guard is always true there and `mask` has no effect. Confirm the intended
 * condition before relying on `mask`.
 */
const taintAsyncResourceGraph = (
  node: AsyncResourceNode,
  mask: AsyncResourceFlags,
  flags: AsyncResourceFlags
) => {
  if ((node.flags & mask & flags) === 0) {
    node.flags |= flags;
    for (const target of node.executionTargets.values())
      taintAsyncResourceGraph(target, mask, flags);
    for (const target of node.triggerTargets.values())
      taintAsyncResourceGraph(target, mask, flags);
  }
};

/**
 * Finds the descriptor for `asyncId` by walking from the current execution
 * context up through its `executionOrigin` chain, checking each context and
 * its direct children.
 */
const getAsyncResourceNode = (
  asyncId: number
): AsyncResourceNode | undefined => {
  // The `asyncResourceGraphHook`'s callbacks execute inside the fiber's execution context stack.
  // This means we can find any node by checking all nodes in the current and any parent
  // execution contexts.
  let context: AsyncResourceNode | null =
    getExecutionAsyncResource()[fiberRef] ?? null;
  while (context) {
    if (context.asyncId === asyncId) return context;
    const child = context.executionTargets.get(asyncId);
    if (child) return child;
    context = context.executionOrigin;
  }
  return undefined;
};

/** `true` while one of the hook callbacks below is running */
let _asyncResourceGraphHookActive = false;

const asyncResourceGraphHook = createHook({
  init(
    asyncId: number,
    type: string,
    triggerAsyncId: number,
    resource: RawAsyncResource
  ) {
    // Skip resources that are created while another hook callback is running
    if (!_asyncResourceGraphHookActive) {
      try {
        _asyncResourceGraphHookActive = true;
        const executionNode = getExecutionAsyncResource()[fiberRef];
        executionNode?._onExecute(asyncId, type, triggerAsyncId, resource);
      } finally {
        _asyncResourceGraphHookActive = false;
      }
    }
  },
  before(asyncId: number) {
    try {
      _asyncResourceGraphHookActive = true;
      getAsyncResourceNode(asyncId)?._onBefore();
    } finally {
      _asyncResourceGraphHookActive = false;
    }
  },
  after(asyncId: number) {
    try {
      _asyncResourceGraphHookActive = true;
      getAsyncResourceNode(asyncId)?._onAfter();
    } finally {
      _asyncResourceGraphHookActive = false;
    }
  },
  promiseResolve(asyncId: number) {
    try {
      _asyncResourceGraphHookActive = true;
      getAsyncResourceNode(asyncId)?._onPromiseResolve();
    } finally {
      _asyncResourceGraphHookActive = false;
    }
  },
  // NOTE: While it's nice for cleanups we leave out the `destroy`
  // hook since it has performance implications according to the docs
});

/**
 * Wraps `promise` so that it also rejects when the fiber violates ownership
 * rules, is aborted, or stalls (nothing but promises left pending).
 *
 * @param fiber - Fiber whose resources are observed
 * @param params - Optional abort signal
 * @param promise - Result of the fiber's function
 * @returns The promise exposed by `promiseWithReject`
 */
function fiberWatchdog<T>(
  fiber: AsyncResourceFiber,
  params: FiberParams,
  promise: Promise<T>
): Promise<T> {
  const { root } = fiber;
  const wasRootActive = root.active;
  try {
    let watchdogImmediate: NodeJS.Immediate | void;
    // Keep the watchdog's own resources (immediates, wrapper promise)
    // out of the graph while it is being set up
    root.active = false;

    const { abort } = params;
    const { parentFiberIds } = fiber;

    const pendingExecutionTargets = new Set<AsyncResourceNode>();
    const watchdogResult = promiseWithReject(promise, () => {
      if (watchdogImmediate)
        watchdogImmediate = clearImmediate(watchdogImmediate);
    });

    /** Most recently added pending node, else the fiber's last direct target, else its root. */
    function lastExecutionTarget(): AsyncResourceNode {
      if (pendingExecutionTargets.size) {
        const targets = [...pendingExecutionTargets.values()];
        return targets[targets.length - 1];
      } else {
        const targets = fiber.executionTargets;
        return targets[targets.length - 1] || fiber.root;
      }
    }

    /** Rejects (and throws inside hooks) if `node`, its trigger, or the signal is aborted. */
    function assertFiberAbort(node: AsyncResourceNode) {
      if (node.flags & AsyncResourceFlags.ABORTED) {
        assertFiberError(new FiberError('FIBER_ABORTED', fiber, node));
      } else if (
        node.triggerOrigin &&
        node.triggerOrigin.flags & AsyncResourceFlags.ABORTED
      ) {
        if (node.triggerOrigin.fiberId === fiber.fiberId) {
          assertFiberError(
            new FiberError('FIBER_ABORTED', fiber, node.triggerOrigin)
          );
        } else {
          assertFiberError(
            new FiberError('FOREIGN_ASYNC_ABORTED', fiber, node.triggerOrigin)
          );
        }
      } else if (abort?.aborted) {
        assertFiberError(abort.reason);
        if (_asyncResourceGraphHookActive) abort.throwIfAborted();
      }
    }

    /** Rejects the watchdog result; additionally throws when called from inside a hook callback. */
    function assertFiberError(error: Error) {
      watchdogResult.reject(error);
      if (_asyncResourceGraphHookActive) throw error;
    }

    /** Rejects when a node of this fiber is triggered by a resource of another fiber. */
    function assertFiberOwnership(node: AsyncResourceNode) {
      const { triggerOrigin } = node;
      if (node.fiberId !== fiber.fiberId || !triggerOrigin) {
        // We only check triggers for nodes owned by the current fiber
      } else if (triggerOrigin === fiber.root) {
        // Immediately invoked async IO on the fiber root are ignored
      } else if (triggerOrigin.fiberId === node.fiberId) {
        // If the trigger is a node in the same fiber, this is allowed
      } else {
        const isParentFiberTrigger = parentFiberIds.includes(
          triggerOrigin.fiberId
        );
        if (!isParentFiberTrigger) {
          assertFiberError(
            new FiberError('FOREIGN_ASYNC_TRIGGER', fiber, node)
          );
        } else {
          // TODO: Selectively allow resolved promises
          assertFiberError(new FiberError('PARENT_ASYNC_TRIGGER', fiber, node));
        }
      }
    }

    /** Rejects with FIBER_STALL when no unfinished non-promise resource is pending. */
    function stallWatchdog() {
      watchdogImmediate = undefined;
      if (abort?.aborted) {
        return;
      }
      let hasAsyncIO = false;
      for (const asyncNode of pendingExecutionTargets) {
        if (
          (asyncNode.flags & AsyncResourceFlags.FINALIZED) === 0 &&
          asyncNode.type !== 'PROMISE'
        ) {
          hasAsyncIO = true;
          break;
        }
      }
      if (!hasAsyncIO) {
        watchdogResult.reject(
          new FiberError('FIBER_STALL', fiber, lastExecutionTarget())
        );
      }
    }

    // Schedules a stall check unless one is already queued
    const scheduleCheck = () =>
      watchdogImmediate || (watchdogImmediate = setImmediate(stallWatchdog));
    watchdogImmediate = setImmediate(stallWatchdog);

    // The abort event fires at most once, so let the listener detach itself
    abort?.addEventListener(
      'abort',
      () => {
        taintAsyncResourceGraph(
          fiber.root,
          AsyncResourceFlags.FINALIZED,
          AsyncResourceFlags.ABORTED
        );
      },
      { once: true }
    );

    const observer = new AsyncResourceObserver(fiber, (event, node) => {
      scheduleCheck();
      switch (event) {
        case AsyncResourceFlags.INIT:
          assertFiberOwnership(node);
          assertFiberAbort(node);
          pendingExecutionTargets.add(node);
          return;
        case AsyncResourceFlags.POST_EXECUTION:
          pendingExecutionTargets.delete(node);
          return;
        case AsyncResourceFlags.RESOLVED:
          assertFiberAbort(node);
          pendingExecutionTargets.delete(node);
          return;
        default:
          return;
      }
    });

    // Attach the observer to everything the fiber created before the
    // watchdog existed: first the root's direct children owned by the fiber...
    const stack: AsyncResourceNode[] = [];
    for (const target of fiber.root.executionTargets.values()) {
      if (target.fiberId === fiber.fiberId) {
        stack.push(target);
        observer.observe(target);
        if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
          assertFiberOwnership(target);
          pendingExecutionTargets.add(target);
        }
      }
    }
    // ...then all of their not-yet-observed descendants
    let pointer: AsyncResourceNode | undefined;
    while ((pointer = stack.pop())) {
      for (const target of pointer.executionTargets.values()) {
        if (!observer.isObserved(target)) {
          stack.push(target);
          observer.observe(target);
          if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
            assertFiberOwnership(target);
            pendingExecutionTargets.add(target);
          }
        }
      }
    }

    return watchdogResult.promise;
  } finally {
    // Hand the execution context's node back in the state we found it in;
    // leaving it inactive would stop `_onExecute` from registering any
    // resource created in this context afterwards.
    root.active = wasRootActive;
  }
}

/** Enable async resource graph tracking and initialize root fiber if needed.
 * @remarks
 * Call this as early as async resources are created that other fibers depend
 * on. No async resources outside of the root fiber will be tracked!
 */
export function enable(): AsyncResourceFiber {
  return getFiber() || new AsyncResourceFiber(getStackFrame(0)).enable();
}

/** Disable top-level fiber
 * @remarks
 * If a root fiber has been created with `enable()`, calling this function
 * allows all fiber data to be garbage collected.
 */
export function disable(): AsyncResourceFiber | null {
  return getFiber()?.disable() || null;
}

/** Get currently active fiber */
export function getFiber(): AsyncResourceFiber | null {
  for (let idx = fiberStack.length - 1; idx >= 0; idx--) {
    const candidate = fiberStack[idx];
    if (candidate?.active) return candidate;
  }
  return null;
}

/** Returns an arbitrary async resource's descriptor
 * @remarks
 * WARN: This will only work for async resource objects that have been
 * created in a fiber.
 */
export function getFiberNode(resource: object): AsyncResourceNode | undefined {
  // `object` has no symbol index signature; view it as a resource that may carry a node
  return (resource as RawAsyncResource)[fiberRef];
}

export interface FiberParams {
  /** Signal that, once aborted, marks the fiber's resource graph as aborted */
  abort?: AbortSignal;
}

/** Create a fiber and execute it, returning both the result of `fn` and its fiber
 * @remarks
 * While this function returns synchronously, it will track all async resources
 * that the passed function has created.
 */
export function fiber<T>(
  fn: () => Promise<T>,
  params: FiberParams = {}
): { return: Promise<T>; fiber: AsyncResourceFiber } {
  const instance = new AsyncResourceFiber(getStackFrame(0));
  try {
    instance.enable();
    return { return: fiberWatchdog(instance, params, fn()), fiber: instance };
  } finally {
    // The fiber only stays enabled for the synchronous part of `fn`
    instance.disable();
  }
}