replies timeline only, appview-less bluesky client
at main 10 kB view raw
import { err, expect, map, ok, type Result } from '$lib/result';
import {
	ComAtprotoIdentityResolveHandle,
	ComAtprotoRepoGetRecord,
	ComAtprotoRepoListRecords
} from '@atcute/atproto';
import { Client as AtcuteClient } from '@atcute/client';
import { safeParse, type Handle, type InferOutput } from '@atcute/lexicons';
import {
	isDid,
	parseCanonicalResourceUri,
	parseResourceUri,
	type ActorIdentifier,
	type AtprotoDid,
	type Cid,
	type Did,
	type Nsid,
	type RecordKey,
	type ResourceUri
} from '@atcute/lexicons/syntax';
import type {
	InferInput,
	InferXRPCBodyOutput,
	ObjectSchema,
	RecordKeySchema,
	RecordSchema,
	XRPCQueryMetadata
} from '@atcute/lexicons/validations';
import * as v from '@atcute/lexicons/validations';
import { MiniDocQuery, type MiniDoc } from './slingshot';
import { BacklinksQuery, type Backlinks, type BacklinksSource } from './constellation';
import type { Records } from '@atcute/lexicons/ambient';
import { PersistedLRU } from '$lib/cache';
import { AppBskyActorProfile } from '@atcute/bluesky';
import { WebSocket } from '@soffinal/websocket';
import type { Notification } from './stardust';
import { get } from 'svelte/store';
import { settings } from '$lib/settings';
import type { OAuthUserAgent } from '@atcute/oauth-browser-client';
// import { JetstreamSubscription } from '@atcute/jetstream';

// 1000 * 60 * 60 * 24 = 86_400_000.
// NOTE(review): this is 24 hours if PersistedLRU's `ttl` is in milliseconds — confirm.
const cacheTtl = 1000 * 60 * 60 * 24;

/** Cache of handle -> DID resolutions (see `AtpClient.resolveHandle`). */
export const handleCache = new PersistedLRU<Handle, AtprotoDid>({
	max: 1000,
	ttl: cacheTtl,
	prefix: 'handle'
});

/** Cache of handle-or-DID -> mini DID document (see `AtpClient.resolveDidDoc`). */
export const didDocCache = new PersistedLRU<ActorIdentifier, MiniDoc>({
	max: 1000,
	ttl: cacheTtl,
	prefix: 'didDoc'
});

/**
 * Cache of raw `com.atproto.repo.getRecord` responses, keyed by
 * `${repo}:${collection}:${rkey}` (see `AtpClient.getRecord`).
 */
export const recordCache = new PersistedLRU<
	string,
	InferOutput<typeof ComAtprotoRepoGetRecord.mainSchema.output.schema>
>({
	max: 5000,
	ttl: cacheTtl,
	prefix: 'record'
});

// Service endpoints, read from the settings store once at module load;
// later changes to the store are not picked up by these constants.
export const slingshotUrl: URL = new URL(get(settings).endpoints.slingshot);
export const spacedustUrl: URL = new URL(get(settings).endpoints.spacedust);
export const constellationUrl: URL = new URL(get(settings).endpoints.constellation);

type NotificationsStreamEncoder = WebSocket.Encoder<undefined, Notification>;
export type NotificationsStream = WebSocket<NotificationsStreamEncoder>;
export type NotificationsStreamEvent = WebSocket.Event<NotificationsStreamEncoder>;

/** A fetched record together with its AT URI and (optional) CID. */
export type RecordOutput<Output> = { uri: ResourceUri; cid: Cid | undefined; record: Output };

/**
 * Client for reading atproto data: records, handle / DID-document resolution,
 * backlinks, and a notifications stream.
 *
 * Record, handle and DID-document lookups go through `fetchMicrocosm` against
 * `slingshotUrl`, backlinks against `constellationUrl`, and the notifications
 * stream against `spacedustUrl`. Only `listRecords` uses the authenticated
 * `atcute` client that `login` sets up.
 */
export class AtpClient {
	/** Authenticated XRPC client; `null` until `login` succeeds. */
	public atcute: AtcuteClient | null = null;
	/** Identity of the logged-in user; `null` until `login` succeeds. */
	public user: { did: Did; handle: Handle } | null = null;

	/**
	 * Builds an XRPC client on top of `agent` and fetches the current session
	 * to learn the user's DID and handle.
	 *
	 * @param agent OAuth agent used as the request handler.
	 * @returns `ok(null)` on success, otherwise `err('failed to login: …')`.
	 *   `user` and `atcute` are only assigned on success.
	 */
	async login(agent: OAuthUserAgent): Promise<Result<null, string>> {
		try {
			const rpc = new AtcuteClient({ handler: agent });
			const res = await rpc.get('com.atproto.server.getSession');
			if (!res.ok) return err(`failed to login: ${res.data.error}`);
			this.user = {
				did: res.data.did,
				handle: res.data.handle
			};
			this.atcute = rpc;
		} catch (error) {
			return err(`failed to login: ${error}`);
		}

		return ok(null);
	}

	/**
	 * Fetches the record an AT URI points to, checking that the URI's
	 * collection matches the collection `schema` describes.
	 *
	 * @param schema Record schema used to validate the fetched value.
	 * @param uri AT URI of the record; it must include a record key.
	 * @returns The record, or an error string on collection mismatch, missing
	 *   repo/rkey, or fetch/validation failure.
	 *   NOTE(review): `expect` presumably throws when the URI cannot be parsed —
	 *   confirm against `$lib/result`.
	 */
	async getRecordUri<
		Collection extends Nsid,
		TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } },
		TKey extends RecordKeySchema,
		Schema extends RecordSchema<TObject, TKey>,
		Output extends InferInput<Schema>
	>(schema: Schema, uri: ResourceUri): Promise<Result<RecordOutput<Output>, string>> {
		const parsedUri = expect(parseResourceUri(uri));
		if (parsedUri.collection !== schema.object.shape.$type.expected)
			return err(
				`collections don't match: ${parsedUri.collection} != ${schema.object.shape.$type.expected}`
			);
		// Guard instead of asserting non-null: without it a missing rkey would be
		// sent to the server as the literal string "undefined".
		const { repo, rkey } = parsedUri;
		if (!repo || !rkey) return err(`record uri is missing repo or rkey: ${uri}`);
		return await this.getRecord(schema, repo, rkey);
	}

	/**
	 * Fetches a single record via `com.atproto.repo.getRecord` on slingshot.
	 *
	 * A hit in `recordCache` is returned immediately and is not re-validated
	 * against `schema`. Otherwise the network request is raced against the
	 * cache's signal for the same key, and whichever settles first wins. A
	 * fetched value is validated with `schema` and the raw response is then
	 * stored in the cache.
	 * NOTE(review): `getSignal` presumably resolves when the key gets populated
	 * by someone else — confirm against `PersistedLRU`.
	 *
	 * @param schema Record schema; its `$type` literal selects the collection.
	 * @param repo Handle or DID of the repository.
	 * @param rkey Record key within the collection.
	 */
	async getRecord<
		Collection extends Nsid,
		TObject extends ObjectSchema & { shape: { $type: v.LiteralSchema<Collection> } },
		TKey extends RecordKeySchema,
		Schema extends RecordSchema<TObject, TKey>,
		Output extends InferInput<Schema>
	>(
		schema: Schema,
		repo: ActorIdentifier,
		rkey: RecordKey
	): Promise<Result<RecordOutput<Output>, string>> {
		const collection = schema.object.shape.$type.expected;
		const cacheKey = `${repo}:${collection}:${rkey}`;

		const cached = recordCache.get(cacheKey);
		if (cached) return ok({ uri: cached.uri, cid: cached.cid, record: cached.value as Output });
		const cachedSignal = recordCache.getSignal(cacheKey);

		return await Promise.race([
			fetchMicrocosm(slingshotUrl, ComAtprotoRepoGetRecord.mainSchema, {
				repo,
				collection,
				rkey
			}).then((result): Result<RecordOutput<Output>, string> => {
				if (!result.ok) return result;

				const parsed = safeParse(schema, result.value.value);
				if (!parsed.ok) return err(parsed.message);

				// Only responses that passed validation are cached.
				recordCache.set(cacheKey, result.value);

				return ok({
					uri: result.value.uri,
					cid: result.value.cid,
					record: parsed.value as Output
				});
			}),
			cachedSignal.then(
				(d): Result<RecordOutput<Output>, string> =>
					ok({ uri: d.uri, cid: d.cid, record: d.value as Output })
			)
		]);
	}

	/**
	 * Fetches the `self` profile record of `repo`.
	 *
	 * @param repo Handle or DID; defaults to the logged-in user's DID.
	 * @returns The profile record, or `err('not authenticated')` when no repo
	 *   is given and nobody is logged in.
	 */
	async getProfile(repo?: ActorIdentifier): Promise<Result<AppBskyActorProfile.Main, string>> {
		repo = repo ?? this.user?.did;
		if (!repo) return err('not authenticated');
		return map(await this.getRecord(AppBskyActorProfile.mainSchema, repo, 'self'), (d) => d.record);
	}

	/**
	 * Lists records of a collection through the authenticated client.
	 *
	 * @param collection Collection NSID to list.
	 * @param repo Handle or DID of the repository.
	 * @param cursor Optional pagination cursor, passed through to the request.
	 * @param limit Optional page size, passed through to the request.
	 * @returns The response body, `err('not authenticated')` before login, or
	 *   `err('<error>: <message>')` when the request fails.
	 */
	async listRecords<Collection extends keyof Records>(
		collection: Collection,
		repo: ActorIdentifier,
		cursor?: string,
		limit?: number
	): Promise<
		Result<InferXRPCBodyOutput<(typeof ComAtprotoRepoListRecords.mainSchema)['output']>, string>
	> {
		if (!this.atcute) return err('not authenticated');
		const res = await this.atcute.get('com.atproto.repo.listRecords', {
			params: {
				repo,
				collection,
				cursor,
				limit
			}
		});
		if (!res.ok) return err(`${res.data.error}: ${res.data.message ?? 'no details'}`);
		return ok(res.data);
	}

	/**
	 * Resolves a handle to its DID. A DID passed in is returned unchanged.
	 *
	 * Checks `handleCache` first, then races the slingshot request against the
	 * cache signal; a successful resolution is written back to the cache.
	 */
	async resolveHandle(identifier: ActorIdentifier): Promise<Result<AtprotoDid, string>> {
		if (isDid(identifier)) return ok(identifier as AtprotoDid);

		const cached = handleCache.get(identifier);
		if (cached) return ok(cached);
		const cachedSignal = handleCache.getSignal(identifier);

		const res = await Promise.race([
			fetchMicrocosm(slingshotUrl, ComAtprotoIdentityResolveHandle.mainSchema, {
				handle: identifier
			}),
			cachedSignal.then((d): Result<{ did: Did }, string> => ok({ did: d }))
		]);

		const mapped = map(res, (data) => data.did as AtprotoDid);

		if (mapped.ok) handleCache.set(identifier, mapped.value);

		return mapped;
	}

	/**
	 * Resolves a handle or DID to its mini DID document via slingshot, using
	 * `didDocCache` the same way `resolveHandle` uses `handleCache`.
	 */
	async resolveDidDoc(handleOrDid: ActorIdentifier): Promise<Result<MiniDoc, string>> {
		const cached = didDocCache.get(handleOrDid);
		if (cached) return ok(cached);
		const cachedSignal = didDocCache.getSignal(handleOrDid);

		const result = await Promise.race([
			fetchMicrocosm(slingshotUrl, MiniDocQuery, {
				identifier: handleOrDid
			}),
			cachedSignal.then((d): Result<MiniDoc, string> => ok(d))
		]);

		if (result.ok) didDocCache.set(handleOrDid, result.value);

		return result;
	}

	/**
	 * Convenience wrapper around `getBacklinks` that takes a canonical AT URI.
	 * NOTE(review): `expect` presumably throws when the URI is not canonical —
	 * confirm against `$lib/result`.
	 */
	async getBacklinksUri(
		uri: ResourceUri,
		source: BacklinksSource
	): Promise<Result<Backlinks, string>> {
		const parsedResourceUri = expect(parseCanonicalResourceUri(uri));
		return await this.getBacklinks(
			parsedResourceUri.repo,
			parsedResourceUri.collection,
			parsedResourceUri.rkey,
			source
		);
	}

	/**
	 * Queries constellation for up to 100 backlinks to a record.
	 *
	 * The repo is first resolved to a DID so the subject URI is DID-based. The
	 * query is abandoned with a timeout error after 2 seconds.
	 *
	 * @param repo Handle or DID owning the subject record.
	 * @param collection Collection of the subject record.
	 * @param rkey Record key of the subject record.
	 * @param source Which kind of link to look up.
	 */
	async getBacklinks(
		repo: ActorIdentifier,
		collection: Nsid,
		rkey: RecordKey,
		source: BacklinksSource
	): Promise<Result<Backlinks, string>> {
		const did = await this.resolveHandle(repo);
		if (!did.ok) return err(`cant resolve handle: ${did.error}`);

		let timer: ReturnType<typeof setTimeout> | undefined;
		const timeout = new Promise<null>((resolve) => {
			timer = setTimeout(() => resolve(null), 2000);
		});
		const query = fetchMicrocosm(constellationUrl, BacklinksQuery, {
			subject: `at://${did.value}/${collection}/${rkey}`,
			source,
			limit: 100
		});

		try {
			const results = await Promise.race([query, timeout]);
			if (!results) return err('cant fetch backlinks: timeout');

			return results;
		} finally {
			// Don't leave the timer pending once the race has been decided.
			clearTimeout(timer);
		}
	}

	/**
	 * Opens a WebSocket to spacedust's `/subscribe` endpoint.
	 *
	 * @param subjects DIDs to watch; each is sent both as `wantedSubjectDids`
	 *   and, prefixed with `at://`, as `wantedSubjects`.
	 * @param sources Link sources to subscribe to (`wantedSources`).
	 * @returns The socket created for the subscription URL.
	 */
	streamNotifications(subjects: Did[], ...sources: BacklinksSource[]): NotificationsStream {
		const url = new URL(spacedustUrl);
		url.protocol = 'wss:';
		url.pathname = '/subscribe';
		const searchParams: string[][] = [];
		for (const source of sources) searchParams.push(['wantedSources', source]);
		for (const subject of subjects) searchParams.push(['wantedSubjectDids', subject]);
		for (const subject of subjects) searchParams.push(['wantedSubjects', `at://${subject}`]);
		searchParams.push(['instant', 'true']);
		url.search = `?${new URLSearchParams(searchParams)}`;
		// console.log(`streaming notifications: ${url}`);
		const encoder = WebSocket.getDefaultEncoder<undefined, Notification>();
		const ws = new WebSocket<typeof encoder>(url.toString(), {
			encoder
		});
		return ws;
	}

	// streamJetstream(subjects: Did[], ...collections: Nsid[]) {
	// 	return new JetstreamSubscription({
	// 		url: 'wss://jetstream2.fr.hose.cam',
	// 		wantedCollections: collections,
	// 		wantedDids: subjects
	// 	});
	// }
}

/**
 * Performs an XRPC query (`GET {api}/xrpc/{nsid}?{params}`) and validates the
 * JSON response against the schema's output.
 *
 * @param api Base URL of the service. It is copied, never mutated, so the
 *   shared module-level endpoint URLs stay untouched.
 * @param schema Query metadata; its output must be a non-blob body.
 * @param params Query parameters, serialized with `URLSearchParams`.
 * @param init Optional `fetch` options.
 * @returns The validated body, or an error string.
 */
const fetchMicrocosm = async <
	Schema extends XRPCQueryMetadata,
	Input extends Schema['params'] extends ObjectSchema ? InferOutput<Schema['params']> : undefined,
	Output extends InferXRPCBodyOutput<Schema['output']>
>(
	api: URL,
	schema: Schema,
	params: Input,
	init?: RequestInit
): Promise<Result<Output, string>> => {
	if (!schema.output || schema.output.type === 'blob')
		return err('schema output must be a non-blob body');
	const url = new URL(api);
	url.pathname = `/xrpc/${schema.nsid}`;
	url.search = params ? `?${new URLSearchParams(params)}` : '';
	try {
		const body = await fetchJson(url, init);
		if (!body.ok) return err(body.error);
		const parsed = safeParse(schema.output.schema, body.value);
		if (!parsed.ok) return err(parsed.message);
		return ok(parsed.value as Output);
	} catch (error) {
		return err(`FetchError: ${error}`);
	}
};

/**
 * Fetches `url` and returns its JSON body.
 *
 * - status 400: `err('<error>: <message>')` built from the JSON error body
 * - any other non-200 status: `err('UnexpectedStatusCode: <status>')`
 * - network or JSON-parse failure: `err('FetchError: …')`
 *
 * The status is checked before the body is parsed so that a non-JSON error
 * page is reported by its status code rather than as a parse failure.
 */
const fetchJson = async (url: URL, init?: RequestInit): Promise<Result<unknown, string>> => {
	try {
		const response = await fetch(url, init);
		if (response.status === 400) {
			const body = await response.json();
			return err(`${body.error}: ${body.message}`);
		}
		if (response.status !== 200) return err(`UnexpectedStatusCode: ${response.status}`);
		return ok(await response.json());
	} catch (error) {
		return err(`FetchError: ${error}`);
	}
};