replies timeline only, appview-less bluesky client
1import { err, expect, map, ok, type Result } from '$lib/result'; 2import { 3 ComAtprotoIdentityResolveHandle, 4 ComAtprotoRepoGetRecord, 5 ComAtprotoRepoListRecords 6} from '@atcute/atproto'; 7import { Client as AtcuteClient } from '@atcute/client'; 8import { safeParse, type Handle, type InferOutput } from '@atcute/lexicons'; 9import { 10 isDid, 11 parseCanonicalResourceUri, 12 parseResourceUri, 13 type ActorIdentifier, 14 type AtprotoDid, 15 type CanonicalResourceUri, 16 type Cid, 17 type Did, 18 type Nsid, 19 type RecordKey, 20 type ResourceUri 21} from '@atcute/lexicons/syntax'; 22import type { 23 InferInput, 24 InferXRPCBodyOutput, 25 ObjectSchema, 26 RecordKeySchema, 27 RecordSchema, 28 XRPCQueryMetadata 29} from '@atcute/lexicons/validations'; 30import * as v from '@atcute/lexicons/validations'; 31import { MiniDocQuery, type MiniDoc } from './slingshot'; 32import { BacklinksQuery, type Backlinks, type BacklinksSource } from './constellation'; 33import type { Records } from '@atcute/lexicons/ambient'; 34import { PersistedLRU } from '$lib/cache'; 35import { AppBskyActorProfile } from '@atcute/bluesky'; 36import { WebSocket } from '@soffinal/websocket'; 37import type { Notification } from './stardust'; 38import { get } from 'svelte/store'; 39import { settings } from '$lib/settings'; 40import type { OAuthUserAgent } from '@atcute/oauth-browser-client'; 41// import { JetstreamSubscription } from '@atcute/jetstream'; 42 43const cacheTtl = 1000 * 60 * 60 * 24; 44export const handleCache = new PersistedLRU<Handle, AtprotoDid>({ 45 max: 1000, 46 ttl: cacheTtl, 47 prefix: 'handle' 48}); 49export const didDocCache = new PersistedLRU<ActorIdentifier, MiniDoc>({ 50 max: 1000, 51 ttl: cacheTtl, 52 prefix: 'didDoc' 53}); 54export const recordCache = new PersistedLRU< 55 string, 56 InferOutput<typeof ComAtprotoRepoGetRecord.mainSchema.output.schema> 57>({ 58 max: 5000, 59 ttl: cacheTtl, 60 prefix: 'record' 61}); 62 63export const slingshotUrl: URL = new URL(get(settings).endpoints.slingshot); 64export const spacedustUrl: URL = new URL(get(settings).endpoints.spacedust); 65export const constellationUrl: URL = new URL(get(settings).endpoints.constellation); 66 67type NotificationsStreamEncoder = WebSocket.Encoder<undefined, Notification>; 68export type NotificationsStream = WebSocket<NotificationsStreamEncoder>; 69export type NotificationsStreamEvent = WebSocket.Event<NotificationsStreamEncoder>; 70 71export type RecordOutput<Output> = { uri: ResourceUri; cid: Cid | undefined; record: Output }; 72 73export class AtpClient { 74 public atcute: AtcuteClient | null = null; 75 public didDoc: MiniDoc | null = null; 76 77 async login(identifier: ActorIdentifier, agent: OAuthUserAgent): Promise<Result<null, string>> { 78 if ((agent.session.token.expires_at ?? 0) < Date.now()) { 79 return err('token expired, relogin'); 80 } 81 82 const didDoc = await this.resolveDidDoc(identifier); 83 if (!didDoc.ok) return err(didDoc.error); 84 this.didDoc = didDoc.value; 85 86 try { 87 const rpc = new AtcuteClient({ handler: agent }); 88 this.atcute = rpc; 89 } catch (error) { 90 return err(`failed to login: ${error}`); 91 } 92 93 return ok(null); 94 } 95 96 async getRecordUri< 97 Collection extends Nsid, 98 TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } }, 99 TKey extends RecordKeySchema, 100 Schema extends RecordSchema<TObject, TKey>, 101 Output extends InferInput<Schema> 102 >(schema: Schema, uri: ResourceUri): Promise<Result<RecordOutput<Output>, string>> { 103 const parsedUri = expect(parseResourceUri(uri)); 104 if (parsedUri.collection !== schema.object.shape.$type.expected) 105 return err( 106 `collections don't match: ${parsedUri.collection} != ${schema.object.shape.$type.expected}` 107 ); 108 return await this.getRecord(schema, parsedUri.repo!, parsedUri.rkey!); 109 } 110 111 async getRecord< 112 Collection extends Nsid, 113 TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } }, 114 TKey extends RecordKeySchema, 115 Schema extends RecordSchema<TObject, TKey>, 116 Output extends InferInput<Schema> 117 >( 118 schema: Schema, 119 repo: ActorIdentifier, 120 rkey: RecordKey 121 ): Promise<Result<RecordOutput<Output>, string>> { 122 const collection = schema.object.shape.$type.expected; 123 const cacheKey = `${repo}:${collection}:${rkey}`; 124 125 const cached = recordCache.get(cacheKey); 126 if (cached) return ok({ uri: cached.uri, cid: cached.cid, record: cached.value as Output }); 127 const cachedSignal = recordCache.getSignal(cacheKey); 128 129 const result = await Promise.race([ 130 fetchMicrocosm(slingshotUrl, ComAtprotoRepoGetRecord.mainSchema, { 131 repo, 132 collection, 133 rkey 134 }).then((result): Result<RecordOutput<Output>, string> => { 135 if (!result.ok) return result; 136 137 const parsed = safeParse(schema, result.value.value); 138 if (!parsed.ok) return err(parsed.message); 139 140 recordCache.set(cacheKey, result.value); 141 142 return ok({ 143 uri: result.value.uri, 144 cid: result.value.cid, 145 record: parsed.value as Output 146 }); 147 }), 148 cachedSignal.then( 149 (d): Result<RecordOutput<Output>, string> => 150 ok({ uri: d.uri, cid: d.cid, record: d.value as Output }) 151 ) 152 ]); 153 154 if (!result.ok) return result; 155 156 return ok(result.value); 157 } 158 159 async getProfile(repo?: ActorIdentifier): Promise<Result<AppBskyActorProfile.Main, string>> { 160 repo = repo ?? this.didDoc?.did; 161 if (!repo) return err('not authenticated'); 162 return map(await this.getRecord(AppBskyActorProfile.mainSchema, repo, 'self'), (d) => d.record); 163 } 164 165 async listRecords<Collection extends keyof Records>( 166 collection: Collection, 167 repo: ActorIdentifier, 168 cursor?: string, 169 limit?: number 170 ): Promise< 171 Result<InferXRPCBodyOutput<(typeof ComAtprotoRepoListRecords.mainSchema)['output']>, string> 172 > { 173 if (!this.atcute) return err('not authenticated'); 174 const res = await this.atcute.get('com.atproto.repo.listRecords', { 175 params: { 176 repo, 177 collection, 178 cursor, 179 limit 180 } 181 }); 182 if (!res.ok) return err(`${res.data.error}: ${res.data.message ?? 'no details'}`); 183 return ok(res.data); 184 } 185 186 async resolveHandle(identifier: ActorIdentifier): Promise<Result<AtprotoDid, string>> { 187 if (isDid(identifier)) return ok(identifier as AtprotoDid); 188 189 const cached = handleCache.get(identifier); 190 if (cached) return ok(cached); 191 const cachedSignal = handleCache.getSignal(identifier); 192 193 const res = await Promise.race([ 194 fetchMicrocosm(slingshotUrl, ComAtprotoIdentityResolveHandle.mainSchema, { 195 handle: identifier 196 }), 197 cachedSignal.then((d): Result<{ did: Did }, string> => ok({ did: d })) 198 ]); 199 200 const mapped = map(res, (data) => data.did as AtprotoDid); 201 202 if (mapped.ok) { 203 handleCache.set(identifier, mapped.value); 204 } 205 206 return mapped; 207 } 208 209 async resolveDidDoc(handleOrDid: ActorIdentifier): Promise<Result<MiniDoc, string>> { 210 const cached = didDocCache.get(handleOrDid); 211 if (cached) return ok(cached); 212 const cachedSignal = didDocCache.getSignal(handleOrDid); 213 214 const result = await Promise.race([ 215 fetchMicrocosm(slingshotUrl, MiniDocQuery, { 216 identifier: handleOrDid 217 }), 218 cachedSignal.then((d): Result<MiniDoc, string> => ok(d)) 219 ]); 220 221 if (result.ok) { 222 didDocCache.set(handleOrDid, result.value); 223 } 224 225 return result; 226 } 227 228 async getBacklinksUri( 229 uri: CanonicalResourceUri, 230 source: BacklinksSource 231 ): Promise<Result<Backlinks, string>> { 232 const parsedResourceUri = expect(parseCanonicalResourceUri(uri)); 233 return await this.getBacklinks( 234 parsedResourceUri.repo, 235 parsedResourceUri.collection, 236 parsedResourceUri.rkey, 237 source 238 ); 239 } 240 241 async getBacklinks( 242 repo: ActorIdentifier, 243 collection: Nsid, 244 rkey: RecordKey, 245 source: BacklinksSource 246 ): Promise<Result<Backlinks, string>> { 247 const did = await this.resolveHandle(repo); 248 if (!did.ok) { 249 return err(`failed to resolve handle: ${did.error}`); 250 } 251 252 return await fetchMicrocosm(constellationUrl, BacklinksQuery, { 253 subject: `at://${did.value}/${collection}/${rkey}`, 254 source, 255 limit: 100 256 }); 257 } 258 259 streamNotifications(subjects: Did[], ...sources: BacklinksSource[]): NotificationsStream { 260 const url = new URL(spacedustUrl); 261 url.protocol = 'wss:'; 262 url.pathname = '/subscribe'; 263 const searchParams = []; 264 sources.every((source) => searchParams.push(['wantedSources', source])); 265 subjects.every((subject) => searchParams.push(['wantedSubjectDids', subject])); 266 subjects.every((subject) => searchParams.push(['wantedSubjects', `at://${subject}`])); 267 searchParams.push(['instant', 'true']); 268 url.search = `?${new URLSearchParams(searchParams)}`; 269 // console.log(`streaming notifications: ${url}`); 270 const encoder = WebSocket.getDefaultEncoder<undefined, Notification>(); 271 const ws = new WebSocket<typeof encoder>(url.toString(), { 272 encoder 273 }); 274 return ws; 275 } 276 277 // streamJetstream(subjects: Did[], ...collections: Nsid[]) { 278 // return new JetstreamSubscription({ 279 // url: 'wss://jetstream2.fr.hose.cam', 280 // wantedCollections: collections, 281 // wantedDids: subjects 282 // }); 283 // } 284} 285 286const fetchMicrocosm = async < 287 Schema extends XRPCQueryMetadata, 288 Input extends Schema['params'] extends ObjectSchema ? InferOutput<Schema['params']> : undefined, 289 Output extends InferXRPCBodyOutput<Schema['output']> 290>( 291 api: URL, 292 schema: Schema, 293 params: Input, 294 init?: RequestInit 295): Promise<Result<Output, string>> => { 296 if (!schema.output || schema.output.type === 'blob') return err('schema must be blob'); 297 api.pathname = `/xrpc/${schema.nsid}`; 298 api.search = params ? `?${new URLSearchParams(params)}` : ''; 299 try { 300 const body = await fetchJson(api, init); 301 if (!body.ok) return err(body.error); 302 const parsed = safeParse(schema.output.schema, body.value); 303 if (!parsed.ok) return err(parsed.message); 304 return ok(parsed.value as Output); 305 } catch (error) { 306 return err(`FetchError: ${error}`); 307 } 308}; 309 310const fetchJson = async (url: URL, init?: RequestInit): Promise<Result<unknown, string>> => { 311 try { 312 const response = await fetch(url, init); 313 const body = await response.json(); 314 if (response.status === 400) return err(`${body.error}: ${body.message}`); 315 if (response.status !== 200) return err(`UnexpectedStatusCode: ${response.status}`); 316 return ok(body); 317 } catch (error) { 318 return err(`FetchError: ${error}`); 319 } 320};