import { Firehose } from "@skyware/firehose"; import { A, useLocation, useSearchParams } from "@solidjs/router"; import { createSignal, For, onCleanup, onMount, Show } from "solid-js"; import { Button } from "../components/button"; import { JSONValue } from "../components/json"; import { StickyOverlay } from "../components/sticky"; import { TextInput } from "../components/text-input"; const LIMIT = 25; type Parameter = { name: string; param: string | string[] | undefined }; const StreamView = () => { const [searchParams, setSearchParams] = useSearchParams(); const [parameters, setParameters] = createSignal([]); const streamType = useLocation().pathname === "/firehose" ? "firehose" : "jetstream"; const [records, setRecords] = createSignal>([]); const [connected, setConnected] = createSignal(false); const [notice, setNotice] = createSignal(""); let socket: WebSocket; let firehose: Firehose; let formRef!: HTMLFormElement; const connectSocket = async (formData: FormData) => { setNotice(""); if (connected()) { if (streamType === "jetstream") socket?.close(); else firehose?.close(); setConnected(false); return; } setRecords([]); let url = ""; if (streamType === "jetstream") { url = formData.get("instance")?.toString() ?? "wss://jetstream1.us-east.bsky.network/subscribe"; url = url.concat("?"); } else { url = formData.get("instance")?.toString() ?? "wss://bsky.network"; url = url.replace("/xrpc/com.atproto.sync.subscribeRepos", ""); if (!(url.startsWith("wss://") || url.startsWith("ws://"))) url = "wss://" + url; } const collections = formData.get("collections")?.toString().split(","); collections?.forEach((collection) => { if (collection.length) url = url.concat(`wantedCollections=${collection}&`); }); const dids = formData.get("dids")?.toString().split(","); dids?.forEach((did) => { if (did.length) url = url.concat(`wantedDids=${did}&`); }); const cursor = formData.get("cursor")?.toString(); if (streamType === "jetstream") { if (cursor?.length) url = url.concat(`cursor=${cursor}`); if (url.endsWith("&")) url = url.slice(0, -1); } setSearchParams({ instance: formData.get("instance")?.toString(), collections: formData.get("collections")?.toString(), dids: formData.get("dids")?.toString(), cursor: formData.get("cursor")?.toString(), allEvents: formData.get("allEvents")?.toString(), }); setParameters([ { name: "Instance", param: formData.get("instance")?.toString() }, { name: "Collections", param: formData.get("collections")?.toString() }, { name: "DIDs", param: formData.get("dids")?.toString() }, { name: "Cursor", param: formData.get("cursor")?.toString() }, { name: "All Events", param: formData.get("allEvents")?.toString() }, ]); setConnected(true); if (streamType === "jetstream") { socket = new WebSocket(url); socket.addEventListener("message", (event) => { const rec = JSON.parse(event.data); if (searchParams.allEvents === "on" || (rec.kind !== "account" && rec.kind !== "identity")) setRecords(records().concat(rec).slice(-LIMIT)); }); socket.addEventListener("error", () => { setNotice("Connection error"); setConnected(false); }); } else { firehose = new Firehose({ relay: url, cursor: cursor, autoReconnect: false, }); firehose.on("error", (err) => { console.error(err); }); firehose.on("commit", (commit) => { for (const op of commit.ops) { const record = { $type: commit.$type, repo: commit.repo, seq: commit.seq, time: commit.time, rev: commit.rev, since: commit.since, op: op, }; setRecords(records().concat(record).slice(-LIMIT)); } }); firehose.on("identity", (identity) => { setRecords(records().concat(identity).slice(-LIMIT)); }); firehose.on("account", (account) => { setRecords(records().concat(account).slice(-LIMIT)); }); firehose.on("sync", (sync) => { const event = { $type: sync.$type, did: sync.did, rev: sync.rev, seq: sync.seq, time: sync.time, }; setRecords(records().concat(event).slice(-LIMIT)); }); firehose.start(); } }; onMount(async () => { const formData = new FormData(); if (searchParams.instance) formData.append("instance", searchParams.instance.toString()); if (searchParams.collections) formData.append("collections", searchParams.collections.toString()); if (searchParams.dids) formData.append("dids", searchParams.dids.toString()); if (searchParams.cursor) formData.append("cursor", searchParams.cursor.toString()); if (searchParams.allEvents) formData.append("allEvents", searchParams.allEvents.toString()); if (searchParams.instance) connectSocket(formData); }); onCleanup(() => socket?.close()); return (