WIP: Node.js isolation primitive to run asynchronous worker-like operations without leaking async IO
at main 18 kB view raw
import {
  executionAsyncId as _getExecutionAsyncId,
  executionAsyncResource as _getExecutionAsyncResource,
  createHook,
} from 'node:async_hooks';
import { AsyncResourceKind } from './constants';
import { StackFrame, getStackFrame, promiseWithReject } from './utils';
import { FiberError } from './errors';

/** Bit flags describing the lifecycle state of an {@link AsyncResourceNode}.
 * @remarks
 * The individual values double as event identifiers passed to an
 * `AsyncResourceObserverFn` (`INIT` is `0` and only used as an event).
 */
export const enum AsyncResourceFlags {
  INIT = 0,
  /** The node's `before` trigger was invoked. Work must have started */
  PRE_EXECUTION = 1 << 0,
  /** The node's `after` trigger was invoked. Work must have been completed */
  POST_EXECUTION = 1 << 1,
  /** The node (which is a promise) has been resolved and triggered subsequent promises */
  RESOLVED = 1 << 2,
  /** The node has been created in a context by a cancelled fiber */
  ABORTED = 1 << 3,

  /** If these flags are set it indicates that the resource is no longer blocking work it's a trigger for */
  FINALIZED = AsyncResourceFlags.POST_EXECUTION | AsyncResourceFlags.RESOLVED,
}

/** Callback invoked by an `AsyncResourceObserver` with the event (one of the
 * single-bit `AsyncResourceFlags` values or `INIT`) and the node it concerns.
 */
type AsyncResourceObserverFn = (
  event: AsyncResourceFlags,
  node: AsyncResourceNode
) => void;

/** Forwards lifecycle events of async resource nodes to a callback.
 * @remarks
 * A node references at most one observer via `notifyObserver`. Newly created
 * nodes are observed automatically when they carry the observer's fiber id,
 * and a node is detached again once its `after` or `promiseResolve` event
 * has been forwarded.
 */
class AsyncResourceObserver {
  fiberId: number;
  callback: AsyncResourceObserverFn;
  constructor(fiber: AsyncResourceFiber, callback: AsyncResourceObserverFn) {
    this.fiberId = fiber.fiberId;
    this.callback = callback;
  }

  /** Reports a newly created node and starts observing it if it belongs to this observer's fiber */
  _onInit(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.INIT, node);
    if (node.fiberId === this.fiberId) {
      this.observe(node);
    }
  }

  /** Reports that the node's `before` hook ran */
  _onBefore(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.PRE_EXECUTION, node);
  }

  /** Reports that the node's `after` hook ran, then detaches from the node */
  _onAfter(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.POST_EXECUTION, node);
    node.notifyObserver = null;
  }

  /** Reports that the node's `promiseResolve` hook ran, then detaches from the node */
  _onPromiseResolve(node: AsyncResourceNode) {
    this.callback(AsyncResourceFlags.RESOLVED, node);
    node.notifyObserver = null;
  }

  /** Whether this observer is the one currently attached to `node` */
  isObserved(node: AsyncResourceNode): boolean {
    return node.notifyObserver === this;
  }

  /** Attaches this observer to `node`.
   * @throws TypeError if a different observer is already attached to `node`
   */
  observe(node: AsyncResourceNode): void {
    if (node.notifyObserver !== null && node.notifyObserver !== this) {
      throw new TypeError(
        'Only one observer can be attached to a node at a time.\n' +
          `Node (${node.asyncId}) is already being observed`
      );
    }
    node.notifyObserver = this;
  }
}

/** An async resource object as handed out by `async_hooks`, optionally tagged with its descriptor */
type RawAsyncResource = object & { [K in typeof fiberRef]?: AsyncResourceNode };

/** Symbol key under which a resource object stores its `AsyncResourceNode` */
const fiberRef = Symbol('async_resource_node');
/** Enabled fibers, in the order they were enabled */
const fiberStack: AsyncResourceFiber[] = [];

const getExecutionAsyncResource: () => RawAsyncResource =
  _getExecutionAsyncResource;
const getExecutionAsyncId = _getExecutionAsyncId;

let _fiberIdx = 1;

/** A fiber groups the async resources that are created while it is enabled.
 * @remarks
 * Fiber ids are assigned from a module-level counter starting at `1`.
 * While a fiber is enabled it sits on the module's fiber stack and the
 * async resource graph hook is enabled.
 */
export class AsyncResourceFiber {
  readonly fiberId = _fiberIdx++;
  readonly root: AsyncResourceNode;

  active = false;
  parent: AsyncResourceFiber | null;
  frame: StackFrame | null;

  /** @param frame - Stack frame recorded for this fiber (also used for a newly created root node) */
  constructor(frame: StackFrame | null) {
    // Fibers are created as children of the current fiber,
    // but their root resource will be the current execution context,
    // even if it's deeper into the parent fiber's async graph
    const executionAsyncResource = getExecutionAsyncResource();
    let root = executionAsyncResource[fiberRef];
    if (root === undefined) {
      // WARN: This must imply `!this.parent`
      // If no parent fiber exists, this is the root fiber, and
      // empty state is created for its resource node
      root = new AsyncResourceNode(
        this.fiberId,
        getExecutionAsyncId(),
        null,
        frame
      );
      executionAsyncResource[fiberRef] = root;
    }
    this.root = root;
    this.parent = getFiber();
    this.frame = frame;
  }

  /** Activates the fiber; a no-op if it is already active.
   * @returns this fiber
   */
  enable() {
    if (!this.active) {
      // Set the current execution context's fiber to be the current fiber
      this.root.fiberId = this.fiberId;
      this.active = true;
      fiberStack.push(this);
      asyncResourceGraphHook.enable();
    }
    return this;
  }

  /** Deactivates the fiber; a no-op if it is not active.
   * @returns this fiber
   */
  disable() {
    // Mirror the guard in `enable()`. Without it, `indexOf` returns -1 for
    // a fiber that isn't on the stack and `splice(-1, 1)` would remove the
    // last (unrelated) fiber instead.
    if (!this.active) return this;
    const stackIdx = fiberStack.indexOf(this);
    if (stackIdx !== -1) fiberStack.splice(stackIdx, 1);
    if (!fiberStack.length) asyncResourceGraphHook.disable();
    // Reset the context's fiber. This must use the stack since fibers
    // may be created out of order and `this.parent` may not be
    // the same as the stack's fiber
    this.root.fiberId = getFiber()?.fiberId ?? 0;
    this.active = false;
    return this;
  }

  /** Ids of all ancestors of this fiber, nearest parent first */
  get parentFiberIds(): readonly number[] {
    const parentFiberIds: number[] = [];
    let fiber: AsyncResourceFiber | null = this;
    while ((fiber = fiber.parent) !== null) parentFiberIds.push(fiber.fiberId);
    return parentFiberIds;
  }

  /** Returns amount of pending tasks in the fiber */
  get pending() {
    const countExecutionTargets = (node: AsyncResourceNode) => {
      let count = 0;
      // To count all pending tasks, we check all resources that
      // have been created on the fiber recursively and count all
      // non-finalized ones
      for (const target of node.executionTargets.values()) {
        if (target.fiberId === this.fiberId) {
          count += countExecutionTargets(target);
          count += target.flags & AsyncResourceFlags.FINALIZED ? 0 : 1;
        }
      }
      return count;
    };
    // Pending tasks may be counted excluding the top-level resource,
    // as it presumably will still be in progress, but shouldn't be
    // considered to be blocking.
    return countExecutionTargets(this.root);
  }

  /** Direct execution targets of the root node that belong to this fiber */
  get executionTargets(): AsyncResourceNode[] {
    return [...this.root.executionTargets.values()].filter(node => {
      return node.fiberId === this.fiberId;
    });
  }

  toString() {
    return `[Fiber: ${this.fiberId}]`;
  }
}

/** Descriptors for an async resource (akin to IO tasks) */
export class AsyncResourceNode {
  asyncId: number;
  fiberId: number;
  type: AsyncResourceKind | null;
  /** While `false`, no child nodes are created and no observer is notified */
  active: boolean;

  /** If available, a stacktrace frame pointing to the resource's initialization */
  frame: StackFrame | null;

  /** An execution context's descriptor this async resource was created in */
  executionOrigin: AsyncResourceNode | null;
  /** An async resource's descriptor which will, upon completion, trigger this async resource
   * @remarks
   * This will be identical to `executionOrigin` if the resource was triggered synchronously
   */
  triggerOrigin: AsyncResourceNode | null;

  /** Async resources that this resource has created (sub-tasks) */
  executionTargets: Map<number, AsyncResourceNode>;
  /** Async resources that this resource will trigger upon completion (follow-up tasks) */
  triggerTargets: Map<number, AsyncResourceNode>;

  /** Descriptions and completion state of this node */
  flags: AsyncResourceFlags;

  /** Observer that should be notified about changes */
  notifyObserver: AsyncResourceObserver | null;

  constructor(
    fiberId: number,
    asyncId: number,
    type: string | null,
    frame: StackFrame | null
  ) {
    this.active = true;
    this.asyncId = asyncId;
    this.fiberId = fiberId;
    // `type` is `null` for nodes created as a fiber's root
    this.type = type as AsyncResourceKind | null;
    this.frame = frame;
    this.executionOrigin = null;
    this.triggerOrigin = null;
    this.executionTargets = new Map();
    this.triggerTargets = new Map();
    this.flags = AsyncResourceFlags.INIT;
    this.notifyObserver = null;
  }

  /** Registers a new async resource that was created in this node's execution context.
   * @param asyncId - Id of the new resource
   * @param type - Resource type as reported by the `init` hook
   * @param triggerAsyncId - Id of the resource that triggers the new resource
   * @param resource - The new resource object; it is tagged with the created node
   */
  _onExecute(
    asyncId: number,
    type: string,
    triggerAsyncId: number,
    resource: RawAsyncResource
  ) {
    if (!this.active) {
      return;
    }
    // This method is called on an execution context's descriptor that created a new async resource.
    // Hence, we create a new child async resource here
    const frame = getStackFrame(1);
    const node = new AsyncResourceNode(this.fiberId, asyncId, type, frame);
    node.executionOrigin = this;
    resource[fiberRef] = node;
    this.executionTargets.set(asyncId, node);
    const triggerNode =
      asyncId !== triggerAsyncId
        ? getAsyncResourceNode(triggerAsyncId)
        : undefined;
    if (triggerNode) {
      node.triggerOrigin = triggerNode;
      triggerNode.triggerTargets.set(asyncId, node);
    }
    if (this.notifyObserver) {
      this.notifyObserver._onInit(node);
    }
  }

  /** Marks the node as started and notifies the observer (if active and observed) */
  _onBefore() {
    this.flags |= AsyncResourceFlags.PRE_EXECUTION;
    if (this.active && this.notifyObserver) {
      this.notifyObserver._onBefore(this);
    }
  }

  /** Marks the node as completed and notifies the observer (if active and observed) */
  _onAfter() {
    this.flags |= AsyncResourceFlags.POST_EXECUTION;
    if (this.active && this.notifyObserver) {
      this.notifyObserver._onAfter(this);
    }
  }

  /** Marks the node as resolved and notifies the observer (if active and observed) */
  _onPromiseResolve() {
    this.flags |= AsyncResourceFlags.RESOLVED;
    if (this.active && this.notifyObserver) {
      this.notifyObserver._onPromiseResolve(this);
    }
  }

  toString() {
    const name = this.type
      ? `${this.type}(${this.asyncId})`
      : `Fiber: ${this.fiberId}`;
    return `[async ${name}]`;
  }
}

/** Sets `flags` on `node` and on every node reachable from it via
 * `executionTargets` and `triggerTargets`.
 * @remarks
 * A node is only tainted (and its targets only followed) while
 * `node.flags & mask & flags` is zero. Each node is processed once; a node
 * can be reachable through both maps, so a visited set avoids repeated
 * walks of shared sub-graphs, and the explicit stack avoids deep recursion.
 *
 * NOTE(review): for the only call in this file (`mask = FINALIZED`,
 * `flags = ABORTED`) `mask & flags` is `0`, so the guard always passes and
 * `mask` has no effect. Confirm whether finalized nodes were meant to be
 * skipped.
 */
const taintAsyncResourceGraph = (
  node: AsyncResourceNode,
  mask: AsyncResourceFlags,
  flags: AsyncResourceFlags
) => {
  const visited = new Set<AsyncResourceNode>();
  const queue: AsyncResourceNode[] = [node];
  let current: AsyncResourceNode | undefined;
  while ((current = queue.pop())) {
    if (visited.has(current)) continue;
    visited.add(current);
    if ((current.flags & mask & flags) === 0) {
      current.flags |= flags;
      for (const target of current.executionTargets.values())
        queue.push(target);
      for (const target of current.triggerTargets.values()) queue.push(target);
    }
  }
};

/** Looks up the descriptor for `asyncId` starting from the current execution context.
 * @returns the node, or `undefined` if it isn't the current execution
 * context's node, one of its ancestors (via `executionOrigin`), or a direct
 * execution target of any of those
 */
const getAsyncResourceNode = (
  asyncId: number
): AsyncResourceNode | undefined => {
  let executionNode = getExecutionAsyncResource()[fiberRef] ?? null;
  if (executionNode) {
    // The `asyncResourceGraphHook`'s callbacks execute inside the fiber's execution context stack.
    // This means we can find any node by checking all nodes in the current and any parent
    // execution contexts.
    let node: AsyncResourceNode | undefined;
    do {
      if (executionNode.asyncId === asyncId) return executionNode;
      if ((node = executionNode.executionTargets.get(asyncId))) return node;
    } while ((executionNode = executionNode.executionOrigin));
  }
  return undefined;
};

/** Set while one of the hook callbacks below is running; `init` is skipped while it is set */
let _asyncResourceGraphHookActive = false;

const asyncResourceGraphHook = createHook({
  init(
    asyncId: number,
    type: string,
    triggerAsyncId: number,
    resource: RawAsyncResource
  ) {
    if (!_asyncResourceGraphHookActive) {
      try {
        _asyncResourceGraphHookActive = true;
        const executionNode = getExecutionAsyncResource()[fiberRef];
        executionNode?._onExecute(asyncId, type, triggerAsyncId, resource);
      } finally {
        _asyncResourceGraphHookActive = false;
      }
    }
  },
  before(asyncId: number) {
    try {
      _asyncResourceGraphHookActive = true;
      getAsyncResourceNode(asyncId)?._onBefore();
    } finally {
      _asyncResourceGraphHookActive = false;
    }
  },
  after(asyncId: number) {
    try {
      _asyncResourceGraphHookActive = true;
      getAsyncResourceNode(asyncId)?._onAfter();
    } finally {
      _asyncResourceGraphHookActive = false;
    }
  },
  promiseResolve(asyncId: number) {
    try {
      _asyncResourceGraphHookActive = true;
      getAsyncResourceNode(asyncId)?._onPromiseResolve();
    } finally {
      _asyncResourceGraphHookActive = false;
    }
  },
  // NOTE: While it's nice for cleanups we leave out the `destroy`
  // hook since it has performance implications according to the docs
});

/** Observes the async resources of `fiber` and derives a promise from `promise`
 * that can additionally be rejected with a `FiberError` or the abort reason.
 * @param fiber - Fiber whose resources are observed
 * @param params - Fiber parameters; only `abort` is read
 * @param promise - Promise returned by the fiber's function
 * @returns `promiseWithReject(promise, …).promise` — presumably settles like
 * `promise` unless rejected earlier (verify against `./utils`)
 * @remarks
 * The fiber's root node is deactivated while the watchdog is being set up,
 * so resources created on the root during setup are not recorded, and it is
 * reactivated before returning.
 */
function fiberWatchdog<T>(
  fiber: AsyncResourceFiber,
  params: FiberParams,
  promise: Promise<T>
): Promise<T> {
  try {
    let watchdogImmediate: NodeJS.Immediate | void;
    fiber.root.active = false;

    const { abort } = params;
    const { parentFiberIds } = fiber;

    const pendingExecutionTargets = new Set<AsyncResourceNode>();
    const watchdogResult = promiseWithReject(promise, () => {
      if (watchdogImmediate)
        watchdogImmediate = clearImmediate(watchdogImmediate);
    });

    /** Most recently added pending node, else the fiber's last execution target, else its root */
    function lastExecutionTarget(): AsyncResourceNode {
      if (pendingExecutionTargets.size) {
        const targets = [...pendingExecutionTargets.values()];
        return targets[targets.length - 1];
      } else {
        const targets = fiber.executionTargets;
        return targets[targets.length - 1] || fiber.root;
      }
    }

    /** Rejects the watchdog if `node`, its trigger, or the abort signal is aborted */
    function assertFiberAbort(node: AsyncResourceNode) {
      if (node.flags & AsyncResourceFlags.ABORTED) {
        assertFiberError(new FiberError('FIBER_ABORTED', fiber, node));
      } else if (
        node.triggerOrigin &&
        node.triggerOrigin.flags & AsyncResourceFlags.ABORTED
      ) {
        if (node.triggerOrigin.fiberId === fiber.fiberId) {
          assertFiberError(
            new FiberError('FIBER_ABORTED', fiber, node.triggerOrigin)
          );
        } else {
          assertFiberError(
            new FiberError('FOREIGN_ASYNC_ABORTED', fiber, node.triggerOrigin)
          );
        }
      } else if (abort?.aborted) {
        // `assertFiberError` already rethrows the reason while a hook
        // callback is running, so no separate `throwIfAborted()` is needed
        assertFiberError(abort.reason);
      }
    }

    /** Rejects the watchdog with `error` and rethrows it while a hook callback is running */
    function assertFiberError(error: Error) {
      watchdogResult.reject(error);
      if (_asyncResourceGraphHookActive) throw error;
    }

    /** Rejects the watchdog if a node of this fiber is triggered by a node outside of it */
    function assertFiberOwnership(node: AsyncResourceNode) {
      const { triggerOrigin } = node;
      if (node.fiberId !== fiber.fiberId || !triggerOrigin) {
        // We only check triggers for nodes owned by the current fiber
      } else if (triggerOrigin === fiber.root) {
        // Immediately invoked async IO on the fiber root are ignored
      } else if (triggerOrigin.fiberId === node.fiberId) {
        // If the trigger is a node in the same fiber, this is allowed
      } else {
        const isParentFiberTrigger = parentFiberIds.includes(
          triggerOrigin.fiberId
        );
        if (!isParentFiberTrigger) {
          assertFiberError(
            new FiberError('FOREIGN_ASYNC_TRIGGER', fiber, node)
          );
        } else {
          // TODO: Selectively allow resolved promises
          assertFiberError(new FiberError('PARENT_ASYNC_TRIGGER', fiber, node));
        }
      }
    }

    /** Rejects with `FIBER_STALL` when no non-promise resource is still pending */
    function stallWatchdog() {
      watchdogImmediate = undefined;
      if (abort?.aborted) {
        return;
      }
      let hasAsyncIO = false;
      for (const asyncNode of pendingExecutionTargets) {
        if (
          (asyncNode.flags & AsyncResourceFlags.FINALIZED) === 0 &&
          asyncNode.type !== 'PROMISE'
        ) {
          hasAsyncIO = true;
          break;
        }
      }
      if (!hasAsyncIO) {
        watchdogResult.reject(
          new FiberError('FIBER_STALL', fiber, lastExecutionTarget())
        );
      }
    }

    // Schedules a stall check unless one is already scheduled
    const scheduleCheck = () =>
      watchdogImmediate || (watchdogImmediate = setImmediate(stallWatchdog));
    watchdogImmediate = setImmediate(stallWatchdog);

    // `once` lets the signal release the listener (and the fiber graph it
    // closes over) after it fired
    abort?.addEventListener(
      'abort',
      () => {
        taintAsyncResourceGraph(
          fiber.root,
          AsyncResourceFlags.FINALIZED,
          AsyncResourceFlags.ABORTED
        );
      },
      { once: true }
    );

    const observer = new AsyncResourceObserver(fiber, (event, node) => {
      scheduleCheck();
      switch (event) {
        case AsyncResourceFlags.INIT:
          assertFiberOwnership(node);
          assertFiberAbort(node);
          pendingExecutionTargets.add(node);
          return;
        case AsyncResourceFlags.POST_EXECUTION:
          pendingExecutionTargets.delete(node);
          return;
        case AsyncResourceFlags.RESOLVED:
          assertFiberAbort(node);
          pendingExecutionTargets.delete(node);
          return;
        default:
          return;
      }
    });

    // Attach the observer to everything the fiber has created so far:
    // first the root's direct targets that belong to this fiber ...
    const stack: AsyncResourceNode[] = [];
    for (const target of fiber.root.executionTargets.values()) {
      if (target.fiberId === fiber.fiberId) {
        stack.push(target);
        observer.observe(target);
        if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
          assertFiberOwnership(target);
          pendingExecutionTargets.add(target);
        }
      }
    }
    // ... then all of their not yet observed descendants
    let pointer: AsyncResourceNode | undefined;
    while ((pointer = stack.pop())) {
      for (const target of pointer.executionTargets.values()) {
        if (!observer.isObserved(target)) {
          stack.push(target);
          observer.observe(target);
          if ((target.flags & AsyncResourceFlags.FINALIZED) === 0) {
            assertFiberOwnership(target);
            pendingExecutionTargets.add(target);
          }
        }
      }
    }

    return watchdogResult.promise;
  } finally {
    fiber.root.active = true;
  }
}

/** Enable async resource graph tracking and initialize root fiber if needed.
 * @remarks
 * Call this as early as async resources are created that other fibers depend
 * on. No async resources outside of the root fiber will be tracked!
 * @returns the currently active fiber, or a newly created and enabled one
 */
export function enable(): AsyncResourceFiber {
  return getFiber() || new AsyncResourceFiber(getStackFrame(0)).enable();
}

/** Disable top-level fiber
 * @remarks
 * If a root fiber has been created with `enable()`, calling this function
 * allows all fiber data to be garbage collected.
 * @returns the fiber that was disabled, or `null` if no fiber was active
 */
export function disable(): AsyncResourceFiber | null {
  return getFiber()?.disable() || null;
}

/** Get currently active fiber */
export function getFiber(): AsyncResourceFiber | null {
  for (let idx = fiberStack.length - 1; idx >= 0; idx--)
    if (fiberStack[idx].active) return fiberStack[idx];
  return null;
}

/** Returns an arbitrary async resource's descriptor
 * @remarks
 * WARN: This will only work for async resource objects that have been
 * created in a fiber.
 */
export function getFiberNode(resource: object): AsyncResourceNode | undefined {
  // A bare `object` cannot be indexed with the symbol under `strict`
  return (resource as RawAsyncResource)[fiberRef];
}

export interface FiberParams {
  /** Signal whose abort taints the fiber's resource graph as aborted */
  abort?: AbortSignal;
}

/** Create a fiber and execute it, returning both the result of `fn` and its fiber
 * @remarks
 * While this function returns synchronously, it will track all async resources
 * that the passed function has created.
 * @param fn - Function to run inside the new fiber
 * @param params - Optional fiber parameters
 * @returns `return`, the watched promise for `fn`'s result, and `fiber`, the
 * fiber `fn` was run in (already disabled again when this function returns)
 */
export function fiber<T>(
  fn: () => Promise<T>,
  params: FiberParams = {}
): { return: Promise<T>; fiber: AsyncResourceFiber } {
  const fiber = new AsyncResourceFiber(getStackFrame(0));
  try {
    fiber.enable();
    return { return: fiberWatchdog(fiber, params, fn()), fiber };
  } finally {
    fiber.disable();
  }
}