Main coves client
1import 'dart:async';
2import 'dart:convert';
3import 'dart:math' as math;
4
5import '../errors/auth_method_unsatisfiable_error.dart';
6import '../errors/token_invalid_error.dart';
7import '../errors/token_refresh_error.dart';
8import '../errors/token_revoked_error.dart';
9import '../oauth/client_auth.dart' show ClientAuthMethod;
10import '../oauth/oauth_server_agent.dart';
11import '../oauth/oauth_server_factory.dart';
12import '../runtime/runtime.dart';
13import '../util.dart';
14import 'oauth_session.dart';
15
16/// Options for getting a cached value.
17class GetCachedOptions {
18 /// Cancellation token for aborting the operation
19 final CancellationToken? signal;
20
21 /// Do not use the cache to get the value. Always get a new value.
22 final bool? noCache;
23
24 /// Allow returning stale values from the cache.
25 final bool? allowStale;
26
27 const GetCachedOptions({this.signal, this.noCache, this.allowStale});
28}
29
30/// Abstract storage interface for values.
31///
32/// This is a generic key-value store interface.
33abstract class SimpleStore<K, V> {
34 /// Gets a value from the store.
35 ///
36 /// Returns `null` if the key doesn't exist.
37 Future<V?> get(K key, {CancellationToken? signal});
38
39 /// Sets a value in the store.
40 Future<void> set(K key, V value);
41
42 /// Deletes a value from the store.
43 Future<void> del(K key);
44
45 /// Optionally clears all values from the store.
46 Future<void> clear() async {}
47}
48
49/// Type alias for session storage
50typedef SessionStore = SimpleStore<String, Session>;
51
52/// Details of a session update event.
53class SessionUpdatedEvent {
54 /// The subject (user's DID)
55 final String sub;
56
57 /// The DPoP key
58 final Map<String, dynamic> dpopKey;
59
60 /// The authentication method
61 final String? authMethod;
62
63 /// The token set
64 final TokenSet tokenSet;
65
66 const SessionUpdatedEvent({
67 required this.sub,
68 required this.dpopKey,
69 this.authMethod,
70 required this.tokenSet,
71 });
72}
73
74/// Details of a session deletion event.
75class SessionDeletedEvent {
76 /// The subject (user's DID)
77 final String sub;
78
79 /// The cause of deletion
80 final Object cause;
81
82 const SessionDeletedEvent({required this.sub, required this.cause});
83}
84
85/// Manages session retrieval, caching, and refreshing.
86///
87/// The SessionGetter wraps a session store and provides:
88/// - Automatic token refresh when tokens are stale/expired
89/// - Caching to avoid redundant refresh operations
90/// - Events for session updates and deletions
91/// - Concurrency control to prevent multiple simultaneous refreshes
92///
93/// This is a critical component that ensures at most one token refresh
94/// is happening at a time for a given user, even across multiple tabs
95/// or app instances.
96///
97/// Example:
98/// ```dart
99/// final sessionGetter = SessionGetter(
100/// sessionStore: mySessionStore,
101/// serverFactory: myServerFactory,
102/// runtime: myRuntime,
103/// );
104///
105/// // Listen for session updates
106/// sessionGetter.onUpdated.listen((event) {
107/// print('Session updated for ${event.sub}');
108/// });
109///
110/// // Listen for session deletions
111/// sessionGetter.onDeleted.listen((event) {
112/// print('Session deleted for ${event.sub}: ${event.cause}');
113/// });
114///
115/// // Get a session (automatically refreshes if expired)
116/// final session = await sessionGetter.getSession('did:plc:abc123');
117///
118/// // Force refresh
119/// final freshSession = await sessionGetter.getSession('did:plc:abc123', true);
120/// ```
121class SessionGetter extends CachedGetter<AtprotoDid, Session> {
122 final OAuthServerFactory _serverFactory;
123 final Runtime _runtime;
124
125 final _eventTarget = CustomEventTarget<Map<String, dynamic>>();
126 final _updatedController = StreamController<SessionUpdatedEvent>.broadcast();
127 final _deletedController = StreamController<SessionDeletedEvent>.broadcast();
128
129 /// Stream of session update events.
130 Stream<SessionUpdatedEvent> get onUpdated => _updatedController.stream;
131
132 /// Stream of session deletion events.
133 Stream<SessionDeletedEvent> get onDeleted => _deletedController.stream;
134
135 SessionGetter({
136 required super.sessionStore,
137 required OAuthServerFactory serverFactory,
138 required Runtime runtime,
139 }) : _serverFactory = serverFactory,
140 _runtime = runtime,
141 super(
142 getter: null, // Will be set in _createGetter
143 options: CachedGetterOptions(
144 isStale: (sub, session) {
145 final tokenSet = session.tokenSet;
146 if (tokenSet.expiresAt == null) return false;
147
148 final expiresAt = DateTime.parse(tokenSet.expiresAt!);
149 final now = DateTime.now();
150
151 // Add some lee way to ensure the token is not expired when it
152 // reaches the server (10 seconds)
153 // Add some randomness to reduce the chances of multiple
154 // instances trying to refresh the token at the same time (0-30 seconds)
155 final buffer = Duration(
156 milliseconds:
157 10000 + (math.Random().nextDouble() * 30000).toInt(),
158 );
159
160 return expiresAt.isBefore(now.add(buffer));
161 },
162 onStoreError: (err, sub, session) async {
163 if (err is! AuthMethodUnsatisfiableError) {
164 // If the error was an AuthMethodUnsatisfiableError, there is no
165 // point in trying to call `fromIssuer`.
166 try {
167 // Parse authMethod
168 final authMethodValue = session.authMethod;
169 final authMethod =
170 authMethodValue is Map<String, dynamic>
171 ? ClientAuthMethod.fromJson(authMethodValue)
172 : (authMethodValue as String?) ?? 'legacy';
173
174 // Generate new DPoP key for revocation
175 // (stored key is serialized and can't be directly used)
176 final dpopKeyAlgs = ['ES256', 'RS256'];
177 final newDpopKey = await runtime.generateKey(dpopKeyAlgs);
178
179 // If the token data cannot be stored, let's revoke it
180 final server = await serverFactory.fromIssuer(
181 session.tokenSet.iss,
182 authMethod,
183 newDpopKey,
184 );
185 await server.revoke(
186 session.tokenSet.refreshToken ??
187 session.tokenSet.accessToken,
188 );
189 } catch (_) {
190 // Let the original error propagate
191 }
192 }
193
194 throw err;
195 },
196 deleteOnError: (err) async {
197 return err is TokenRefreshError ||
198 err is TokenRevokedError ||
199 err is TokenInvalidError ||
200 err is AuthMethodUnsatisfiableError;
201 },
202 ),
203 ) {
204 // Set the getter function after construction
205 _getter = _createGetter();
206 }
207
208 /// Creates the getter function for refreshing sessions.
209 Future<Session> Function(AtprotoDid, GetCachedOptions, Session?)
210 _createGetter() {
211 return (sub, options, storedSession) async {
212 // There needs to be a previous session to be able to refresh. If
213 // storedSession is null, it means that the store does not contain
214 // a session for the given sub.
215 if (storedSession == null) {
216 // Because the session is not in the store, delStored() method
217 // will not be called by the CachedGetter class (because there is
218 // nothing to delete). This would typically happen if there is no
219 // synchronization mechanism between instances of this class. Let's
220 // make sure an event is dispatched here if this occurs.
221 const msg = 'The session was deleted by another process';
222 final cause = TokenRefreshError(sub, msg);
223 _dispatchDeletedEvent(sub, cause);
224 throw cause;
225 }
226
227 // From this point forward, throwing a TokenRefreshError will result in
228 // delStored() being called, resulting in an event being dispatched,
229 // even if the session was removed from the store through a concurrent
230 // access (which, normally, should not happen if a proper runtime lock
231 // was provided).
232
233 final dpopKey = storedSession.dpopKey;
234 // authMethod can be a Map (serialized ClientAuthMethod) or String ('legacy')
235 final authMethodValue = storedSession.authMethod;
236 final authMethod =
237 authMethodValue is Map<String, dynamic>
238 ? ClientAuthMethod.fromJson(authMethodValue)
239 : (authMethodValue as String?) ?? 'legacy';
240 final tokenSet = storedSession.tokenSet;
241
242 if (sub != tokenSet.sub) {
243 // Fool-proofing (e.g. against invalid session storage)
244 throw TokenRefreshError(sub, 'Stored session sub mismatch');
245 }
246
247 if (tokenSet.refreshToken == null) {
248 throw TokenRefreshError(sub, 'No refresh token available');
249 }
250
251 // Since refresh tokens can only be used once, we might run into
252 // concurrency issues if multiple instances (e.g. browser tabs) are
253 // trying to refresh the same token simultaneously. The chances of this
254 // happening when multiple instances are started simultaneously is
255 // reduced by randomizing the expiry time (see isStale above). The
256 // best solution is to use a mutex/lock to ensure that only one instance
257 // is refreshing the token at a time (runtime.usingLock) but that is not
258 // always possible. If no lock implementation is provided, we will use
259 // the store to check if a concurrent refresh occurred.
260
261 // TODO: Key Persistence Workaround
262 // The storedSession.dpopKey is a Map<String, dynamic> (serialized JWK),
263 // but OAuthServerFactory.fromIssuer() expects a Key object.
264 // Until Key serialization is implemented (see runtime_implementation.dart),
265 // we generate a new DPoP key for each session refresh.
266 // This works but means tokens are bound to new keys, requiring refresh.
267 final dpopKeyAlgs = ['ES256', 'RS256']; // Supported DPoP algorithms
268 final newDpopKey = await _runtime.generateKey(dpopKeyAlgs);
269
270 final server = await _serverFactory.fromIssuer(
271 tokenSet.iss,
272 authMethod,
273 newDpopKey,
274 );
275
276 // Because refresh tokens can only be used once, we must not use the
277 // "signal" to abort the refresh, or throw any abort error beyond this
278 // point. Any thrown error beyond this point will prevent the
279 // SessionGetter from obtaining, and storing, the new token set,
280 // effectively rendering the currently saved session unusable.
281 options.signal?.throwIfCancelled();
282
283 try {
284 final newTokenSet = await server.refresh(tokenSet);
285
286 if (sub != newTokenSet.sub) {
287 // The server returned another sub. Was the tokenSet manipulated?
288 throw TokenRefreshError(sub, 'Token set sub mismatch');
289 }
290
291 return Session(
292 dpopKey: newDpopKey.bareJwk ?? {},
293 tokenSet: newTokenSet,
294 authMethod: server.authMethod.toJson(),
295 );
296 } catch (cause) {
297 // If the refresh token is invalid, let's try to recover from
298 // concurrency issues, or make sure the session is deleted by throwing
299 // a TokenRefreshError.
300 if (cause is OAuthResponseError &&
301 cause.status == 400 &&
302 cause.error == 'invalid_grant') {
303 // In case there is no lock implementation in the runtime, we will
304 // wait for a short time to give the other concurrent instances a
305 // chance to finish their refreshing of the token. If a concurrent
306 // refresh did occur, we will pretend that this one succeeded.
307 if (!_runtime.hasImplementationLock) {
308 await Future.delayed(Duration(seconds: 1));
309
310 final stored = await getStored(sub);
311 if (stored == null) {
312 // A concurrent refresh occurred and caused the session to be
313 // deleted (for a reason we can't know at this point).
314
315 // Using a distinct error message mainly for debugging
316 // purposes. Also, throwing a TokenRefreshError to trigger
317 // deletion through the deleteOnError callback.
318 const msg = 'The session was deleted by another process';
319 throw TokenRefreshError(sub, msg, cause: cause);
320 } else if (stored.tokenSet.accessToken != tokenSet.accessToken ||
321 stored.tokenSet.refreshToken != tokenSet.refreshToken) {
322 // A concurrent refresh occurred. Pretend this one succeeded.
323 return stored;
324 } else {
325 // There were no concurrent refresh. The token is (likely)
326 // simply no longer valid.
327 }
328 }
329
330 // Make sure the session gets deleted from the store
331 final msg = cause.errorDescription ?? 'The session was revoked';
332 throw TokenRefreshError(sub, msg, cause: cause);
333 }
334
335 // Re-throw the original exception if it wasn't an invalid_grant error
336 if (cause is Exception) {
337 throw cause;
338 } else {
339 throw Exception('Token refresh failed: $cause');
340 }
341 }
342 };
343 }
344
345 @override
346 Future<void> setStored(String key, Session value) async {
347 // Prevent tampering with the stored value
348 if (key != value.tokenSet.sub) {
349 throw TypeError();
350 }
351
352 await super.setStored(key, value);
353
354 // Serialize authMethod to String for the event
355 // authMethod can be Map<String, dynamic>, String, or null
356 String? authMethodString;
357 if (value.authMethod is Map) {
358 authMethodString = jsonEncode(value.authMethod);
359 } else if (value.authMethod is String) {
360 authMethodString = value.authMethod as String;
361 } else {
362 authMethodString = null;
363 }
364
365 _dispatchUpdatedEvent(key, value.dpopKey, authMethodString, value.tokenSet);
366 }
367
368 @override
369 Future<void> delStored(AtprotoDid key, [Object? cause]) async {
370 await super.delStored(key, cause);
371 _dispatchDeletedEvent(key, cause ?? Exception('Session deleted'));
372 }
373
374 /// Gets a session, optionally refreshing it.
375 ///
376 /// Parameters:
377 /// - [sub]: The subject (user's DID)
378 /// - [refresh]: When `true`, forces a token refresh even if not expired.
379 /// When `false`, uses cached tokens even if expired.
380 /// When `'auto'`, refreshes only if expired (default).
381 Future<Session> getSession(AtprotoDid sub, [dynamic refresh = 'auto']) {
382 return get(
383 sub,
384 GetCachedOptions(noCache: refresh == true, allowStale: refresh == false),
385 );
386 }
387
388 @override
389 Future<Session> get(AtprotoDid key, [GetCachedOptions? options]) async {
390 final session = await _runtime.usingLock(
391 '@atproto-oauth-client-$key',
392 () async {
393 // Make sure, even if there is no signal in the options, that the
394 // request will be cancelled after at most 30 seconds.
395 final timeoutToken = CancellationToken();
396 Timer(Duration(seconds: 30), () => timeoutToken.cancel());
397
398 final combinedSignal =
399 options?.signal != null
400 ? combineSignals([options!.signal, timeoutToken])
401 : CombinedCancellationToken([timeoutToken]);
402
403 try {
404 return await super.get(
405 key,
406 GetCachedOptions(
407 signal: CancellationToken(), // Use combined signal
408 noCache: options?.noCache,
409 allowStale: options?.allowStale,
410 ),
411 );
412 } finally {
413 combinedSignal.dispose();
414 timeoutToken.dispose();
415 }
416 },
417 );
418
419 if (key != session.tokenSet.sub) {
420 // Fool-proofing (e.g. against invalid session storage)
421 throw Exception('Token set does not match the expected sub');
422 }
423
424 return session;
425 }
426
427 void _dispatchUpdatedEvent(
428 String sub,
429 Map<String, dynamic> dpopKey,
430 String? authMethod,
431 TokenSet tokenSet,
432 ) {
433 final event = SessionUpdatedEvent(
434 sub: sub,
435 dpopKey: dpopKey,
436 authMethod: authMethod,
437 tokenSet: tokenSet,
438 );
439
440 _updatedController.add(event);
441 _eventTarget.dispatchCustomEvent('updated', event);
442 }
443
444 void _dispatchDeletedEvent(String sub, Object cause) {
445 final event = SessionDeletedEvent(sub: sub, cause: cause);
446
447 _deletedController.add(event);
448 _eventTarget.dispatchCustomEvent('deleted', event);
449 }
450
451 /// Disposes of resources used by this session getter.
452 void dispose() {
453 _updatedController.close();
454 _deletedController.close();
455 _eventTarget.dispose();
456 }
457}
458
459/// Placeholder for OAuthResponseError
460/// Will be implemented in later chunks
461class OAuthResponseError implements Exception {
462 final int status;
463 final String? error;
464 final String? errorDescription;
465
466 OAuthResponseError({required this.status, this.error, this.errorDescription});
467}
468
469/// Options for the CachedGetter.
470class CachedGetterOptions<K, V> {
471 /// Function to determine if a cached value is stale
472 final bool Function(K key, V value)? isStale;
473
474 /// Function called when storing a value fails
475 final Future<void> Function(Object err, K key, V value)? onStoreError;
476
477 /// Function to determine if a value should be deleted on error
478 final Future<bool> Function(Object err)? deleteOnError;
479
480 const CachedGetterOptions({
481 this.isStale,
482 this.onStoreError,
483 this.deleteOnError,
484 });
485}
486
487/// A pending item in the cache.
488class _PendingItem<V> {
489 final Future<({V value, bool isFresh})> future;
490
491 _PendingItem(this.future);
492}
493
494/// Wrapper utility that uses a store to speed up the retrieval of values.
495///
496/// The CachedGetter ensures that at most one fresh call is ever being made
497/// for a given key. It also contains logic for reading from the cache which,
498/// if the cache is based on localStorage/indexedDB, will sync across multiple
499/// tabs (for a given key).
500///
501/// This is an abstract base class. Subclasses should provide the getter
502/// function and any additional logic.
503class CachedGetter<K, V> {
504 final SimpleStore<K, V> _store;
505 final CachedGetterOptions<K, V> _options;
506 final Map<K, _PendingItem<V>> _pending = {};
507
508 late Future<V> Function(K, GetCachedOptions, V?) _getter;
509
510 CachedGetter({
511 required SimpleStore<K, V> sessionStore,
512 required Future<V> Function(K, GetCachedOptions, V?)? getter,
513 required CachedGetterOptions<K, V> options,
514 }) : _store = sessionStore,
515 _options = options {
516 if (getter != null) {
517 _getter = getter;
518 }
519 }
520
521 Future<V> get(K key, [GetCachedOptions? options]) async {
522 options ??= GetCachedOptions();
523 final signal = options.signal;
524 final noCache = options.noCache ?? false;
525 final allowStale = options.allowStale ?? false;
526
527 signal?.throwIfCancelled();
528
529 final isStale = _options.isStale;
530 final deleteOnError = _options.deleteOnError;
531
532 // Determine if a stored value can be used
533 bool allowStored(V value) {
534 if (noCache) return false; // Never allow stored values
535 if (allowStale || isStale == null) return true; // Always allow
536 return !isStale(key, value); // Check if stale
537 }
538
539 // As long as concurrent requests are made for the same key, only one
540 // request will be made to the getStored & getter functions at a time.
541 _PendingItem<V>? previousExecutionFlow;
542 while ((previousExecutionFlow = _pending[key]) != null) {
543 try {
544 final result = await previousExecutionFlow!.future;
545 final isFresh = result.isFresh;
546 final value = result.value;
547
548 // Use the concurrent request's result if it is fresh
549 if (isFresh) return value;
550 // Use the concurrent request's result if not fresh (loaded from the
551 // store), and matches the conditions for using a stored value.
552 if (allowStored(value)) return value;
553 } catch (_) {
554 // Ignore errors from previous execution flows (they will have been
555 // propagated by that flow).
556 }
557
558 // Break the loop if the signal was cancelled
559 signal?.throwIfCancelled();
560 }
561
562 final currentExecutionFlow = _PendingItem<V>(
563 Future(() async {
564 final storedValue = await getStored(key, signal: signal);
565
566 if (storedValue != null && allowStored(storedValue)) {
567 // Use the stored value as return value for the current execution
568 // flow. Notify other concurrent execution flows that we got a value,
569 // but that it came from the store (isFresh = false).
570 return (value: storedValue, isFresh: false);
571 }
572
573 return Future(() async {
574 return await _getter(key, options!, storedValue);
575 })
576 .catchError((err) async {
577 if (storedValue != null) {
578 try {
579 if (deleteOnError != null && await deleteOnError(err)) {
580 await delStored(key, err);
581 }
582 } catch (error) {
583 throw Exception('Error while deleting stored value: $error');
584 }
585 }
586 throw err;
587 })
588 .then((value) async {
589 // The value should be stored even if the signal was cancelled.
590 await setStored(key, value);
591 return (value: value, isFresh: true);
592 });
593 }).whenComplete(() {
594 _pending.remove(key);
595 }),
596 );
597
598 if (_pending.containsKey(key)) {
599 // This should never happen. There must not be any 'await'
600 // statement between this and the loop iteration check.
601 throw Exception('Concurrent request for the same key');
602 }
603
604 _pending[key] = currentExecutionFlow;
605
606 final result = await currentExecutionFlow.future;
607 return result.value;
608 }
609
610 Future<V?> getStored(K key, {CancellationToken? signal}) async {
611 try {
612 return await _store.get(key, signal: signal);
613 } catch (err) {
614 return null;
615 }
616 }
617
618 Future<void> setStored(K key, V value) async {
619 try {
620 await _store.set(key, value);
621 } catch (err) {
622 final onStoreError = _options.onStoreError;
623 if (onStoreError != null) {
624 await onStoreError(err, key, value);
625 }
626 }
627 }
628
629 Future<void> delStored(K key, [Object? cause]) async {
630 await _store.del(key);
631 }
632}