// Main coves client
1import 'dart:async';
2import 'dart:convert';
3import 'dart:math' as math;
4
5import '../errors/auth_method_unsatisfiable_error.dart';
6import '../errors/token_invalid_error.dart';
7import '../errors/token_refresh_error.dart';
8import '../errors/token_revoked_error.dart';
9import '../oauth/client_auth.dart' show ClientAuthMethod;
10import '../oauth/oauth_server_agent.dart';
11import '../oauth/oauth_server_factory.dart';
12import '../platform/flutter_key.dart';
13import '../runtime/runtime.dart';
14import '../util.dart';
15import 'oauth_session.dart';
16
/// Per-call settings for a cached lookup.
///
/// Every field is optional. [CachedGetter.get] treats an unset [noCache] or
/// [allowStale] as `false`.
class GetCachedOptions {
  const GetCachedOptions({this.signal, this.noCache, this.allowStale});

  /// Token the caller can use to abort the lookup.
  final CancellationToken? signal;

  /// Whether the stored value must be ignored so that a new value is always
  /// fetched.
  final bool? noCache;

  /// Whether a stored value may be returned even if it is considered stale.
  final bool? allowStale;
}
30
/// Minimal asynchronous key-value storage contract.
///
/// [K] is the key type and [V] is the type of the stored values.
abstract class SimpleStore<K, V> {
  /// Reads the value stored under [key].
  ///
  /// Completes with `null` when the store holds nothing for [key]. Callers
  /// may pass [signal] to request cancellation of the read.
  Future<V?> get(K key, {CancellationToken? signal});

  /// Stores [value] under [key].
  Future<void> set(K key, V value);

  /// Removes whatever is stored under [key].
  Future<void> del(K key);

  /// Removes every entry from the store.
  ///
  /// Optional: the default implementation does nothing and completes
  /// immediately.
  Future<void> clear() => Future<void>.value();
}
49
/// A [SimpleStore] that holds [Session] values under `String` keys.
typedef SessionStore = SimpleStore<String, Session>;
52
/// Payload emitted after a session has been written to the store.
class SessionUpdatedEvent {
  const SessionUpdatedEvent({
    required this.sub,
    required this.dpopKey,
    this.authMethod,
    required this.tokenSet,
  });

  /// Subject of the session (the user's DID).
  final String sub;

  /// DPoP key material that was stored with the session.
  final Map<String, dynamic> dpopKey;

  /// Authentication method in string form, or `null` when none was recorded.
  final String? authMethod;

  /// Token set that was stored with the session.
  final TokenSet tokenSet;
}
74
/// Payload emitted when a session is removed or found to be missing.
class SessionDeletedEvent {
  const SessionDeletedEvent({required this.sub, required this.cause});

  /// Subject of the deleted session (the user's DID).
  final String sub;

  /// Error or other object describing why the session was deleted.
  final Object cause;
}
85
86/// Manages session retrieval, caching, and refreshing.
87///
88/// The SessionGetter wraps a session store and provides:
89/// - Automatic token refresh when tokens are stale/expired
90/// - Caching to avoid redundant refresh operations
91/// - Events for session updates and deletions
92/// - Concurrency control to prevent multiple simultaneous refreshes
93///
94/// This is a critical component that ensures at most one token refresh
95/// is happening at a time for a given user, even across multiple tabs
96/// or app instances.
97///
98/// Example:
99/// ```dart
100/// final sessionGetter = SessionGetter(
101/// sessionStore: mySessionStore,
102/// serverFactory: myServerFactory,
103/// runtime: myRuntime,
104/// );
105///
106/// // Listen for session updates
107/// sessionGetter.onUpdated.listen((event) {
108/// print('Session updated for ${event.sub}');
109/// });
110///
111/// // Listen for session deletions
112/// sessionGetter.onDeleted.listen((event) {
113/// print('Session deleted for ${event.sub}: ${event.cause}');
114/// });
115///
116/// // Get a session (automatically refreshes if expired)
117/// final session = await sessionGetter.getSession('did:plc:abc123');
118///
119/// // Force refresh
120/// final freshSession = await sessionGetter.getSession('did:plc:abc123', true);
121/// ```
122class SessionGetter extends CachedGetter<AtprotoDid, Session> {
123 final OAuthServerFactory _serverFactory;
124 final Runtime _runtime;
125
126 final _eventTarget = CustomEventTarget<Map<String, dynamic>>();
127 final _updatedController = StreamController<SessionUpdatedEvent>.broadcast();
128 final _deletedController = StreamController<SessionDeletedEvent>.broadcast();
129
130 /// Stream of session update events.
131 Stream<SessionUpdatedEvent> get onUpdated => _updatedController.stream;
132
133 /// Stream of session deletion events.
134 Stream<SessionDeletedEvent> get onDeleted => _deletedController.stream;
135
/// Creates a session getter backed by [sessionStore].
///
/// [serverFactory] builds the OAuth server agents used to refresh and revoke
/// tokens. [runtime] provides the lock used by [get] and reports whether a
/// real lock implementation is available.
SessionGetter({
  required super.sessionStore,
  required OAuthServerFactory serverFactory,
  required Runtime runtime,
}) : _serverFactory = serverFactory,
     _runtime = runtime,
     super(
       // The refresh callback needs `this`, so it is installed in the
       // constructor body below.
       getter: null,
       options: CachedGetterOptions(
         isStale: (sub, session) {
           final rawExpiry = session.tokenSet.expiresAt;
           // A token set without an expiry is never considered stale.
           if (rawExpiry == null) return false;

           final expiry = DateTime.parse(rawExpiry);
           final now = DateTime.now();

           // 10 seconds of leeway so the token is still valid when it
           // reaches the server, plus 0-30 seconds of random jitter to
           // reduce the chance that several instances refresh at once.
           final jitterMs = (math.Random().nextDouble() * 30000).toInt();
           final margin = Duration(milliseconds: 10000 + jitterMs);

           return expiry.isBefore(now.add(margin));
         },
         onStoreError: (err, sub, session) async {
           // With an unsatisfiable auth method there is no point in calling
           // `fromIssuer`, so the error is surfaced right away.
           if (err is AuthMethodUnsatisfiableError) throw err;

           // The tokens could not be stored: try to revoke them (best
           // effort) before surfacing the original error.
           try {
             final rawAuthMethod = session.authMethod;
             final authMethod =
                 rawAuthMethod is Map<String, dynamic>
                     ? ClientAuthMethod.fromJson(rawAuthMethod)
                     : (rawAuthMethod as String?) ?? 'legacy';

             // Reuse the stored DPoP key so that proofs match the key the
             // tokens are bound to.
             final storedKey = FlutterKey.fromJwk(
               session.dpopKey as Map<String, dynamic>,
             );

             final tokens = session.tokenSet;
             final server = await serverFactory.fromIssuer(
               tokens.iss,
               authMethod,
               storedKey,
             );
             await server.revoke(tokens.refreshToken ?? tokens.accessToken);
           } catch (_) {
             // Revocation failures are ignored; the original error wins.
           }

           throw err;
         },
         deleteOnError:
             (err) async =>
                 err is TokenRefreshError ||
                 err is TokenRevokedError ||
                 err is TokenInvalidError ||
                 err is AuthMethodUnsatisfiableError,
       ),
     ) {
  // Install the refresh callback now that `this` is available.
  _getter = _createGetter();
}
210
/// Builds the callback that refreshes a stored session.
///
/// The returned function receives the subject, the lookup options and the
/// session currently in the store (`null` when there is none) and completes
/// with the refreshed [Session].
///
/// It throws a [TokenRefreshError] when there is no stored session, when the
/// stored or returned `sub` does not match, when no refresh token is
/// available, when the stored DPoP key cannot be restored, or when the
/// server rejects the refresh token with `invalid_grant`. Other [Exception]s
/// raised during the refresh are rethrown unchanged (stack trace preserved);
/// non-[Exception] objects are wrapped in an [Exception].
Future<Session> Function(AtprotoDid, GetCachedOptions, Session?)
_createGetter() {
  return (sub, options, storedSession) async {
    // A previous session is required to be able to refresh. A null
    // storedSession means the store holds no session for the given sub.
    if (storedSession == null) {
      // Because nothing is stored, delStored() will not be called by
      // CachedGetter (there is nothing to delete). This typically happens
      // when instances of this class are not synchronized with each other,
      // so the deletion event is dispatched here instead.
      const msg = 'The session was deleted by another process';
      final cause = TokenRefreshError(sub, msg);
      _dispatchDeletedEvent(sub, cause);
      throw cause;
    }

    // From this point forward, throwing a TokenRefreshError results in
    // delStored() being called (see deleteOnError), which dispatches the
    // deletion event.

    // authMethod is either a Map (serialized ClientAuthMethod) or a String
    // such as 'legacy'.
    final authMethodValue = storedSession.authMethod;
    final authMethod =
        authMethodValue is Map<String, dynamic>
            ? ClientAuthMethod.fromJson(authMethodValue)
            : (authMethodValue as String?) ?? 'legacy';
    final tokenSet = storedSession.tokenSet;

    if (sub != tokenSet.sub) {
      // Fool-proofing (e.g. against invalid session storage)
      throw TokenRefreshError(sub, 'Stored session sub mismatch');
    }

    if (tokenSet.refreshToken == null) {
      throw TokenRefreshError(sub, 'No refresh token available');
    }

    // Refresh tokens can only be used once, so concurrent instances trying
    // to refresh the same token may conflict. The randomized expiry in
    // isStale reduces the odds; a runtime lock is the proper solution. When
    // no lock implementation is available, the store is consulted below to
    // detect a concurrent refresh.

    // Restore the DPoP key from the stored private JWK so that proofs keep
    // matching the key the tokens are bound to.
    final FlutterKey dpopKey;
    try {
      dpopKey = FlutterKey.fromJwk(
        storedSession.dpopKey as Map<String, dynamic>,
      );
    } catch (e) {
      // A corrupted key makes the session unusable: force re-authentication.
      throw TokenRefreshError(
        sub,
        'Corrupted DPoP key in stored session: $e. Re-authentication required.',
      );
    }

    final server = await _serverFactory.fromIssuer(
      tokenSet.iss,
      authMethod,
      dpopKey,
    );

    // Last chance to abort. Because refresh tokens are single-use, the
    // signal must not be honoured beyond this point: an abort after the
    // refresh started would prevent the new token set from being stored and
    // leave the saved session unusable.
    options.signal?.throwIfCancelled();

    try {
      final newTokenSet = await server.refresh(tokenSet);

      if (sub != newTokenSet.sub) {
        // The server returned another sub. Was the tokenSet manipulated?
        throw TokenRefreshError(sub, 'Token set sub mismatch');
      }

      // Keep the stored DPoP key (full private JWK) so the same key is used
      // across token refreshes.
      return Session(
        dpopKey: storedSession.dpopKey,
        tokenSet: newTokenSet,
        authMethod: server.authMethod.toJson(),
      );
    } catch (cause) {
      // If the refresh token is invalid, try to recover from concurrency
      // issues, or make sure the session gets deleted by throwing a
      // TokenRefreshError.
      if (cause is OAuthResponseError &&
          cause.status == 400 &&
          cause.error == 'invalid_grant') {
        // Without a lock implementation, wait briefly to give concurrent
        // instances a chance to finish their own refresh. If one did, its
        // result is used as if this refresh had succeeded.
        if (!_runtime.hasImplementationLock) {
          await Future.delayed(const Duration(seconds: 1));

          final stored = await getStored(sub);
          if (stored == null) {
            // A concurrent flow caused the session to be deleted (for a
            // reason that cannot be known here). The distinct message helps
            // debugging; TokenRefreshError triggers deleteOnError.
            const msg = 'The session was deleted by another process';
            throw TokenRefreshError(sub, msg, cause: cause);
          } else if (stored.tokenSet.accessToken != tokenSet.accessToken ||
              stored.tokenSet.refreshToken != tokenSet.refreshToken) {
            // A concurrent refresh occurred. Pretend this one succeeded.
            return stored;
          }
          // Otherwise there was no concurrent refresh: the token is
          // (likely) simply no longer valid.
        }

        // Make sure the session gets deleted from the store.
        final msg = cause.errorDescription ?? 'The session was revoked';
        throw TokenRefreshError(sub, msg, cause: cause);
      }

      // Not an invalid_grant error: propagate it. `rethrow` keeps the
      // original stack trace, which `throw cause` would discard.
      if (cause is Exception) rethrow;

      throw Exception('Token refresh failed: $cause');
    }
  };
}
355
/// Stores [value] under [key] and then announces the update.
///
/// Throws a [TypeError] when [key] differs from the `sub` of the session's
/// token set, which guards against tampering with the stored value.
@override
Future<void> setStored(String key, Session value) async {
  if (key != value.tokenSet.sub) throw TypeError();

  await super.setStored(key, value);

  // The event carries the auth method as a String: a Map is JSON-encoded,
  // a String is passed through, anything else becomes null.
  final String? authMethodForEvent = switch (value.authMethod) {
    final Map serialized => jsonEncode(serialized),
    final String plain => plain,
    _ => null,
  };

  _dispatchUpdatedEvent(
    key,
    value.dpopKey,
    authMethodForEvent,
    value.tokenSet,
  );
}
378
/// Deletes the session stored under [key] and then announces the deletion.
///
/// When no [cause] is given, the event reports a generic
/// `Exception('Session deleted')`.
@override
Future<void> delStored(AtprotoDid key, [Object? cause]) async {
  final reportedCause = cause ?? Exception('Session deleted');
  await super.delStored(key, cause);
  _dispatchDeletedEvent(key, reportedCause);
}
384
/// Returns the session of [sub], refreshing its tokens as requested.
///
/// [refresh] selects the refresh policy:
/// - `true`: always refresh, even if the tokens have not expired;
/// - `false`: use the stored tokens even if they have expired;
/// - anything else (default `'auto'`): refresh only when needed.
Future<Session> getSession(AtprotoDid sub, [dynamic refresh = 'auto']) {
  final forceRefresh = refresh == true;
  final acceptStale = refresh == false;

  final lookup = GetCachedOptions(
    noCache: forceRefresh,
    allowStale: acceptStale,
  );
  return get(sub, lookup);
}
398
/// Returns the session stored for [key], refreshing it when required.
///
/// The lookup runs inside the runtime lock named
/// `@atproto-oauth-client-<key>`. It is bounded by a 30 second timeout that
/// is combined with the caller's `options.signal` (if any), and that
/// combined signal is the one handed to [CachedGetter.get].
///
/// Throws an [Exception] when the returned session's `sub` is not [key].
@override
Future<Session> get(AtprotoDid key, [GetCachedOptions? options]) async {
  final session = await _runtime.usingLock(
    '@atproto-oauth-client-$key',
    () async {
      // Make sure, even if there is no signal in the options, that the
      // request will be cancelled after at most 30 seconds.
      final timeoutToken = CancellationToken();
      final timeoutTimer = Timer(
        const Duration(seconds: 30),
        () => timeoutToken.cancel(),
      );

      final combinedSignal =
          options?.signal != null
              ? combineSignals([options!.signal, timeoutToken])
              : CombinedCancellationToken([timeoutToken]);

      try {
        return await super.get(
          key,
          GetCachedOptions(
            // Pass the combined signal. A fresh CancellationToken() here
            // would silently disable both the timeout and the caller's
            // cancellation.
            // NOTE(review): assumes CombinedCancellationToken exposes the
            // merged token as `.token` - confirm against util.dart.
            signal: combinedSignal.token,
            noCache: options?.noCache,
            allowStale: options?.allowStale,
          ),
        );
      } finally {
        timeoutTimer.cancel(); // Cancel timer before disposing token
        combinedSignal.dispose();
        timeoutToken.dispose();
      }
    },
  );

  if (key != session.tokenSet.sub) {
    // Fool-proofing (e.g. against invalid session storage)
    throw Exception('Token set does not match the expected sub');
  }

  return session;
}
438
/// Publishes a [SessionUpdatedEvent] built from the given values.
///
/// The event is added to the [onUpdated] stream (unless the stream was
/// already closed by [dispose]) and dispatched as an `'updated'` custom
/// event on the internal event target.
void _dispatchUpdatedEvent(
  String sub,
  Map<String, dynamic> dpopKey,
  String? authMethod,
  TokenSet tokenSet,
) {
  final event = SessionUpdatedEvent(
    sub: sub,
    dpopKey: dpopKey,
    authMethod: authMethod,
    tokenSet: tokenSet,
  );

  // Adding to a closed StreamController throws a StateError. That would
  // happen if a refresh completes after dispose(), so the add is skipped
  // once the stream is closed.
  if (!_updatedController.isClosed) {
    _updatedController.add(event);
  }
  _eventTarget.dispatchCustomEvent('updated', event);
}
455
/// Publishes a [SessionDeletedEvent] for [sub] with the given [cause].
///
/// The event is added to the [onDeleted] stream (unless the stream was
/// already closed by [dispose]) and dispatched as a `'deleted'` custom event
/// on the internal event target.
void _dispatchDeletedEvent(String sub, Object cause) {
  final event = SessionDeletedEvent(sub: sub, cause: cause);

  // Adding to a closed StreamController throws a StateError, which would
  // replace the error the caller is about to throw. Skip the add once the
  // stream has been closed.
  if (!_deletedController.isClosed) {
    _deletedController.add(event);
  }
  _eventTarget.dispatchCustomEvent('deleted', event);
}
462
/// Releases the resources held by this session getter.
///
/// Closes the [onUpdated] and [onDeleted] streams, in that order, then
/// disposes the internal event target.
void dispose() {
  for (final controller in [_updatedController, _deletedController]) {
    controller.close();
  }
  _eventTarget.dispose();
}
469}
470
/// Error response returned by an OAuth server.
///
/// Placeholder for OAuthResponseError; the original note says it will be
/// implemented in later chunks.
class OAuthResponseError implements Exception {
  /// HTTP status code of the response.
  final int status;

  /// OAuth error code (for example `invalid_grant`), if any.
  final String? error;

  /// Human-readable description supplied with the error, if any.
  final String? errorDescription;

  const OAuthResponseError({
    required this.status,
    this.error,
    this.errorDescription,
  });

  /// Returns a description that includes the status and any error details,
  /// so that logs and interpolated messages do not show only
  /// `Instance of 'OAuthResponseError'`.
  @override
  String toString() {
    final details = [
      'status: $status',
      if (error != null) 'error: $error',
      if (errorDescription != null) 'description: $errorDescription',
    ].join(', ');
    return 'OAuthResponseError($details)';
  }
}
480
/// Optional hooks that customise how a [CachedGetter] treats stored values.
class CachedGetterOptions<K, V> {
  const CachedGetterOptions({
    this.isStale,
    this.onStoreError,
    this.deleteOnError,
  });

  /// Decides whether the stored `value` for `key` is too old to be used.
  final bool Function(K key, V value)? isStale;

  /// Invoked with the error, key and value when writing to the store fails.
  final Future<void> Function(Object err, K key, V value)? onStoreError;

  /// Decides whether the stored value should be deleted after the given
  /// error.
  final Future<bool> Function(Object err)? deleteOnError;
}
498
/// An in-flight lookup tracked by [CachedGetter].
class _PendingItem<V> {
  _PendingItem(this.future);

  /// Completes with the looked-up value and a flag telling whether it was
  /// freshly obtained (`true`) or read from the store (`false`).
  final Future<({V value, bool isFresh})> future;
}
505
506/// Wrapper utility that uses a store to speed up the retrieval of values.
507///
508/// The CachedGetter ensures that at most one fresh call is ever being made
509/// for a given key. It also contains logic for reading from the cache which,
510/// if the cache is based on localStorage/indexedDB, will sync across multiple
511/// tabs (for a given key).
512///
/// This class is designed to be extended (it is not declared `abstract`).
/// A subclass that passes a `null` getter to the constructor must assign
/// the getter function itself before [get] is called.
515class CachedGetter<K, V> {
516 final SimpleStore<K, V> _store;
517 final CachedGetterOptions<K, V> _options;
518 final Map<K, _PendingItem<V>> _pending = {};
519
520 late Future<V> Function(K, GetCachedOptions, V?) _getter;
521
/// Creates a cached getter that reads from and writes to [sessionStore].
///
/// [getter] produces a fresh value for a key; it receives the key, the
/// lookup options and the stored value (if any). It may be `null`, in which
/// case the getter must be assigned before [get] is first used. [options]
/// supplies the optional staleness and error hooks.
CachedGetter({
  required SimpleStore<K, V> sessionStore,
  required Future<V> Function(K, GetCachedOptions, V?)? getter,
  required CachedGetterOptions<K, V> options,
}) : _store = sessionStore,
     _options = options {
  if (getter case final provided?) {
    _getter = provided;
  }
}
532
/// Returns the value for [key], from the store when allowed, otherwise from
/// the getter function.
///
/// A stored value is used unless `options.noCache` is set or the `isStale`
/// hook reports it stale (ignored when `options.allowStale` is set). A value
/// produced by the getter is written to the store before it is returned.
///
/// Concurrent calls for the same key wait for the in-flight call and reuse
/// its result when it is fresh or acceptable under their own options.
///
/// When the getter fails and a stored value existed, the `deleteOnError`
/// hook decides whether that stored value is deleted; the getter's error is
/// then rethrown with its original stack trace. If the deletion itself
/// fails, an [Exception] describing that failure is thrown instead.
Future<V> get(K key, [GetCachedOptions? options]) async {
  final effectiveOptions = options ?? GetCachedOptions();
  final signal = effectiveOptions.signal;
  final noCache = effectiveOptions.noCache ?? false;
  final allowStale = effectiveOptions.allowStale ?? false;

  signal?.throwIfCancelled();

  final isStale = _options.isStale;
  final deleteOnError = _options.deleteOnError;

  // Determine if a stored value can be used
  bool allowStored(V value) {
    if (noCache) return false; // Never allow stored values
    if (allowStale || isStale == null) return true; // Always allow
    return !isStale(key, value); // Check if stale
  }

  // As long as concurrent requests are made for the same key, only one
  // request will be made to the getStored & getter functions at a time.
  while (true) {
    final previousExecutionFlow = _pending[key];
    if (previousExecutionFlow == null) break;

    try {
      final result = await previousExecutionFlow.future;

      // Use the concurrent request's result if it is fresh, or if it came
      // from the store and satisfies this call's conditions.
      if (result.isFresh || allowStored(result.value)) return result.value;
    } catch (_) {
      // Ignore errors from previous execution flows (they will have been
      // propagated by that flow).
    }

    // Break the loop if the signal was cancelled
    signal?.throwIfCancelled();
  }

  final currentExecutionFlow = _PendingItem<V>(
    Future<({V value, bool isFresh})>(() async {
      final storedValue = await getStored(key, signal: signal);

      if (storedValue != null && allowStored(storedValue)) {
        // Use the stored value, and tell waiting flows that it came from
        // the store (isFresh = false).
        return (value: storedValue, isFresh: false);
      }

      final V freshValue;
      try {
        freshValue = await _getter(key, effectiveOptions, storedValue);
      } catch (err) {
        if (storedValue != null) {
          try {
            if (deleteOnError != null && await deleteOnError(err)) {
              await delStored(key, err);
            }
          } catch (error) {
            throw Exception('Error while deleting stored value: $error');
          }
        }
        // Propagate the getter's error with its original stack trace.
        rethrow;
      }

      // The value should be stored even if the signal was cancelled.
      await setStored(key, freshValue);
      return (value: freshValue, isFresh: true);
    }).whenComplete(() {
      _pending.remove(key);
    }),
  );

  if (_pending.containsKey(key)) {
    // This should never happen. There must not be any 'await'
    // statement between this and the loop iteration check.
    throw Exception('Concurrent request for the same key');
  }

  _pending[key] = currentExecutionFlow;

  final result = await currentExecutionFlow.future;
  return result.value;
}
621
/// Reads the value stored under [key].
///
/// Any error thrown by the store is swallowed and reported as `null`, so a
/// failing store is treated like a missing entry.
Future<V?> getStored(K key, {CancellationToken? signal}) async {
  V? found;
  try {
    found = await _store.get(key, signal: signal);
  } on Object {
    found = null;
  }
  return found;
}
629
/// Writes [value] to the store under [key].
///
/// When the write fails, the error is handed to the `onStoreError` hook; if
/// no hook is configured the error is dropped.
Future<void> setStored(K key, V value) async {
  try {
    await _store.set(key, value);
  } on Object catch (failure) {
    final handler = _options.onStoreError;
    if (handler == null) return;
    await handler(failure, key, value);
  }
}
640
/// Removes the value stored under [key].
///
/// [cause] is not used here; it is part of the signature so that overrides
/// can report why the value was deleted.
Future<void> delStored(K key, [Object? cause]) async => _store.del(key);
644}