import 'dart:async';
import 'dart:convert';
import 'dart:math' as math;

import '../errors/auth_method_unsatisfiable_error.dart';
import '../errors/token_invalid_error.dart';
import '../errors/token_refresh_error.dart';
import '../errors/token_revoked_error.dart';
import '../oauth/client_auth.dart' show ClientAuthMethod;
import '../oauth/oauth_server_agent.dart';
import '../oauth/oauth_server_factory.dart';
import '../runtime/runtime.dart';
import '../util.dart';
import 'oauth_session.dart';

/// Per-call options for retrieving a cached value.
class GetCachedOptions {
  const GetCachedOptions({this.signal, this.noCache, this.allowStale});

  /// Cancellation token the caller can use to abort the operation.
  final CancellationToken? signal;

  /// Whether to skip the cache and always obtain a new value.
  final bool? noCache;

  /// Whether a stale value from the cache may be returned.
  final bool? allowStale;
}

/// Abstract storage interface for values.
///
/// A generic, asynchronous key-value store.
abstract class SimpleStore<K, V> {
  /// Reads the value stored under [key].
  ///
  /// Returns `null` if the key doesn't exist.
  Future<V?> get(K key, {CancellationToken? signal});

  /// Stores [value] under [key].
  Future<void> set(K key, V value);

  /// Removes the value stored under [key].
  Future<void> del(K key);

  /// Clears all values from the store.
  ///
  /// Optional: the default implementation does nothing.
  Future<void> clear() async {}
}

/// Store of [Session] values indexed by a string key.
typedef SessionStore = SimpleStore<String, Session>;

/// Details of a session update event.
class SessionUpdatedEvent {
  const SessionUpdatedEvent({
    required this.sub,
    required this.dpopKey,
    this.authMethod,
    required this.tokenSet,
  });

  /// The subject (user's DID).
  final String sub;

  /// The DPoP key.
  final Map<String, dynamic> dpopKey;

  /// The authentication method, if any.
  final String? authMethod;

  /// The token set.
  final TokenSet tokenSet;
}

/// Details of a session deletion event.
class SessionDeletedEvent {
  const SessionDeletedEvent({required this.sub, required this.cause});

  /// The subject (user's DID).
  final String sub;

  /// The cause of the deletion.
  final Object cause;
}

/// Manages session retrieval, caching, and refreshing.
///
/// The SessionGetter wraps a session store and provides:
/// - Automatic token refresh when tokens are stale/expired
/// - Caching to avoid redundant refresh operations
/// - Events for session updates and deletions
/// - Concurrency control to prevent multiple simultaneous refreshes
///
/// This is a critical component that ensures at most one token refresh
/// is happening at a time for a given user, even across multiple tabs
/// or app instances.
///
/// Example:
/// ```dart
/// final sessionGetter = SessionGetter(
///   sessionStore: mySessionStore,
///   serverFactory: myServerFactory,
///   runtime: myRuntime,
/// );
///
/// // Listen for session updates
/// sessionGetter.onUpdated.listen((event) {
///   print('Session updated for ${event.sub}');
/// });
///
/// // Listen for session deletions
/// sessionGetter.onDeleted.listen((event) {
///   print('Session deleted for ${event.sub}: ${event.cause}');
/// });
///
/// // Get a session (automatically refreshes if expired)
/// final session = await sessionGetter.getSession('did:plc:abc123');
///
/// // Force refresh
/// final freshSession = await sessionGetter.getSession('did:plc:abc123', true);
/// ```
class SessionGetter extends CachedGetter<AtprotoDid, Session> {
  /// Shared random source for the staleness jitter, so that a new generator
  /// is not allocated on every staleness check.
  static final math.Random _jitter = math.Random();

  final OAuthServerFactory _serverFactory;
  final Runtime _runtime;

  final _eventTarget = CustomEventTarget<Map<String, dynamic>>();
  final _updatedController = StreamController<SessionUpdatedEvent>.broadcast();
  final _deletedController = StreamController<SessionDeletedEvent>.broadcast();

  /// Stream of session update events.
  Stream<SessionUpdatedEvent> get onUpdated => _updatedController.stream;

  /// Stream of session deletion events.
  Stream<SessionDeletedEvent> get onDeleted => _deletedController.stream;

  /// Creates a session getter backed by `sessionStore`.
  ///
  /// [serverFactory] is used to reach the authorization server for refresh
  /// and revocation; [runtime] provides key generation and locking.
  SessionGetter({
    required super.sessionStore,
    required OAuthServerFactory serverFactory,
    required Runtime runtime,
  }) : _serverFactory = serverFactory,
       _runtime = runtime,
       super(
         getter: null, // Assigned in the constructor body (_createGetter)
         options: CachedGetterOptions(
           isStale: (sub, session) {
             final tokenSet = session.tokenSet;
             // A token set without an expiry is never considered stale.
             if (tokenSet.expiresAt == null) return false;

             final expiresAt = DateTime.parse(tokenSet.expiresAt!);
             final now = DateTime.now();

             // Add some leeway to ensure the token is not expired when it
             // reaches the server (10 seconds).
             // Add some randomness to reduce the chances of multiple
             // instances trying to refresh the token at the same time
             // (0-30 seconds).
             final buffer = Duration(
               milliseconds: 10000 + (_jitter.nextDouble() * 30000).toInt(),
             );

             return expiresAt.isBefore(now.add(buffer));
           },
           onStoreError: (err, sub, session) async {
             if (err is! AuthMethodUnsatisfiableError) {
               // If the error was an AuthMethodUnsatisfiableError, there is
               // no point in trying to call `fromIssuer`.
               try {
                 // authMethod is either a serialized ClientAuthMethod (Map)
                 // or a String; a missing value falls back to 'legacy'.
                 final authMethodValue = session.authMethod;
                 final authMethod =
                     authMethodValue is Map<String, dynamic>
                         ? ClientAuthMethod.fromJson(authMethodValue)
                         : (authMethodValue as String?) ?? 'legacy';

                 // Generate new DPoP key for revocation
                 // (stored key is serialized and can't be directly used)
                 final dpopKeyAlgs = ['ES256', 'RS256'];
                 final newDpopKey = await runtime.generateKey(dpopKeyAlgs);

                 // If the token data cannot be stored, let's revoke it
                 final server = await serverFactory.fromIssuer(
                   session.tokenSet.iss,
                   authMethod,
                   newDpopKey,
                 );
                 await server.revoke(
                   session.tokenSet.refreshToken ??
                       session.tokenSet.accessToken,
                 );
               } catch (_) {
                 // Revocation is best-effort: let the original store error
                 // propagate below.
               }
             }

             throw err;
           },
           deleteOnError: (err) async {
             return err is TokenRefreshError ||
                 err is TokenRevokedError ||
                 err is TokenInvalidError ||
                 err is AuthMethodUnsatisfiableError;
           },
         ),
       ) {
    // The getter closes over `this`, so it can only be created once the
    // instance exists.
    _getter = _createGetter();
  }

  /// Creates the getter function used to refresh a stored session.
  ///
  /// The returned function throws a [TokenRefreshError] when there is no
  /// stored session, when the stored or returned `sub` does not match, when
  /// no refresh token is available, or when the server answers with a 400
  /// `invalid_grant` that cannot be attributed to a concurrent refresh. Any
  /// other failure is rethrown unchanged.
  Future<Session> Function(AtprotoDid, GetCachedOptions, Session?)
  _createGetter() {
    return (sub, options, storedSession) async {
      // There needs to be a previous session to be able to refresh. If
      // storedSession is null, it means that the store does not contain
      // a session for the given sub.
      if (storedSession == null) {
        // Because the session is not in the store, delStored() method
        // will not be called by the CachedGetter class (because there is
        // nothing to delete). This would typically happen if there is no
        // synchronization mechanism between instances of this class. Let's
        // make sure an event is dispatched here if this occurs.
        const msg = 'The session was deleted by another process';
        final cause = TokenRefreshError(sub, msg);
        _dispatchDeletedEvent(sub, cause);
        throw cause;
      }

      // From this point forward, throwing a TokenRefreshError will result in
      // delStored() being called, resulting in an event being dispatched,
      // even if the session was removed from the store through a concurrent
      // access (which, normally, should not happen if a proper runtime lock
      // was provided).

      // authMethod can be a Map (serialized ClientAuthMethod) or a String;
      // a missing value falls back to 'legacy'.
      final authMethodValue = storedSession.authMethod;
      final authMethod =
          authMethodValue is Map<String, dynamic>
              ? ClientAuthMethod.fromJson(authMethodValue)
              : (authMethodValue as String?) ?? 'legacy';
      final tokenSet = storedSession.tokenSet;

      if (sub != tokenSet.sub) {
        // Fool-proofing (e.g. against invalid session storage)
        throw TokenRefreshError(sub, 'Stored session sub mismatch');
      }

      if (tokenSet.refreshToken == null) {
        throw TokenRefreshError(sub, 'No refresh token available');
      }

      // Since refresh tokens can only be used once, we might run into
      // concurrency issues if multiple instances (e.g. browser tabs) are
      // trying to refresh the same token simultaneously. The chances of this
      // happening when multiple instances are started simultaneously is
      // reduced by randomizing the expiry time (see isStale above). The
      // best solution is to use a mutex/lock to ensure that only one instance
      // is refreshing the token at a time (runtime.usingLock) but that is not
      // always possible. If no lock implementation is provided, we will use
      // the store to check if a concurrent refresh occurred.

      // TODO: Key Persistence Workaround
      // The storedSession.dpopKey is a Map<String, dynamic> (serialized JWK),
      // but OAuthServerFactory.fromIssuer() expects a Key object.
      // Until Key serialization is implemented (see
      // runtime_implementation.dart), we generate a new DPoP key for each
      // session refresh.
      // This works but means tokens are bound to new keys, requiring refresh.
      final dpopKeyAlgs = ['ES256', 'RS256']; // Supported DPoP algorithms
      final newDpopKey = await _runtime.generateKey(dpopKeyAlgs);

      final server = await _serverFactory.fromIssuer(
        tokenSet.iss,
        authMethod,
        newDpopKey,
      );

      // Because refresh tokens can only be used once, we must not use the
      // "signal" to abort the refresh, or throw any abort error beyond this
      // point. Any thrown error beyond this point will prevent the
      // SessionGetter from obtaining, and storing, the new token set,
      // effectively rendering the currently saved session unusable.
      options.signal?.throwIfCancelled();

      try {
        final newTokenSet = await server.refresh(tokenSet);

        if (sub != newTokenSet.sub) {
          // The server returned another sub. Was the tokenSet manipulated?
          throw TokenRefreshError(sub, 'Token set sub mismatch');
        }

        return Session(
          dpopKey: newDpopKey.bareJwk ?? {},
          tokenSet: newTokenSet,
          authMethod: server.authMethod.toJson(),
        );
      } catch (cause) {
        // If the refresh token is invalid, let's try to recover from
        // concurrency issues, or make sure the session is deleted by throwing
        // a TokenRefreshError.
        if (cause is OAuthResponseError &&
            cause.status == 400 &&
            cause.error == 'invalid_grant') {
          // In case there is no lock implementation in the runtime, we will
          // wait for a short time to give the other concurrent instances a
          // chance to finish their refreshing of the token. If a concurrent
          // refresh did occur, we will pretend that this one succeeded.
          if (!_runtime.hasImplementationLock) {
            await Future<void>.delayed(const Duration(seconds: 1));

            final stored = await getStored(sub);
            if (stored == null) {
              // A concurrent refresh occurred and caused the session to be
              // deleted (for a reason we can't know at this point).

              // Using a distinct error message mainly for debugging
              // purposes. Also, throwing a TokenRefreshError to trigger
              // deletion through the deleteOnError callback.
              const msg = 'The session was deleted by another process';
              throw TokenRefreshError(sub, msg, cause: cause);
            } else if (stored.tokenSet.accessToken != tokenSet.accessToken ||
                stored.tokenSet.refreshToken != tokenSet.refreshToken) {
              // A concurrent refresh occurred. Pretend this one succeeded.
              return stored;
            } else {
              // There was no concurrent refresh. The token is (likely)
              // simply no longer valid.
            }
          }

          // Make sure the session gets deleted from the store
          final msg = cause.errorDescription ?? 'The session was revoked';
          throw TokenRefreshError(sub, msg, cause: cause);
        }

        // Anything else propagates unchanged, with its original type and
        // stack trace, so that the deleteOnError type checks still see the
        // real error.
        rethrow;
      }
    };
  }

  /// Stores [value] under [key] and emits an "updated" event.
  ///
  /// Throws a [TypeError] when [key] differs from the `sub` of the token set
  /// being stored.
  @override
  Future<void> setStored(String key, Session value) async {
    // Prevent tampering with the stored value
    if (key != value.tokenSet.sub) {
      throw TypeError();
    }

    await super.setStored(key, value);

    // The event carries authMethod as a String: a Map is JSON-encoded, a
    // String is passed through, anything else becomes null.
    final rawAuthMethod = value.authMethod;
    final String? authMethodString = switch (rawAuthMethod) {
      final Map<dynamic, dynamic> map => jsonEncode(map),
      final String text => text,
      _ => null,
    };

    _dispatchUpdatedEvent(key, value.dpopKey, authMethodString, value.tokenSet);
  }

  /// Deletes the session stored under [key] and emits a "deleted" event.
  ///
  /// When no [cause] is given, the event carries a generic
  /// `Exception('Session deleted')`.
  @override
  Future<void> delStored(AtprotoDid key, [Object? cause]) async {
    await super.delStored(key, cause);
    _dispatchDeletedEvent(key, cause ?? Exception('Session deleted'));
  }

  /// Gets a session, optionally refreshing it.
  ///
  /// Parameters:
  /// - [sub]: The subject (user's DID)
  /// - [refresh]: When `true`, forces a token refresh even if not expired.
  ///   When `false`, uses cached tokens even if expired.
  ///   When `'auto'`, refreshes only if expired (default).
  Future<Session> getSession(AtprotoDid sub, [dynamic refresh = 'auto']) {
    return get(
      sub,
      GetCachedOptions(noCache: refresh == true, allowStale: refresh == false),
    );
  }

  /// Gets the session for [key] while holding the runtime lock for that key.
  ///
  /// Throws an [Exception] when the returned session's `sub` does not match
  /// [key].
  @override
  Future<Session> get(AtprotoDid key, [GetCachedOptions? options]) async {
    final callerSignal = options?.signal;

    final session = await _runtime.usingLock(
      '@atproto-oauth-client-$key',
      () async {
        // Make sure, even if there is no signal in the options, that the
        // request will be cancelled after at most 30 seconds.
        final timeoutToken = CancellationToken();
        final timeoutTimer = Timer(
          const Duration(seconds: 30),
          () => timeoutToken.cancel(),
        );

        final combinedSignal =
            callerSignal != null
                ? combineSignals([callerSignal, timeoutToken])
                : CombinedCancellationToken([timeoutToken]);

        try {
          // NOTE(review): a fresh CancellationToken is passed here rather
          // than `combinedSignal`, so neither the caller's signal nor the
          // 30 second timeout reaches the base getter. Forwarding the
          // combined signal looks like the intent, but how
          // CombinedCancellationToken exposes a CancellationToken is not
          // visible from this file - confirm against util.dart and wire it.
          return await super.get(
            key,
            GetCachedOptions(
              signal: CancellationToken(),
              noCache: options?.noCache,
              allowStale: options?.allowStale,
            ),
          );
        } finally {
          // Stop the timer so it neither lingers for 30 seconds nor fires on
          // a token that has already been disposed.
          timeoutTimer.cancel();
          combinedSignal.dispose();
          timeoutToken.dispose();
        }
      },
    );

    if (key != session.tokenSet.sub) {
      // Fool-proofing (e.g. against invalid session storage)
      throw Exception('Token set does not match the expected sub');
    }

    return session;
  }

  /// Publishes a [SessionUpdatedEvent] on [onUpdated] and on the event
  /// target (as `'updated'`).
  void _dispatchUpdatedEvent(
    String sub,
    Map<String, dynamic> dpopKey,
    String? authMethod,
    TokenSet tokenSet,
  ) {
    final event = SessionUpdatedEvent(
      sub: sub,
      dpopKey: dpopKey,
      authMethod: authMethod,
      tokenSet: tokenSet,
    );

    _updatedController.add(event);
    _eventTarget.dispatchCustomEvent('updated', event);
  }

  /// Publishes a [SessionDeletedEvent] on [onDeleted] and on the event
  /// target (as `'deleted'`).
  void _dispatchDeletedEvent(String sub, Object cause) {
    final event = SessionDeletedEvent(sub: sub, cause: cause);

    _deletedController.add(event);
    _eventTarget.dispatchCustomEvent('deleted', event);
  }

  /// Disposes of resources used by this session getter.
  ///
  /// Closes both event streams and disposes the event target.
  void dispose() {
    _updatedController.close();
    _deletedController.close();
    _eventTarget.dispose();
  }
}

/// Placeholder for OAuthResponseError
/// Will be implemented in later chunks
class OAuthResponseError implements Exception {
  /// Response status code (the refresh logic checks for 400).
  final int status;

  /// Error code from the response, e.g. `invalid_grant`.
  final String? error;

  /// Description accompanying [error], if any.
  final String? errorDescription;

  OAuthResponseError({required this.status, this.error, this.errorDescription});
}

/// Options for the CachedGetter.
class CachedGetterOptions<K, V> {
  /// Function to determine if a cached value is stale.
  ///
  /// When absent, stored values are always considered usable.
  final bool Function(K key, V value)? isStale;

  /// Function called when storing a value fails.
  final Future<void> Function(Object err, K key, V value)? onStoreError;

  /// Function to determine if a stored value should be deleted after the
  /// getter failed with the given error.
  final Future<bool> Function(Object err)? deleteOnError;

  const CachedGetterOptions({
    this.isStale,
    this.onStoreError,
    this.deleteOnError,
  });
}

/// A pending item in the cache.
///
/// Wraps the future of an in-flight retrieval; `isFresh` tells whether the
/// value came from the getter (`true`) or from the store (`false`).
class _PendingItem<V> {
  final Future<({V value, bool isFresh})> future;

  _PendingItem(this.future);
}

/// Wrapper utility that uses a store to speed up the retrieval of values.
///
/// The CachedGetter ensures that at most one fresh call is ever being made
/// for a given key. It also contains logic for reading from the cache which,
/// if the cache is based on localStorage/indexedDB, will sync across multiple
/// tabs (for a given key).
///
/// This is intended as a base class. Subclasses should provide the getter
/// function and any additional logic.
class CachedGetter<K, V> {
  final SimpleStore<K, V> _store;
  final CachedGetterOptions<K, V> _options;

  /// In-flight retrievals, indexed by key.
  final Map<K, _PendingItem<V>> _pending = {};

  /// Function producing a fresh value from the key, the call options, and
  /// the currently stored value (if any).
  ///
  /// Declared `late` so that a subclass in this library can assign it after
  /// construction when `null` was passed to the constructor.
  late Future<V> Function(K, GetCachedOptions, V?) _getter;

  CachedGetter({
    required SimpleStore<K, V> sessionStore,
    required Future<V> Function(K, GetCachedOptions, V?)? getter,
    required CachedGetterOptions<K, V> options,
  }) : _store = sessionStore,
       _options = options {
    if (getter != null) {
      _getter = getter;
    }
  }

  /// Returns the value for [key], from the store when allowed, otherwise
  /// from the getter.
  ///
  /// A stored value is used unless `noCache` is set, or it is stale and
  /// `allowStale` is not set. A value obtained from the getter is written
  /// back to the store before being returned. When the getter fails and a
  /// stored value existed, the stored value is deleted if the
  /// `deleteOnError` option says so; the getter's error is then rethrown.
  Future<V> get(K key, [GetCachedOptions? options]) async {
    final opts = options ?? const GetCachedOptions();
    final signal = opts.signal;
    final noCache = opts.noCache ?? false;
    final allowStale = opts.allowStale ?? false;

    signal?.throwIfCancelled();

    final isStale = _options.isStale;
    final deleteOnError = _options.deleteOnError;

    // Determine if a stored value can be used
    bool allowStored(V value) {
      if (noCache) return false; // Never allow stored values
      if (allowStale || isStale == null) return true; // Always allow
      return !isStale(key, value); // Check if stale
    }

    // As long as concurrent requests are made for the same key, only one
    // request will be made to the getStored & getter functions at a time.
    while (true) {
      final previousExecutionFlow = _pending[key];
      if (previousExecutionFlow == null) break;

      try {
        final (:value, :isFresh) = await previousExecutionFlow.future;

        // Use the concurrent request's result if it is fresh, or if it was
        // loaded from the store and matches the conditions for using a
        // stored value.
        if (isFresh || allowStored(value)) return value;
      } catch (_) {
        // Ignore errors from previous execution flows (they will have been
        // propagated by that flow).
      }

      // Break the loop if the signal was cancelled
      signal?.throwIfCancelled();
    }

    final currentExecutionFlow = _PendingItem<V>(
      Future<({V value, bool isFresh})>(() async {
        final storedValue = await getStored(key, signal: signal);

        if (storedValue != null && allowStored(storedValue)) {
          // Use the stored value as return value for the current execution
          // flow. Notify other concurrent execution flows that we got a
          // value, but that it came from the store (isFresh = false).
          return (value: storedValue, isFresh: false);
        }

        final V freshValue;
        try {
          freshValue = await _getter(key, opts, storedValue);
        } catch (err) {
          if (storedValue != null) {
            try {
              if (deleteOnError != null && await deleteOnError(err)) {
                await delStored(key, err);
              }
            } catch (error) {
              throw Exception('Error while deleting stored value: $error');
            }
          }
          // Propagate the getter's error with its original stack trace.
          rethrow;
        }

        // The value should be stored even if the signal was cancelled.
        await setStored(key, freshValue);
        return (value: freshValue, isFresh: true);
      }).whenComplete(() {
        _pending.remove(key);
      }),
    );

    if (_pending.containsKey(key)) {
      // This should never happen. There must not be any 'await'
      // statement between this and the loop iteration check.
      throw Exception('Concurrent request for the same key');
    }

    _pending[key] = currentExecutionFlow;

    final result = await currentExecutionFlow.future;
    return result.value;
  }

  /// Reads the value stored under [key].
  ///
  /// Best-effort: any error raised by the store is swallowed and reported as
  /// "no value" (`null`).
  Future<V?> getStored(K key, {CancellationToken? signal}) async {
    try {
      return await _store.get(key, signal: signal);
    } catch (err) {
      return null;
    }
  }

  /// Writes [value] under [key].
  ///
  /// A store failure is handed to the `onStoreError` option when one is
  /// configured; without it the failure is silently ignored.
  Future<void> setStored(K key, V value) async {
    try {
      await _store.set(key, value);
    } catch (err) {
      final onStoreError = _options.onStoreError;
      if (onStoreError != null) {
        await onStoreError(err, key, value);
      }
    }
  }

  /// Deletes the value stored under [key].
  ///
  /// [cause] is not used here; it is part of the signature so that
  /// overrides can use it.
  Future<void> delStored(K key, [Object? cause]) async {
    await _store.del(key);
  }
}