1import 'dart:async'; 2import 'dart:convert'; 3import 'dart:math' as math; 4 5import '../errors/auth_method_unsatisfiable_error.dart'; 6import '../errors/token_invalid_error.dart'; 7import '../errors/token_refresh_error.dart'; 8import '../errors/token_revoked_error.dart'; 9import '../oauth/client_auth.dart' show ClientAuthMethod; 10import '../oauth/oauth_server_agent.dart'; 11import '../oauth/oauth_server_factory.dart'; 12import '../platform/flutter_key.dart'; 13import '../runtime/runtime.dart'; 14import '../util.dart'; 15import 'oauth_session.dart'; 16 17/// Options for getting a cached value. 18class GetCachedOptions { 19 /// Cancellation token for aborting the operation 20 final CancellationToken? signal; 21 22 /// Do not use the cache to get the value. Always get a new value. 23 final bool? noCache; 24 25 /// Allow returning stale values from the cache. 26 final bool? allowStale; 27 28 const GetCachedOptions({this.signal, this.noCache, this.allowStale}); 29} 30 31/// Abstract storage interface for values. 32/// 33/// This is a generic key-value store interface. 34abstract class SimpleStore<K, V> { 35 /// Gets a value from the store. 36 /// 37 /// Returns `null` if the key doesn't exist. 38 Future<V?> get(K key, {CancellationToken? signal}); 39 40 /// Sets a value in the store. 41 Future<void> set(K key, V value); 42 43 /// Deletes a value from the store. 44 Future<void> del(K key); 45 46 /// Optionally clears all values from the store. 47 Future<void> clear() async {} 48} 49 50/// Type alias for session storage 51typedef SessionStore = SimpleStore<String, Session>; 52 53/// Details of a session update event. 54class SessionUpdatedEvent { 55 /// The subject (user's DID) 56 final String sub; 57 58 /// The DPoP key 59 final Map<String, dynamic> dpopKey; 60 61 /// The authentication method 62 final String? authMethod; 63 64 /// The token set 65 final TokenSet tokenSet; 66 67 const SessionUpdatedEvent({ 68 required this.sub, 69 required this.dpopKey, 70 this.authMethod, 71 required this.tokenSet, 72 }); 73} 74 75/// Details of a session deletion event. 76class SessionDeletedEvent { 77 /// The subject (user's DID) 78 final String sub; 79 80 /// The cause of deletion 81 final Object cause; 82 83 const SessionDeletedEvent({required this.sub, required this.cause}); 84} 85 86/// Manages session retrieval, caching, and refreshing. 87/// 88/// The SessionGetter wraps a session store and provides: 89/// - Automatic token refresh when tokens are stale/expired 90/// - Caching to avoid redundant refresh operations 91/// - Events for session updates and deletions 92/// - Concurrency control to prevent multiple simultaneous refreshes 93/// 94/// This is a critical component that ensures at most one token refresh 95/// is happening at a time for a given user, even across multiple tabs 96/// or app instances. 97/// 98/// Example: 99/// ```dart 100/// final sessionGetter = SessionGetter( 101/// sessionStore: mySessionStore, 102/// serverFactory: myServerFactory, 103/// runtime: myRuntime, 104/// ); 105/// 106/// // Listen for session updates 107/// sessionGetter.onUpdated.listen((event) { 108/// print('Session updated for ${event.sub}'); 109/// }); 110/// 111/// // Listen for session deletions 112/// sessionGetter.onDeleted.listen((event) { 113/// print('Session deleted for ${event.sub}: ${event.cause}'); 114/// }); 115/// 116/// // Get a session (automatically refreshes if expired) 117/// final session = await sessionGetter.getSession('did:plc:abc123'); 118/// 119/// // Force refresh 120/// final freshSession = await sessionGetter.getSession('did:plc:abc123', true); 121/// ``` 122class SessionGetter extends CachedGetter<AtprotoDid, Session> { 123 final OAuthServerFactory _serverFactory; 124 final Runtime _runtime; 125 126 final _eventTarget = CustomEventTarget<Map<String, dynamic>>(); 127 final _updatedController = StreamController<SessionUpdatedEvent>.broadcast(); 128 final _deletedController = StreamController<SessionDeletedEvent>.broadcast(); 129 130 /// Stream of session update events. 131 Stream<SessionUpdatedEvent> get onUpdated => _updatedController.stream; 132 133 /// Stream of session deletion events. 134 Stream<SessionDeletedEvent> get onDeleted => _deletedController.stream; 135 136 SessionGetter({ 137 required super.sessionStore, 138 required OAuthServerFactory serverFactory, 139 required Runtime runtime, 140 }) : _serverFactory = serverFactory, 141 _runtime = runtime, 142 super( 143 getter: null, // Will be set in _createGetter 144 options: CachedGetterOptions( 145 isStale: (sub, session) { 146 final tokenSet = session.tokenSet; 147 if (tokenSet.expiresAt == null) return false; 148 149 final expiresAt = DateTime.parse(tokenSet.expiresAt!); 150 final now = DateTime.now(); 151 152 // Add some lee way to ensure the token is not expired when it 153 // reaches the server (10 seconds) 154 // Add some randomness to reduce the chances of multiple 155 // instances trying to refresh the token at the same time (0-30 seconds) 156 final buffer = Duration( 157 milliseconds: 158 10000 + (math.Random().nextDouble() * 30000).toInt(), 159 ); 160 161 return expiresAt.isBefore(now.add(buffer)); 162 }, 163 onStoreError: (err, sub, session) async { 164 if (err is! AuthMethodUnsatisfiableError) { 165 // If the error was an AuthMethodUnsatisfiableError, there is no 166 // point in trying to call `fromIssuer`. 167 try { 168 // Parse authMethod 169 final authMethodValue = session.authMethod; 170 final authMethod = 171 authMethodValue is Map<String, dynamic> 172 ? ClientAuthMethod.fromJson(authMethodValue) 173 : (authMethodValue as String?) ?? 'legacy'; 174 175 // Restore DPoP key from session for revocation 176 // CRITICAL FIX: Use the stored key instead of generating a new one 177 // This ensures DPoP proofs match the token binding 178 final dpopKey = FlutterKey.fromJwk( 179 session.dpopKey as Map<String, dynamic>, 180 ); 181 182 // If the token data cannot be stored, let's revoke it 183 final server = await serverFactory.fromIssuer( 184 session.tokenSet.iss, 185 authMethod, 186 dpopKey, 187 ); 188 await server.revoke( 189 session.tokenSet.refreshToken ?? 190 session.tokenSet.accessToken, 191 ); 192 } catch (_) { 193 // Let the original error propagate 194 } 195 } 196 197 throw err; 198 }, 199 deleteOnError: (err) async { 200 return err is TokenRefreshError || 201 err is TokenRevokedError || 202 err is TokenInvalidError || 203 err is AuthMethodUnsatisfiableError; 204 }, 205 ), 206 ) { 207 // Set the getter function after construction 208 _getter = _createGetter(); 209 } 210 211 /// Creates the getter function for refreshing sessions. 212 Future<Session> Function(AtprotoDid, GetCachedOptions, Session?) 213 _createGetter() { 214 return (sub, options, storedSession) async { 215 // There needs to be a previous session to be able to refresh. If 216 // storedSession is null, it means that the store does not contain 217 // a session for the given sub. 218 if (storedSession == null) { 219 // Because the session is not in the store, delStored() method 220 // will not be called by the CachedGetter class (because there is 221 // nothing to delete). This would typically happen if there is no 222 // synchronization mechanism between instances of this class. Let's 223 // make sure an event is dispatched here if this occurs. 224 const msg = 'The session was deleted by another process'; 225 final cause = TokenRefreshError(sub, msg); 226 _dispatchDeletedEvent(sub, cause); 227 throw cause; 228 } 229 230 // From this point forward, throwing a TokenRefreshError will result in 231 // delStored() being called, resulting in an event being dispatched, 232 // even if the session was removed from the store through a concurrent 233 // access (which, normally, should not happen if a proper runtime lock 234 // was provided). 235 236 // authMethod can be a Map (serialized ClientAuthMethod) or String ('legacy') 237 final authMethodValue = storedSession.authMethod; 238 final authMethod = 239 authMethodValue is Map<String, dynamic> 240 ? ClientAuthMethod.fromJson(authMethodValue) 241 : (authMethodValue as String?) ?? 'legacy'; 242 final tokenSet = storedSession.tokenSet; 243 244 if (sub != tokenSet.sub) { 245 // Fool-proofing (e.g. against invalid session storage) 246 throw TokenRefreshError(sub, 'Stored session sub mismatch'); 247 } 248 249 if (tokenSet.refreshToken == null) { 250 throw TokenRefreshError(sub, 'No refresh token available'); 251 } 252 253 // Since refresh tokens can only be used once, we might run into 254 // concurrency issues if multiple instances (e.g. browser tabs) are 255 // trying to refresh the same token simultaneously. The chances of this 256 // happening when multiple instances are started simultaneously is 257 // reduced by randomizing the expiry time (see isStale above). The 258 // best solution is to use a mutex/lock to ensure that only one instance 259 // is refreshing the token at a time (runtime.usingLock) but that is not 260 // always possible. If no lock implementation is provided, we will use 261 // the store to check if a concurrent refresh occurred. 262 263 // Restore dpopKey from stored private JWK with error handling 264 // CRITICAL FIX: Use the stored key instead of generating a new one 265 // This ensures DPoP proofs match the token binding during refresh 266 final FlutterKey dpopKey; 267 try { 268 dpopKey = FlutterKey.fromJwk( 269 storedSession.dpopKey as Map<String, dynamic>, 270 ); 271 } catch (e) { 272 // If key is corrupted, the session is unusable - force re-authentication 273 throw TokenRefreshError( 274 sub, 275 'Corrupted DPoP key in stored session: $e. Re-authentication required.', 276 ); 277 } 278 279 final server = await _serverFactory.fromIssuer( 280 tokenSet.iss, 281 authMethod, 282 dpopKey, 283 ); 284 285 // Because refresh tokens can only be used once, we must not use the 286 // "signal" to abort the refresh, or throw any abort error beyond this 287 // point. Any thrown error beyond this point will prevent the 288 // SessionGetter from obtaining, and storing, the new token set, 289 // effectively rendering the currently saved session unusable. 290 options.signal?.throwIfCancelled(); 291 292 try { 293 final newTokenSet = await server.refresh(tokenSet); 294 295 if (sub != newTokenSet.sub) { 296 // The server returned another sub. Was the tokenSet manipulated? 297 throw TokenRefreshError(sub, 'Token set sub mismatch'); 298 } 299 300 // CRITICAL FIX: Preserve the stored DPoP key (full private JWK) 301 // This ensures the same key is used across token refreshes 302 return Session( 303 dpopKey: storedSession.dpopKey, 304 tokenSet: newTokenSet, 305 authMethod: server.authMethod.toJson(), 306 ); 307 } catch (cause) { 308 // If the refresh token is invalid, let's try to recover from 309 // concurrency issues, or make sure the session is deleted by throwing 310 // a TokenRefreshError. 311 if (cause is OAuthResponseError && 312 cause.status == 400 && 313 cause.error == 'invalid_grant') { 314 // In case there is no lock implementation in the runtime, we will 315 // wait for a short time to give the other concurrent instances a 316 // chance to finish their refreshing of the token. If a concurrent 317 // refresh did occur, we will pretend that this one succeeded. 318 if (!_runtime.hasImplementationLock) { 319 await Future.delayed(Duration(seconds: 1)); 320 321 final stored = await getStored(sub); 322 if (stored == null) { 323 // A concurrent refresh occurred and caused the session to be 324 // deleted (for a reason we can't know at this point). 325 326 // Using a distinct error message mainly for debugging 327 // purposes. Also, throwing a TokenRefreshError to trigger 328 // deletion through the deleteOnError callback. 329 const msg = 'The session was deleted by another process'; 330 throw TokenRefreshError(sub, msg, cause: cause); 331 } else if (stored.tokenSet.accessToken != tokenSet.accessToken || 332 stored.tokenSet.refreshToken != tokenSet.refreshToken) { 333 // A concurrent refresh occurred. Pretend this one succeeded. 334 return stored; 335 } else { 336 // There were no concurrent refresh. The token is (likely) 337 // simply no longer valid. 338 } 339 } 340 341 // Make sure the session gets deleted from the store 342 final msg = cause.errorDescription ?? 'The session was revoked'; 343 throw TokenRefreshError(sub, msg, cause: cause); 344 } 345 346 // Re-throw the original exception if it wasn't an invalid_grant error 347 if (cause is Exception) { 348 throw cause; 349 } else { 350 throw Exception('Token refresh failed: $cause'); 351 } 352 } 353 }; 354 } 355 356 @override 357 Future<void> setStored(String key, Session value) async { 358 // Prevent tampering with the stored value 359 if (key != value.tokenSet.sub) { 360 throw TypeError(); 361 } 362 363 await super.setStored(key, value); 364 365 // Serialize authMethod to String for the event 366 // authMethod can be Map<String, dynamic>, String, or null 367 String? authMethodString; 368 if (value.authMethod is Map) { 369 authMethodString = jsonEncode(value.authMethod); 370 } else if (value.authMethod is String) { 371 authMethodString = value.authMethod as String; 372 } else { 373 authMethodString = null; 374 } 375 376 _dispatchUpdatedEvent(key, value.dpopKey, authMethodString, value.tokenSet); 377 } 378 379 @override 380 Future<void> delStored(AtprotoDid key, [Object? cause]) async { 381 await super.delStored(key, cause); 382 _dispatchDeletedEvent(key, cause ?? Exception('Session deleted')); 383 } 384 385 /// Gets a session, optionally refreshing it. 386 /// 387 /// Parameters: 388 /// - [sub]: The subject (user's DID) 389 /// - [refresh]: When `true`, forces a token refresh even if not expired. 390 /// When `false`, uses cached tokens even if expired. 391 /// When `'auto'`, refreshes only if expired (default). 392 Future<Session> getSession(AtprotoDid sub, [dynamic refresh = 'auto']) { 393 return get( 394 sub, 395 GetCachedOptions(noCache: refresh == true, allowStale: refresh == false), 396 ); 397 } 398 399 @override 400 Future<Session> get(AtprotoDid key, [GetCachedOptions? options]) async { 401 final session = await _runtime.usingLock( 402 '@atproto-oauth-client-$key', 403 () async { 404 // Make sure, even if there is no signal in the options, that the 405 // request will be cancelled after at most 30 seconds. 406 final timeoutToken = CancellationToken(); 407 Timer(Duration(seconds: 30), () => timeoutToken.cancel()); 408 409 final combinedSignal = 410 options?.signal != null 411 ? combineSignals([options!.signal, timeoutToken]) 412 : CombinedCancellationToken([timeoutToken]); 413 414 try { 415 return await super.get( 416 key, 417 GetCachedOptions( 418 signal: CancellationToken(), // Use combined signal 419 noCache: options?.noCache, 420 allowStale: options?.allowStale, 421 ), 422 ); 423 } finally { 424 combinedSignal.dispose(); 425 timeoutToken.dispose(); 426 } 427 }, 428 ); 429 430 if (key != session.tokenSet.sub) { 431 // Fool-proofing (e.g. against invalid session storage) 432 throw Exception('Token set does not match the expected sub'); 433 } 434 435 return session; 436 } 437 438 void _dispatchUpdatedEvent( 439 String sub, 440 Map<String, dynamic> dpopKey, 441 String? authMethod, 442 TokenSet tokenSet, 443 ) { 444 final event = SessionUpdatedEvent( 445 sub: sub, 446 dpopKey: dpopKey, 447 authMethod: authMethod, 448 tokenSet: tokenSet, 449 ); 450 451 _updatedController.add(event); 452 _eventTarget.dispatchCustomEvent('updated', event); 453 } 454 455 void _dispatchDeletedEvent(String sub, Object cause) { 456 final event = SessionDeletedEvent(sub: sub, cause: cause); 457 458 _deletedController.add(event); 459 _eventTarget.dispatchCustomEvent('deleted', event); 460 } 461 462 /// Disposes of resources used by this session getter. 463 void dispose() { 464 _updatedController.close(); 465 _deletedController.close(); 466 _eventTarget.dispose(); 467 } 468} 469 470/// Placeholder for OAuthResponseError 471/// Will be implemented in later chunks 472class OAuthResponseError implements Exception { 473 final int status; 474 final String? error; 475 final String? errorDescription; 476 477 OAuthResponseError({required this.status, this.error, this.errorDescription}); 478} 479 480/// Options for the CachedGetter. 481class CachedGetterOptions<K, V> { 482 /// Function to determine if a cached value is stale 483 final bool Function(K key, V value)? isStale; 484 485 /// Function called when storing a value fails 486 final Future<void> Function(Object err, K key, V value)? onStoreError; 487 488 /// Function to determine if a value should be deleted on error 489 final Future<bool> Function(Object err)? deleteOnError; 490 491 const CachedGetterOptions({ 492 this.isStale, 493 this.onStoreError, 494 this.deleteOnError, 495 }); 496} 497 498/// A pending item in the cache. 499class _PendingItem<V> { 500 final Future<({V value, bool isFresh})> future; 501 502 _PendingItem(this.future); 503} 504 505/// Wrapper utility that uses a store to speed up the retrieval of values. 506/// 507/// The CachedGetter ensures that at most one fresh call is ever being made 508/// for a given key. It also contains logic for reading from the cache which, 509/// if the cache is based on localStorage/indexedDB, will sync across multiple 510/// tabs (for a given key). 511/// 512/// This is an abstract base class. Subclasses should provide the getter 513/// function and any additional logic. 514class CachedGetter<K, V> { 515 final SimpleStore<K, V> _store; 516 final CachedGetterOptions<K, V> _options; 517 final Map<K, _PendingItem<V>> _pending = {}; 518 519 late Future<V> Function(K, GetCachedOptions, V?) _getter; 520 521 CachedGetter({ 522 required SimpleStore<K, V> sessionStore, 523 required Future<V> Function(K, GetCachedOptions, V?)? getter, 524 required CachedGetterOptions<K, V> options, 525 }) : _store = sessionStore, 526 _options = options { 527 if (getter != null) { 528 _getter = getter; 529 } 530 } 531 532 Future<V> get(K key, [GetCachedOptions? options]) async { 533 options ??= GetCachedOptions(); 534 final signal = options.signal; 535 final noCache = options.noCache ?? false; 536 final allowStale = options.allowStale ?? false; 537 538 signal?.throwIfCancelled(); 539 540 final isStale = _options.isStale; 541 final deleteOnError = _options.deleteOnError; 542 543 // Determine if a stored value can be used 544 bool allowStored(V value) { 545 if (noCache) return false; // Never allow stored values 546 if (allowStale || isStale == null) return true; // Always allow 547 return !isStale(key, value); // Check if stale 548 } 549 550 // As long as concurrent requests are made for the same key, only one 551 // request will be made to the getStored & getter functions at a time. 552 _PendingItem<V>? previousExecutionFlow; 553 while ((previousExecutionFlow = _pending[key]) != null) { 554 try { 555 final result = await previousExecutionFlow!.future; 556 final isFresh = result.isFresh; 557 final value = result.value; 558 559 // Use the concurrent request's result if it is fresh 560 if (isFresh) return value; 561 // Use the concurrent request's result if not fresh (loaded from the 562 // store), and matches the conditions for using a stored value. 563 if (allowStored(value)) return value; 564 } catch (_) { 565 // Ignore errors from previous execution flows (they will have been 566 // propagated by that flow). 567 } 568 569 // Break the loop if the signal was cancelled 570 signal?.throwIfCancelled(); 571 } 572 573 final currentExecutionFlow = _PendingItem<V>( 574 Future(() async { 575 final storedValue = await getStored(key, signal: signal); 576 577 if (storedValue != null && allowStored(storedValue)) { 578 // Use the stored value as return value for the current execution 579 // flow. Notify other concurrent execution flows that we got a value, 580 // but that it came from the store (isFresh = false). 581 return (value: storedValue, isFresh: false); 582 } 583 584 return Future(() async { 585 return await _getter(key, options!, storedValue); 586 }) 587 .catchError((err) async { 588 if (storedValue != null) { 589 try { 590 if (deleteOnError != null && await deleteOnError(err)) { 591 await delStored(key, err); 592 } 593 } catch (error) { 594 throw Exception('Error while deleting stored value: $error'); 595 } 596 } 597 throw err; 598 }) 599 .then((value) async { 600 // The value should be stored even if the signal was cancelled. 601 await setStored(key, value); 602 return (value: value, isFresh: true); 603 }); 604 }).whenComplete(() { 605 _pending.remove(key); 606 }), 607 ); 608 609 if (_pending.containsKey(key)) { 610 // This should never happen. There must not be any 'await' 611 // statement between this and the loop iteration check. 612 throw Exception('Concurrent request for the same key'); 613 } 614 615 _pending[key] = currentExecutionFlow; 616 617 final result = await currentExecutionFlow.future; 618 return result.value; 619 } 620 621 Future<V?> getStored(K key, {CancellationToken? signal}) async { 622 try { 623 return await _store.get(key, signal: signal); 624 } catch (err) { 625 return null; 626 } 627 } 628 629 Future<void> setStored(K key, V value) async { 630 try { 631 await _store.set(key, value); 632 } catch (err) { 633 final onStoreError = _options.onStoreError; 634 if (onStoreError != null) { 635 await onStoreError(err, key, value); 636 } 637 } 638 } 639 640 Future<void> delStored(K key, [Object? cause]) async { 641 await _store.del(key); 642 } 643}