replies timeline only, appview-less bluesky client

feat: add stardust to be notified of new replies

+15
deno.lock
···
"npm:@atcute/lexicons@^1.2.2": "1.2.2",
"npm:@eslint/compat@^1.4.0": "1.4.0_eslint@9.37.0",
"npm:@eslint/js@^9.36.0": "9.37.0",
+
"npm:@soffinal/websocket@~0.2.1": "0.2.1_typescript@5.9.3",
"npm:@sveltejs/adapter-auto@^6.1.0": "6.1.1_@sveltejs+kit@2.47.0__@sveltejs+vite-plugin-svelte@6.2.1___svelte@5.40.1____acorn@8.15.0___vite@7.1.10____@types+node@24.8.0____picomatch@4.0.3___@types+node@24.8.0__svelte@5.40.1___acorn@8.15.0__vite@7.1.10___@types+node@24.8.0___picomatch@4.0.3__acorn@8.15.0__@types+node@24.8.0_@sveltejs+vite-plugin-svelte@6.2.1__svelte@5.40.1___acorn@8.15.0__vite@7.1.10___@types+node@24.8.0___picomatch@4.0.3__@types+node@24.8.0_svelte@5.40.1__acorn@8.15.0_vite@7.1.10__@types+node@24.8.0__picomatch@4.0.3_@types+node@24.8.0",
"npm:@sveltejs/kit@^2.43.2": "2.47.0_@sveltejs+vite-plugin-svelte@6.2.1__svelte@5.40.1___acorn@8.15.0__vite@7.1.10___@types+node@24.8.0___picomatch@4.0.3__@types+node@24.8.0_svelte@5.40.1__acorn@8.15.0_vite@7.1.10__@types+node@24.8.0__picomatch@4.0.3_acorn@8.15.0_@types+node@24.8.0",
"npm:@sveltejs/vite-plugin-svelte@^6.2.0": "6.2.1_svelte@5.40.1__acorn@8.15.0_vite@7.1.10__@types+node@24.8.0__picomatch@4.0.3_@types+node@24.8.0",
···
"integrity": "sha512-bf9PtUa0u8IXDVxzRToFQKsNCRz9qLYfR/MpECxl4mRoWYjAeFjgxj1XdZr2M/GNVpT05p+LgQOHopYDlUu6/w==",
"os": ["win32"],
"cpu": ["x64"]
+
},
+
"@soffinal/stream@0.2.3_typescript@5.9.3": {
+
"integrity": "sha512-B0xWaDsVa6/HxttZmKqD7BmsveQQzuEoY9wztwGIuLF+nsVW1DW2V0kOJZIwTxp1wP4iKPalje1uZaZ+cYv7fg==",
+
"dependencies": [
+
"typescript"
+
]
+
},
+
"@soffinal/websocket@0.2.1_typescript@5.9.3": {
+
"integrity": "sha512-OvBZCtWLRT3gZpseHdd7qBsKNTVYnZsMUwk1aF5m/hZ632MOhaumi4WS/D/hasTHYQFh1XZXy7To+rMVWwubCw==",
+
"dependencies": [
+
"@soffinal/stream",
+
"typescript"
+
]
},
"@standard-schema/spec@1.0.0": {
"integrity": "sha512-m2bOd0f2RT9k8QJx1JN85cZYyH1RqFBdlwtkSlf4tBDYLCiiZnv1fIIwacK6cqwXavOydf0NPToMQgpKq+dVlA=="
···
"npm:@atcute/lexicons@^1.2.2",
"npm:@eslint/compat@^1.4.0",
"npm:@eslint/js@^9.36.0",
+
"npm:@soffinal/websocket@~0.2.1",
"npm:@sveltejs/adapter-auto@^6.1.0",
"npm:@sveltejs/kit@^2.43.2",
"npm:@sveltejs/vite-plugin-svelte@^6.2.0",
+1
package.json
···
"@atcute/client": "^4.0.5",
"@atcute/identity": "^1.1.1",
"@atcute/lexicons": "^1.2.2",
+
"@soffinal/websocket": "^0.2.1",
"@wora/cache-persist": "^2.2.1",
"hash-wasm": "^4.12.0",
"lru-cache": "^11.2.2",
+60 -23
src/lib/at/client.ts
···
import { Client as AtcuteClient, CredentialManager } from '@atcute/client';
import { safeParse, type Handle, type InferOutput } from '@atcute/lexicons';
import {
-
isHandle,
+
isDid,
parseCanonicalResourceUri,
parseResourceUri,
type ActorIdentifier,
···
import type { Records } from '@atcute/lexicons/ambient';
import { PersistedLRU } from '$lib/cache';
import { AppBskyActorProfile } from '@atcute/bluesky';
+
import { WebSocket } from '@soffinal/websocket';
+
import type { Notification } from './stardust';
+
// import { JetstreamSubscription } from '@atcute/jetstream';
const cacheTtl = 1000 * 60 * 60 * 24;
const handleCache = new PersistedLRU<Handle, AtprotoDid>({
···
prefix: 'record'
});
+
export let slingshotUrl: URL = new URL(
+
localStorage.getItem('slingshotUrl') ?? 'https://slingshot.microcosm.blue'
+
);
+
export let spacedustUrl: URL = new URL(
+
localStorage.getItem('spacedustUrl') ?? 'https://spacedust.microcosm.blue'
+
);
+
export let constellationUrl: URL = new URL(
+
localStorage.getItem('constellationUrl') ?? 'https://constellation.microcosm.blue'
+
);
+
+
type NotificationsStreamEncoder = WebSocket.Encoder<undefined, Notification>;
+
export type NotificationsStream = WebSocket<NotificationsStreamEncoder>;
+
export type NotificationsStreamEvent = WebSocket.Event<NotificationsStreamEncoder>;
+
export class AtpClient {
public atcute: AtcuteClient | null = null;
public didDoc: MiniDoc | null = null;
-
private slingshotUrl: URL = new URL('https://slingshot.microcosm.blue');
-
private spacedustUrl: URL = new URL('https://spacedust.microcosm.blue');
-
private constellationUrl: URL = new URL('https://constellation.microcosm.blue');
-
async login(handle: Handle, password: string): Promise<Result<null, string>> {
const didDoc = await this.resolveDidDoc(handle);
if (!didDoc.ok) return err(didDoc.error);
···
const cachedSignal = recordCache.getSignal(cacheKey);
const result = await Promise.race([
-
fetchMicrocosm(this.slingshotUrl, ComAtprotoRepoGetRecord.mainSchema, {
+
fetchMicrocosm(slingshotUrl, ComAtprotoRepoGetRecord.mainSchema, {
repo,
collection,
rkey
···
return ok(res.data);
}
-
async resolveHandle(handle: Handle): Promise<Result<AtprotoDid, string>> {
-
const cached = handleCache.get(handle);
+
async resolveHandle(identifier: ActorIdentifier): Promise<Result<AtprotoDid, string>> {
+
if (isDid(identifier)) return ok(identifier as AtprotoDid);
+
+
const cached = handleCache.get(identifier);
if (cached) return ok(cached);
-
const cachedSignal = handleCache.getSignal(handle);
+
const cachedSignal = handleCache.getSignal(identifier);
const res = await Promise.race([
-
fetchMicrocosm(this.slingshotUrl, ComAtprotoIdentityResolveHandle.mainSchema, {
-
handle
+
fetchMicrocosm(slingshotUrl, ComAtprotoIdentityResolveHandle.mainSchema, {
+
handle: identifier
}),
cachedSignal.then((d): Result<{ did: Did }, string> => ok({ did: d }))
]);
···
const mapped = map(res, (data) => data.did as AtprotoDid);
if (mapped.ok) {
-
handleCache.set(handle, mapped.value);
+
handleCache.set(identifier, mapped.value);
}
return mapped;
···
const cachedSignal = didDocCache.getSignal(handleOrDid);
const result = await Promise.race([
-
fetchMicrocosm(this.slingshotUrl, MiniDocQuery, {
+
fetchMicrocosm(slingshotUrl, MiniDocQuery, {
identifier: handleOrDid
}),
cachedSignal.then((d): Result<MiniDoc, string> => ok(d))
···
rkey: RecordKey,
source: BacklinksSource
): Promise<Result<Backlinks, string>> {
-
let did = repo;
-
if (isHandle(did)) {
-
const resolvedDid = await this.resolveHandle(did);
-
if (!resolvedDid.ok) {
-
return err(`failed to resolve handle: ${resolvedDid.error}`);
-
}
-
did = resolvedDid.value;
+
const did = await this.resolveHandle(repo);
+
if (!did.ok) {
+
return err(`failed to resolve handle: ${did.error}`);
}
-
return await fetchMicrocosm(this.constellationUrl, BacklinksQuery, {
-
subject: `at://${did}/${collection}/${rkey}`,
+
return await fetchMicrocosm(constellationUrl, BacklinksQuery, {
+
subject: `at://${did.value}/${collection}/${rkey}`,
source,
limit: 100
});
}
+
+
streamNotifications(subjects: Did[], ...sources: BacklinksSource[]): NotificationsStream {
+
const url = new URL(spacedustUrl);
+
url.protocol = 'wss:';
+
url.pathname = '/subscribe';
+
const searchParams = [];
+
sources.every((source) => searchParams.push(['wantedSources', source]));
+
subjects.every((subject) => searchParams.push(['wantedSubjectDids', subject]));
+
subjects.every((subject) => searchParams.push(['wantedSubjects', `at://${subject}`]));
+
searchParams.push(['instant', 'true']);
+
url.search = `?${new URLSearchParams(searchParams)}`;
+
// console.log(`streaming notifications: ${url}`);
+
const encoder = WebSocket.getDefaultEncoder<undefined, Notification>();
+
const ws = new WebSocket<typeof encoder>(url.toString(), {
+
encoder
+
});
+
return ws;
+
}
+
+
// streamJetstream(subjects: Did[], ...collections: Nsid[]) {
+
// return new JetstreamSubscription({
+
// url: 'wss://jetstream2.fr.hose.cam',
+
// wantedCollections: collections,
+
// wantedDids: subjects
+
// });
+
// }
}
const fetchMicrocosm = async <
···
try {
api.pathname = `/xrpc/${schema.nsid}`;
api.search = params ? `?${new URLSearchParams(params)}` : '';
-
console.info(`fetching:`, api.href);
+
// console.info(`fetching:`, api.href);
const response = await fetch(api, init);
const body = await response.json();
if (response.status === 400) return err(`${body.error}: ${body.message}`);
+16
src/lib/at/stardust.ts
···
+
import type { CanonicalResourceUri, RecordKey } from '@atcute/lexicons';
+
import type { BacklinksSource } from './constellation';
+
+
export type Notification = {
+
kind: 'link';
+
origin: string;
+
link: LinkNotification;
+
};
+
+
export type LinkNotification = {
+
operation: 'create' | 'update' | 'delete';
+
source: BacklinksSource;
+
source_record: CanonicalResourceUri;
+
source_rev: RecordKey;
+
subject: CanonicalResourceUri;
+
};
+1 -1
src/lib/cache.ts
···
const state = this.storage.getState();
for (const [key, val] of Object.entries(state)) {
try {
-
console.log('restoring', key);
+
// console.log('restoring', key);
const k = this.unprefix(key) as unknown as K;
const v = val as V;
this.memory.set(k, v);
+5 -2
src/lib/index.ts
···
-
import { AtpClient } from './at/client';
+
import { writable } from 'svelte/store';
+
import { type NotificationsStream } from './at/client';
+
// import type { JetstreamSubscription } from '@atcute/jetstream';
-
export const client = new AtpClient();
+
export const notificationStream = writable<NotificationsStream | null>(null);
+
// export const jetstream = writable<JetstreamSubscription | null>(null);
+122 -44
src/routes/+page.svelte
···
import BskyPost from '$components/BskyPost.svelte';
import PostComposer from '$components/PostComposer.svelte';
import AccountSelector from '$components/AccountSelector.svelte';
-
import { AtpClient } from '$lib/at/client';
+
import { AtpClient, type NotificationsStreamEvent } from '$lib/at/client';
import { accounts, addAccount, type Account } from '$lib/accounts';
import {
type Did,
···
import { onMount } from 'svelte';
import { theme } from '$lib/theme.svelte';
import { fetchPostsWithBacklinks, hydratePosts } from '$lib/at/fetch';
-
import { expect } from '$lib/result';
-
import type { AppBskyFeedPost } from '@atcute/bluesky';
+
import { expect, ok } from '$lib/result';
+
import { AppBskyFeedPost } from '@atcute/bluesky';
import { SvelteMap } from 'svelte/reactivity';
import { InfiniteLoader, LoaderState } from 'svelte-infinite';
+
import { notificationStream } from '$lib';
+
import { get } from 'svelte/store';
let loaderState = new LoaderState();
let scrollContainer = $state<HTMLDivElement>();
···
let viewClient = $state<AtpClient>(new AtpClient());
+
let posts = new SvelteMap<Did, SvelteMap<ResourceUri, AppBskyFeedPost.Main>>();
+
let cursors = new SvelteMap<Did, { value?: string; end: boolean }>();
+
+
const addPosts = (did: Did, accTimeline: Map<ResourceUri, AppBskyFeedPost.Main>) => {
+
if (!posts.has(did)) {
+
posts.set(did, new SvelteMap(accTimeline));
+
return;
+
}
+
const map = posts.get(did)!;
+
for (const [uri, record] of accTimeline) map.set(uri, record);
+
};
+
+
const fetchTimeline = async (account: Account) => {
+
const client = clients.get(account.did);
+
if (!client) return;
+
+
const cursor = cursors.get(account.did);
+
if (cursor && cursor.end) return;
+
+
const accPosts = await fetchPostsWithBacklinks(client, account.did, cursor?.value, 12);
+
if (!accPosts.ok)
+
throw `failed to fetch posts for account ${account.handle}: ${accPosts.error}`;
+
+
// if the cursor is undefined, we've reached the end of the timeline
+
if (!accPosts.value.cursor) {
+
cursors.set(account.did, { ...cursor, end: true });
+
return;
+
}
+
+
cursors.set(account.did, { value: accPosts.value.cursor, end: false });
+
addPosts(account.did, await hydratePosts(client, accPosts.value.posts));
+
};
+
+
const fetchTimelines = (newAccounts: Account[]) => Promise.all(newAccounts.map(fetchTimeline));
+
+
const handleNotification = async (event: NotificationsStreamEvent) => {
+
if (event.type === 'message') {
+
// console.log(event.data);
+
const parsedSubjectUri = expect(parseCanonicalResourceUri(event.data.link.subject));
+
const subjectPost = await viewClient.getRecord(
+
AppBskyFeedPost.mainSchema,
+
parsedSubjectUri.repo,
+
parsedSubjectUri.rkey
+
);
+
if (!subjectPost.ok) return;
+
+
const parsedSourceUri = expect(parseCanonicalResourceUri(event.data.link.source_record));
+
const hydrated = await hydratePosts(viewClient, [
+
{
+
record: subjectPost.value,
+
uri: event.data.link.subject,
+
replies: ok({
+
cursor: null,
+
total: 1,
+
records: [
+
{
+
did: parsedSourceUri.repo,
+
collection: parsedSourceUri.collection,
+
rkey: parsedSourceUri.rkey
+
}
+
]
+
})
+
}
+
]);
+
+
// console.log(hydrated);
+
addPosts(parsedSubjectUri.repo, hydrated);
+
}
+
};
+
+
// const handleJetstream = async (subscription: JetstreamSubscription) => {
+
// for await (const event of subscription) {
+
// if (event.kind !== 'commit') continue;
+
// const commit = event.commit;
+
// if (commit.operation === 'delete') {
+
// continue;
+
// }
+
// const record = commit.record as AppBskyFeedPost.Main;
+
// addPosts(
+
// event.did,
+
// new Map([[`at://${event.did}/${commit.collection}/${commit.rkey}` as ResourceUri, record]])
+
// );
+
// }
+
// };
+
onMount(async () => {
+
accounts.subscribe((newAccounts) => {
+
get(notificationStream)?.stop();
+
// jetstream.set(null);
+
if (newAccounts.length === 0) return;
+
notificationStream.set(
+
viewClient.streamNotifications(
+
newAccounts.map((account) => account.did),
+
'app.bsky.feed.post:reply.parent.uri'
+
)
+
);
+
// jetstream.set(
+
// viewClient.streamJetstream(
+
// newAccounts.map((account) => account.did),
+
// 'app.bsky.feed.post'
+
// )
+
// );
+
});
+
notificationStream.subscribe((stream) => {
+
if (!stream) return;
+
stream.listen(handleNotification);
+
});
+
// jetstream.subscribe((stream) => {
+
// if (!stream) return;
+
// handleJetstream(stream);
+
// });
if ($accounts.length > 0) {
loaderState.status = 'LOADING';
selectedDid = $accounts[0].did;
···
loginAccount(newAccount).then(() => fetchTimeline(newAccount));
};
-
let posts = new SvelteMap<Did, SvelteMap<ResourceUri, AppBskyFeedPost.Main>>();
-
let cursors = new SvelteMap<Did, { value?: string; end: boolean }>();
-
-
const fetchTimeline = async (account: Account) => {
-
const client = clients.get(account.did);
-
if (!client) return;
-
-
const cursor = cursors.get(account.did);
-
if (cursor && cursor.end) return;
-
-
const accPosts = await fetchPostsWithBacklinks(client, account.did, cursor?.value, 6);
-
if (!accPosts.ok) {
-
throw `failed to fetch posts for account ${account.handle}: ${accPosts.error}`;
-
}
-
-
// if the cursor is undefined, we've reached the end of the timeline
-
if (!accPosts.value.cursor) {
-
cursors.set(account.did, { ...cursor, end: true });
-
return;
-
}
-
-
cursors.set(account.did, { value: accPosts.value.cursor, end: false });
-
const accTimeline = await hydratePosts(client, accPosts.value.posts);
-
if (!posts.has(account.did)) {
-
posts.set(account.did, new SvelteMap(accTimeline));
-
return;
-
}
-
const map = posts.get(account.did)!;
-
for (const [uri, record] of accTimeline) map.set(uri, record);
-
};
-
-
const fetchTimelines = (newAccounts: Account[]) => Promise.all(newAccounts.map(fetchTimeline));
-
let loading = $state(false);
let loadError = $state('');
const loadMore = async () => {
···
};
let reverseChronological = $state(true);
-
let viewOwnPosts = $state(false);
+
let viewOwnPosts = $state(true);
type ThreadPost = {
uri: ResourceUri;
···
newestTime: new Date(record.createdAt).getTime()
};
-
if (!threadMap.has(rootUri)) {
-
threadMap.set(rootUri, []);
-
}
+
if (!threadMap.has(rootUri)) threadMap.set(rootUri, []);
+
threadMap.get(rootUri)!.push(post);
}
}
···
// Sort threads by newest time (descending) so older branches appear first
threads.sort((a, b) => b.newestTime - a.newestTime);
+
// console.log(threads);
+
return threads;
};
···
posts.some((post) => !isOwnPost(post, accounts));
const filterThreads = (threads: Thread[], accounts: Account[]) =>
threads.filter((thread) => {
-
if (!viewOwnPosts) {
-
return hasNonOwnPost(thread.posts, accounts);
-
}
+
if (!viewOwnPosts) return hasNonOwnPost(thread.posts, accounts);
return true;
});
···
{/snippet}
{#snippet threadsView()}
-
{#each threads as thread (thread.rootUri)}
+
{#each threads as thread ([thread.rootUri, thread.branchParentPost, ...thread.posts.map((post) => post.uri)])}
<div class="flex {reverseChronological ? 'flex-col' : 'flex-col-reverse'} mb-6.5">
{#if thread.branchParentPost}
{@const post = thread.branchParentPost}