1import { err, expect, map, ok, type Result } from '$lib/result';
2import {
3 ComAtprotoIdentityResolveHandle,
4 ComAtprotoRepoGetRecord,
5 ComAtprotoRepoListRecords
6} from '@atcute/atproto';
7import { Client as AtcuteClient } from '@atcute/client';
8import { safeParse, type Handle, type InferOutput } from '@atcute/lexicons';
9import {
10 isDid,
11 parseCanonicalResourceUri,
12 parseResourceUri,
13 type ActorIdentifier,
14 type AtprotoDid,
15 type Cid,
16 type Did,
17 type Nsid,
18 type RecordKey,
19 type ResourceUri
20} from '@atcute/lexicons/syntax';
21import type {
22 InferInput,
23 InferXRPCBodyOutput,
24 ObjectSchema,
25 RecordKeySchema,
26 RecordSchema,
27 XRPCQueryMetadata
28} from '@atcute/lexicons/validations';
29import * as v from '@atcute/lexicons/validations';
30import { MiniDocQuery, type MiniDoc } from './slingshot';
31import { BacklinksQuery, type Backlinks, type BacklinksSource } from './constellation';
32import type { Records } from '@atcute/lexicons/ambient';
33import { PersistedLRU } from '$lib/cache';
34import { AppBskyActorProfile } from '@atcute/bluesky';
35import { WebSocket } from '@soffinal/websocket';
36import type { Notification } from './stardust';
37import { get } from 'svelte/store';
38import { settings } from '$lib/settings';
39import type { OAuthUserAgent } from '@atcute/oauth-browser-client';
40// import { JetstreamSubscription } from '@atcute/jetstream';
41
// Time-to-live shared by every cache below: 86 400 000, i.e. one day assuming
// PersistedLRU interprets `ttl` in milliseconds (TODO confirm against $lib/cache).
const cacheTtl = 24 * 60 * 60 * 1000;

/** Output of `com.atproto.repo.getRecord`, exactly as it is stored in {@link recordCache}. */
type CachedGetRecordOutput = InferOutput<typeof ComAtprotoRepoGetRecord.mainSchema.output.schema>;

/** Handle -> DID lookups, keyed by handle. */
export const handleCache = new PersistedLRU<Handle, AtprotoDid>({
  prefix: 'handle',
  max: 1000,
  ttl: cacheTtl
});

/** Mini DID documents, keyed by whichever identifier (handle or DID) was used to request them. */
export const didDocCache = new PersistedLRU<ActorIdentifier, MiniDoc>({
  prefix: 'didDoc',
  max: 1000,
  ttl: cacheTtl
});

/** Raw `getRecord` responses, keyed by a caller-built string. */
export const recordCache = new PersistedLRU<string, CachedGetRecordOutput>({
  prefix: 'record',
  max: 5000,
  ttl: cacheTtl
});
61
/**
 * Reads the current settings snapshot and turns one of its configured
 * service endpoints into a `URL`.
 */
const endpointUrl = (service: 'slingshot' | 'spacedust' | 'constellation'): URL =>
  new URL(get(settings).endpoints[service]);

// Base URLs of the three services. They are computed once, when this module
// is first evaluated, from whatever the settings store holds at that moment.
export const slingshotUrl: URL = endpointUrl('slingshot');
export const spacedustUrl: URL = endpointUrl('spacedust');
export const constellationUrl: URL = endpointUrl('constellation');
65
// Encoder parameterisation for the notifications socket.
// NOTE(review): presumably <outgoing, incoming> message types — confirm against @soffinal/websocket.
type NotificationsStreamEncoder = WebSocket.Encoder<undefined, Notification>;

/** WebSocket specialised for the notifications encoder. */
export type NotificationsStream = WebSocket<NotificationsStreamEncoder>;

/** Event type belonging to a {@link NotificationsStream}. */
export type NotificationsStreamEvent = WebSocket.Event<NotificationsStreamEncoder>;

/** A record body together with the URI and (possibly absent) CID it was served under. */
export type RecordOutput<TRecord> = {
  uri: ResourceUri;
  cid: Cid | undefined;
  record: TRecord;
};
71
/**
 * Client for reading AT Protocol data.
 *
 * `listRecords` goes through the authenticated `atcute` client set up by
 * {@link AtpClient.login}. The other lookups are plain HTTP queries against the
 * slingshot / constellation endpoints and are memoised in the module-level
 * caches. Failures are reported as `err(string)` results; the two `*Uri`
 * helpers additionally go through `expect` when parsing their URI argument.
 */
export class AtpClient {
  /** XRPC client bound to the OAuth agent; `null` until `login` succeeds. */
  public atcute: AtcuteClient | null = null;
  /** DID and handle of the logged-in account; `null` until `login` succeeds. */
  public user: { did: Did; handle: Handle } | null = null;

  /**
   * Verifies the agent's session via `com.atproto.server.getSession` and, on
   * success, stores the client and the session's DID / handle on this instance.
   *
   * @param agent OAuth agent used as the request handler
   * @returns `ok(null)` on success, otherwise `err('failed to login: …')`
   */
  async login(agent: OAuthUserAgent): Promise<Result<null, string>> {
    try {
      const rpc = new AtcuteClient({ handler: agent });
      const res = await rpc.get('com.atproto.server.getSession');
      if (!res.ok) return err(`failed to login: ${res.data.error}`);
      this.user = {
        did: res.data.did,
        handle: res.data.handle
      };
      this.atcute = rpc;
    } catch (error) {
      // Anything thrown by the client constructor or the request itself.
      return err(`failed to login: ${error}`);
    }

    return ok(null);
  }

  /**
   * Fetches the record addressed by `uri`, after checking that the URI's
   * collection matches the `$type` literal of `schema`.
   *
   * @param schema record schema the result is validated against
   * @param uri resource URI of the record; parsed with `expect(parseResourceUri(...))`
   * @returns the record, or an error if the collections differ, the URI lacks
   *   a repo or record key, or the fetch / validation fails
   */
  async getRecordUri<
    Collection extends Nsid,
    TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } },
    TKey extends RecordKeySchema,
    Schema extends RecordSchema<TObject, TKey>,
    Output extends InferInput<Schema>
  >(schema: Schema, uri: ResourceUri): Promise<Result<RecordOutput<Output>, string>> {
    const parsedUri = expect(parseResourceUri(uri));
    const expected = schema.object.shape.$type.expected;
    if (parsedUri.collection !== expected)
      return err(`collections don't match: ${parsedUri.collection} != ${expected}`);

    // Guard instead of asserting with `!`: a URI without a record key must not
    // reach getRecord as `undefined`.
    const { repo, rkey } = parsedUri;
    if (repo == null || rkey == null) return err(`uri does not point at a record: ${uri}`);

    return await this.getRecord(schema, repo, rkey);
  }

  /**
   * Fetches a single record through slingshot's `com.atproto.repo.getRecord`
   * and validates its value against `schema`.
   *
   * Results are cached under `${repo}:${collection}:${rkey}`. A cache hit is
   * returned as is, without re-validation. On a miss the network request is
   * raced against `recordCache.getSignal(cacheKey)`.
   * NOTE(review): the signal presumably resolves when another caller stores
   * the same key — confirm against PersistedLRU.
   *
   * @param schema record schema; its `$type` literal selects the collection
   * @param repo handle or DID of the repository
   * @param rkey record key inside the collection
   */
  async getRecord<
    Collection extends Nsid,
    TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } },
    TKey extends RecordKeySchema,
    Schema extends RecordSchema<TObject, TKey>,
    Output extends InferInput<Schema>
  >(
    schema: Schema,
    repo: ActorIdentifier,
    rkey: RecordKey
  ): Promise<Result<RecordOutput<Output>, string>> {
    const collection = schema.object.shape.$type.expected;
    const cacheKey = `${repo}:${collection}:${rkey}`;

    const cached = recordCache.get(cacheKey);
    if (cached) return ok({ uri: cached.uri, cid: cached.cid, record: cached.value as Output });
    const cachedSignal = recordCache.getSignal(cacheKey);

    return await Promise.race([
      fetchMicrocosm(slingshotUrl, ComAtprotoRepoGetRecord.mainSchema, {
        repo,
        collection,
        rkey
      }).then((fetched): Result<RecordOutput<Output>, string> => {
        if (!fetched.ok) return fetched;

        const parsed = safeParse(schema, fetched.value.value);
        if (!parsed.ok) return err(parsed.message);

        // Only responses that passed validation are cached.
        recordCache.set(cacheKey, fetched.value);

        return ok({
          uri: fetched.value.uri,
          cid: fetched.value.cid,
          record: parsed.value as Output
        });
      }),
      cachedSignal.then(
        (d): Result<RecordOutput<Output>, string> =>
          ok({ uri: d.uri, cid: d.cid, record: d.value as Output })
      )
    ]);
  }

  /**
   * Fetches the `app.bsky.actor.profile` record with key `self`.
   *
   * @param repo handle or DID; defaults to the logged-in user's DID
   * @returns the profile record, or `err('not authenticated')` when no repo is
   *   given and nobody is logged in
   */
  async getProfile(repo?: ActorIdentifier): Promise<Result<AppBskyActorProfile.Main, string>> {
    repo = repo ?? this.user?.did;
    if (!repo) return err('not authenticated');
    return map(await this.getRecord(AppBskyActorProfile.mainSchema, repo, 'self'), (d) => d.record);
  }

  /**
   * Lists records of one collection via the authenticated client
   * (`com.atproto.repo.listRecords`).
   *
   * @param collection collection NSID
   * @param repo handle or DID of the repository
   * @param cursor optional cursor, forwarded unchanged
   * @param limit optional limit, forwarded unchanged
   * @returns the response body, `err('not authenticated')` before login, or
   *   `err('<error>: <message>')` when the request fails
   */
  async listRecords<Collection extends keyof Records>(
    collection: Collection,
    repo: ActorIdentifier,
    cursor?: string,
    limit?: number
  ): Promise<
    Result<InferXRPCBodyOutput<(typeof ComAtprotoRepoListRecords.mainSchema)['output']>, string>
  > {
    if (!this.atcute) return err('not authenticated');
    const res = await this.atcute.get('com.atproto.repo.listRecords', {
      params: {
        repo,
        collection,
        cursor,
        limit
      }
    });
    if (!res.ok) return err(`${res.data.error}: ${res.data.message ?? 'no details'}`);
    return ok(res.data);
  }

  /**
   * Resolves a handle to its DID through slingshot; a DID is returned as is.
   *
   * Successful resolutions are stored in `handleCache`. On a cache miss the
   * request is raced against `handleCache.getSignal(identifier)`.
   *
   * @param identifier handle or DID
   */
  async resolveHandle(identifier: ActorIdentifier): Promise<Result<AtprotoDid, string>> {
    if (isDid(identifier)) return ok(identifier as AtprotoDid);

    const cached = handleCache.get(identifier);
    if (cached) return ok(cached);
    const cachedSignal = handleCache.getSignal(identifier);

    const res = await Promise.race([
      fetchMicrocosm(slingshotUrl, ComAtprotoIdentityResolveHandle.mainSchema, {
        handle: identifier
      }),
      cachedSignal.then((d): Result<{ did: Did }, string> => ok({ did: d }))
    ]);

    const mapped = map(res, (data) => data.did as AtprotoDid);

    if (mapped.ok) handleCache.set(identifier, mapped.value);

    return mapped;
  }

  /**
   * Fetches the mini DID document for a handle or DID through slingshot
   * (`MiniDocQuery`), caching successful results in `didDocCache` under the
   * identifier that was passed in.
   *
   * @param handleOrDid handle or DID to look up
   */
  async resolveDidDoc(handleOrDid: ActorIdentifier): Promise<Result<MiniDoc, string>> {
    const cached = didDocCache.get(handleOrDid);
    if (cached) return ok(cached);
    const cachedSignal = didDocCache.getSignal(handleOrDid);

    const result = await Promise.race([
      fetchMicrocosm(slingshotUrl, MiniDocQuery, {
        identifier: handleOrDid
      }),
      cachedSignal.then((d): Result<MiniDoc, string> => ok(d))
    ]);

    if (result.ok) didDocCache.set(handleOrDid, result.value);

    return result;
  }

  /**
   * Same as {@link AtpClient.getBacklinks}, addressed by a canonical resource URI.
   *
   * @param uri canonical resource URI; parsed with `expect(parseCanonicalResourceUri(...))`
   * @param source backlink source to query
   */
  async getBacklinksUri(
    uri: ResourceUri,
    source: BacklinksSource
  ): Promise<Result<Backlinks, string>> {
    const parsedResourceUri = expect(parseCanonicalResourceUri(uri));
    return await this.getBacklinks(
      parsedResourceUri.repo,
      parsedResourceUri.collection,
      parsedResourceUri.rkey,
      source
    );
  }

  /**
   * Queries constellation for up to 100 backlinks pointing at
   * `at://<did>/<collection>/<rkey>`.
   *
   * The repo is resolved to a DID first. The query is given 2000 ms; after
   * that an error is returned, though the request itself is not cancelled.
   *
   * @param repo handle or DID owning the subject record
   * @param collection collection of the subject record
   * @param rkey record key of the subject record
   * @param source backlink source to query
   */
  async getBacklinks(
    repo: ActorIdentifier,
    collection: Nsid,
    rkey: RecordKey,
    source: BacklinksSource
  ): Promise<Result<Backlinks, string>> {
    const did = await this.resolveHandle(repo);
    if (!did.ok) return err(`cant resolve handle: ${did.error}`);

    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<null>((resolve) => {
      timer = setTimeout(() => resolve(null), 2000);
    });
    const query = fetchMicrocosm(constellationUrl, BacklinksQuery, {
      subject: `at://${did.value}/${collection}/${rkey}`,
      source,
      limit: 100
    });

    try {
      const results = await Promise.race([query, timeout]);
      if (!results) return err('cant fetch backlinks: timeout');

      return results;
    } finally {
      // Do not leave the timer pending once the race is decided.
      clearTimeout(timer);
    }
  }

  /**
   * Opens a WebSocket to the spacedust `/subscribe` endpoint.
   *
   * The query string carries one `wantedSources` entry per source, one
   * `wantedSubjectDids` and one `wantedSubjects` (`at://<did>`) entry per
   * subject, and `instant=true`. The scheme is always set to `wss:`.
   *
   * @param subjects DIDs to subscribe to
   * @param sources backlink sources to subscribe to
   */
  streamNotifications(subjects: Did[], ...sources: BacklinksSource[]): NotificationsStream {
    // Copy so the shared module-level URL is left untouched.
    const url = new URL(spacedustUrl);
    url.protocol = 'wss:';
    url.pathname = '/subscribe';
    const searchParams: [string, string][] = [];
    for (const source of sources) searchParams.push(['wantedSources', source]);
    for (const subject of subjects) searchParams.push(['wantedSubjectDids', subject]);
    for (const subject of subjects) searchParams.push(['wantedSubjects', `at://${subject}`]);
    searchParams.push(['instant', 'true']);
    url.search = `?${new URLSearchParams(searchParams)}`;
    // console.log(`streaming notifications: ${url}`);
    const encoder = WebSocket.getDefaultEncoder<undefined, Notification>();
    const ws = new WebSocket<typeof encoder>(url.toString(), {
      encoder
    });
    return ws;
  }

  // streamJetstream(subjects: Did[], ...collections: Nsid[]) {
  //   return new JetstreamSubscription({
  //     url: 'wss://jetstream2.fr.hose.cam',
  //     wantedCollections: collections,
  //     wantedDids: subjects
  //   });
  // }
}
282
/**
 * Runs an XRPC query against a service and validates the JSON response
 * against the query's output schema.
 *
 * @param api base URL of the service; it is copied, never modified
 * @param schema XRPC query metadata supplying the NSID and the output schema
 * @param params query parameters, serialised with `URLSearchParams`; a falsy
 *   value sends no query string
 * @param init optional `fetch` options, forwarded unchanged
 * @returns the validated output, or an error string from the transport, the
 *   status check or schema validation
 */
const fetchMicrocosm = async <
  Schema extends XRPCQueryMetadata,
  Input extends Schema['params'] extends ObjectSchema ? InferOutput<Schema['params']> : undefined,
  Output extends InferXRPCBodyOutput<Schema['output']>
>(
  api: URL,
  schema: Schema,
  params: Input,
  init?: RequestInit
): Promise<Result<Output, string>> => {
  // Only queries with a non-blob output can be validated below.
  if (!schema.output || schema.output.type === 'blob')
    return err('schema must have a non-blob output');

  // Work on a copy: `api` is typically a shared module-level URL, and writing
  // to it would leak this request's path and query into every other user.
  const url = new URL(api);
  url.pathname = `/xrpc/${schema.nsid}`;
  url.search = params ? `?${new URLSearchParams(params)}` : '';
  try {
    const body = await fetchJson(url, init);
    if (!body.ok) return err(body.error);
    const parsed = safeParse(schema.output.schema, body.value);
    if (!parsed.ok) return err(parsed.message);
    return ok(parsed.value as Output);
  } catch (error) {
    return err(`FetchError: ${error}`);
  }
};
306
/**
 * Fetches `url` and returns its JSON body.
 *
 * The status is inspected before the body is parsed, so an error page that is
 * not JSON is reported by its status code rather than as a parse failure.
 *
 * @param url address to request
 * @param init optional `fetch` options, forwarded unchanged
 * @returns `ok(body)` for a 200 response; `err('<error>: <message>')` for a
 *   400 response with a JSON body; `err('UnexpectedStatusCode: <status>')` for
 *   any other status; `err('FetchError: …')` when the request or the parsing
 *   of a 200 body throws
 */
const fetchJson = async (url: URL, init?: RequestInit): Promise<Result<unknown, string>> => {
  try {
    const response = await fetch(url, init);
    if (response.status === 400) {
      // A 400 is expected to carry a JSON `{ error, message }` body; fall back
      // to the bare status code if it does not parse.
      const body = await response.json().catch(() => null);
      if (body !== null && typeof body === 'object') return err(`${body.error}: ${body.message}`);
      return err(`UnexpectedStatusCode: ${response.status}`);
    }
    if (response.status !== 200) return err(`UnexpectedStatusCode: ${response.status}`);
    return ok(await response.json());
  } catch (error) {
    return err(`FetchError: ${error}`);
  }
};