1import { err, expect, map, ok, type Result } from '$lib/result';
2import {
3 ComAtprotoIdentityResolveHandle,
4 ComAtprotoRepoGetRecord,
5 ComAtprotoRepoListRecords
6} from '@atcute/atproto';
7import { Client as AtcuteClient } from '@atcute/client';
8import { safeParse, type Handle, type InferOutput } from '@atcute/lexicons';
9import {
10 isDid,
11 parseCanonicalResourceUri,
12 parseResourceUri,
13 type ActorIdentifier,
14 type AtprotoDid,
15 type CanonicalResourceUri,
16 type Cid,
17 type Did,
18 type Nsid,
19 type RecordKey,
20 type ResourceUri
21} from '@atcute/lexicons/syntax';
22import type {
23 InferInput,
24 InferXRPCBodyOutput,
25 ObjectSchema,
26 RecordKeySchema,
27 RecordSchema,
28 XRPCQueryMetadata
29} from '@atcute/lexicons/validations';
30import * as v from '@atcute/lexicons/validations';
31import { MiniDocQuery, type MiniDoc } from './slingshot';
32import { BacklinksQuery, type Backlinks, type BacklinksSource } from './constellation';
33import type { Records } from '@atcute/lexicons/ambient';
34import { PersistedLRU } from '$lib/cache';
35import { AppBskyActorProfile } from '@atcute/bluesky';
36import { WebSocket } from '@soffinal/websocket';
37import type { Notification } from './stardust';
38import { get } from 'svelte/store';
39import { settings } from '$lib/settings';
40import type { OAuthUserAgent } from '@atcute/oauth-browser-client';
41// import { JetstreamSubscription } from '@atcute/jetstream';
42
43const cacheTtl = 1000 * 60 * 60 * 24;
44export const handleCache = new PersistedLRU<Handle, AtprotoDid>({
45 max: 1000,
46 ttl: cacheTtl,
47 prefix: 'handle'
48});
49export const didDocCache = new PersistedLRU<ActorIdentifier, MiniDoc>({
50 max: 1000,
51 ttl: cacheTtl,
52 prefix: 'didDoc'
53});
54export const recordCache = new PersistedLRU<
55 string,
56 InferOutput<typeof ComAtprotoRepoGetRecord.mainSchema.output.schema>
57>({
58 max: 5000,
59 ttl: cacheTtl,
60 prefix: 'record'
61});
62
63export const slingshotUrl: URL = new URL(get(settings).endpoints.slingshot);
64export const spacedustUrl: URL = new URL(get(settings).endpoints.spacedust);
65export const constellationUrl: URL = new URL(get(settings).endpoints.constellation);
66
67type NotificationsStreamEncoder = WebSocket.Encoder<undefined, Notification>;
68export type NotificationsStream = WebSocket<NotificationsStreamEncoder>;
69export type NotificationsStreamEvent = WebSocket.Event<NotificationsStreamEncoder>;
70
71export type RecordOutput<Output> = { uri: ResourceUri; cid: Cid | undefined; record: Output };
72
73export class AtpClient {
74 public atcute: AtcuteClient | null = null;
75 public didDoc: MiniDoc | null = null;
76
77 async login(identifier: ActorIdentifier, agent: OAuthUserAgent): Promise<Result<null, string>> {
78 if ((agent.session.token.expires_at ?? 0) < Date.now()) {
79 return err('token expired, relogin');
80 }
81
82 const didDoc = await this.resolveDidDoc(identifier);
83 if (!didDoc.ok) return err(didDoc.error);
84 this.didDoc = didDoc.value;
85
86 try {
87 const rpc = new AtcuteClient({ handler: agent });
88 this.atcute = rpc;
89 } catch (error) {
90 return err(`failed to login: ${error}`);
91 }
92
93 return ok(null);
94 }
95
96 async getRecordUri<
97 Collection extends Nsid,
98 TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } },
99 TKey extends RecordKeySchema,
100 Schema extends RecordSchema<TObject, TKey>,
101 Output extends InferInput<Schema>
102 >(schema: Schema, uri: ResourceUri): Promise<Result<RecordOutput<Output>, string>> {
103 const parsedUri = expect(parseResourceUri(uri));
104 if (parsedUri.collection !== schema.object.shape.$type.expected)
105 return err(
106 `collections don't match: ${parsedUri.collection} != ${schema.object.shape.$type.expected}`
107 );
108 return await this.getRecord(schema, parsedUri.repo!, parsedUri.rkey!);
109 }
110
111 async getRecord<
112 Collection extends Nsid,
113 TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } },
114 TKey extends RecordKeySchema,
115 Schema extends RecordSchema<TObject, TKey>,
116 Output extends InferInput<Schema>
117 >(
118 schema: Schema,
119 repo: ActorIdentifier,
120 rkey: RecordKey
121 ): Promise<Result<RecordOutput<Output>, string>> {
122 const collection = schema.object.shape.$type.expected;
123 const cacheKey = `${repo}:${collection}:${rkey}`;
124
125 const cached = recordCache.get(cacheKey);
126 if (cached) return ok({ uri: cached.uri, cid: cached.cid, record: cached.value as Output });
127 const cachedSignal = recordCache.getSignal(cacheKey);
128
129 const result = await Promise.race([
130 fetchMicrocosm(slingshotUrl, ComAtprotoRepoGetRecord.mainSchema, {
131 repo,
132 collection,
133 rkey
134 }).then((result): Result<RecordOutput<Output>, string> => {
135 if (!result.ok) return result;
136
137 const parsed = safeParse(schema, result.value.value);
138 if (!parsed.ok) return err(parsed.message);
139
140 recordCache.set(cacheKey, result.value);
141
142 return ok({
143 uri: result.value.uri,
144 cid: result.value.cid,
145 record: parsed.value as Output
146 });
147 }),
148 cachedSignal.then(
149 (d): Result<RecordOutput<Output>, string> =>
150 ok({ uri: d.uri, cid: d.cid, record: d.value as Output })
151 )
152 ]);
153
154 if (!result.ok) return result;
155
156 return ok(result.value);
157 }
158
159 async getProfile(repo?: ActorIdentifier): Promise<Result<AppBskyActorProfile.Main, string>> {
160 repo = repo ?? this.didDoc?.did;
161 if (!repo) return err('not authenticated');
162 return map(await this.getRecord(AppBskyActorProfile.mainSchema, repo, 'self'), (d) => d.record);
163 }
164
165 async listRecords<Collection extends keyof Records>(
166 collection: Collection,
167 repo: ActorIdentifier,
168 cursor?: string,
169 limit?: number
170 ): Promise<
171 Result<InferXRPCBodyOutput<(typeof ComAtprotoRepoListRecords.mainSchema)['output']>, string>
172 > {
173 if (!this.atcute) return err('not authenticated');
174 const res = await this.atcute.get('com.atproto.repo.listRecords', {
175 params: {
176 repo,
177 collection,
178 cursor,
179 limit
180 }
181 });
182 if (!res.ok) return err(`${res.data.error}: ${res.data.message ?? 'no details'}`);
183 return ok(res.data);
184 }
185
186 async resolveHandle(identifier: ActorIdentifier): Promise<Result<AtprotoDid, string>> {
187 if (isDid(identifier)) return ok(identifier as AtprotoDid);
188
189 const cached = handleCache.get(identifier);
190 if (cached) return ok(cached);
191 const cachedSignal = handleCache.getSignal(identifier);
192
193 const res = await Promise.race([
194 fetchMicrocosm(slingshotUrl, ComAtprotoIdentityResolveHandle.mainSchema, {
195 handle: identifier
196 }),
197 cachedSignal.then((d): Result<{ did: Did }, string> => ok({ did: d }))
198 ]);
199
200 const mapped = map(res, (data) => data.did as AtprotoDid);
201
202 if (mapped.ok) {
203 handleCache.set(identifier, mapped.value);
204 }
205
206 return mapped;
207 }
208
209 async resolveDidDoc(handleOrDid: ActorIdentifier): Promise<Result<MiniDoc, string>> {
210 const cached = didDocCache.get(handleOrDid);
211 if (cached) return ok(cached);
212 const cachedSignal = didDocCache.getSignal(handleOrDid);
213
214 const result = await Promise.race([
215 fetchMicrocosm(slingshotUrl, MiniDocQuery, {
216 identifier: handleOrDid
217 }),
218 cachedSignal.then((d): Result<MiniDoc, string> => ok(d))
219 ]);
220
221 if (result.ok) {
222 didDocCache.set(handleOrDid, result.value);
223 }
224
225 return result;
226 }
227
228 async getBacklinksUri(
229 uri: CanonicalResourceUri,
230 source: BacklinksSource
231 ): Promise<Result<Backlinks, string>> {
232 const parsedResourceUri = expect(parseCanonicalResourceUri(uri));
233 return await this.getBacklinks(
234 parsedResourceUri.repo,
235 parsedResourceUri.collection,
236 parsedResourceUri.rkey,
237 source
238 );
239 }
240
241 async getBacklinks(
242 repo: ActorIdentifier,
243 collection: Nsid,
244 rkey: RecordKey,
245 source: BacklinksSource
246 ): Promise<Result<Backlinks, string>> {
247 const did = await this.resolveHandle(repo);
248 if (!did.ok) {
249 return err(`failed to resolve handle: ${did.error}`);
250 }
251
252 return await fetchMicrocosm(constellationUrl, BacklinksQuery, {
253 subject: `at://${did.value}/${collection}/${rkey}`,
254 source,
255 limit: 100
256 });
257 }
258
259 streamNotifications(subjects: Did[], ...sources: BacklinksSource[]): NotificationsStream {
260 const url = new URL(spacedustUrl);
261 url.protocol = 'wss:';
262 url.pathname = '/subscribe';
263 const searchParams = [];
264 sources.every((source) => searchParams.push(['wantedSources', source]));
265 subjects.every((subject) => searchParams.push(['wantedSubjectDids', subject]));
266 subjects.every((subject) => searchParams.push(['wantedSubjects', `at://${subject}`]));
267 searchParams.push(['instant', 'true']);
268 url.search = `?${new URLSearchParams(searchParams)}`;
269 // console.log(`streaming notifications: ${url}`);
270 const encoder = WebSocket.getDefaultEncoder<undefined, Notification>();
271 const ws = new WebSocket<typeof encoder>(url.toString(), {
272 encoder
273 });
274 return ws;
275 }
276
277 // streamJetstream(subjects: Did[], ...collections: Nsid[]) {
278 // return new JetstreamSubscription({
279 // url: 'wss://jetstream2.fr.hose.cam',
280 // wantedCollections: collections,
281 // wantedDids: subjects
282 // });
283 // }
284}
285
286const fetchMicrocosm = async <
287 Schema extends XRPCQueryMetadata,
288 Input extends Schema['params'] extends ObjectSchema ? InferOutput<Schema['params']> : undefined,
289 Output extends InferXRPCBodyOutput<Schema['output']>
290>(
291 api: URL,
292 schema: Schema,
293 params: Input,
294 init?: RequestInit
295): Promise<Result<Output, string>> => {
296 if (!schema.output || schema.output.type === 'blob') return err('schema must be blob');
297 api.pathname = `/xrpc/${schema.nsid}`;
298 api.search = params ? `?${new URLSearchParams(params)}` : '';
299 try {
300 const body = await fetchJson(api, init);
301 if (!body.ok) return err(body.error);
302 const parsed = safeParse(schema.output.schema, body.value);
303 if (!parsed.ok) return err(parsed.message);
304 return ok(parsed.value as Output);
305 } catch (error) {
306 return err(`FetchError: ${error}`);
307 }
308};
309
310const fetchJson = async (url: URL, init?: RequestInit): Promise<Result<unknown, string>> => {
311 try {
312 const response = await fetch(url, init);
313 const body = await response.json();
314 if (response.status === 400) return err(`${body.error}: ${body.message}`);
315 if (response.status !== 200) return err(`UnexpectedStatusCode: ${response.status}`);
316 return ok(body);
317 } catch (error) {
318 return err(`FetchError: ${error}`);
319 }
320};