1import { Firehose } from "@skyware/firehose";
2import { A, useLocation, useSearchParams } from "@solidjs/router";
3import { createSignal, For, onCleanup, onMount, Show } from "solid-js";
4import { Button } from "../components/button";
5import { JSONValue } from "../components/json";
6import { StickyOverlay } from "../components/sticky";
7import { TextInput } from "../components/text-input";
8
/** Number of entries kept when a list is trimmed with `slice(-LIMIT)`. */
const LIMIT = 25;

/** A labelled connection parameter; `param` may be absent, a single value, or a list of values. */
interface Parameter {
  name: string;
  param: string | string[] | undefined;
}
11
/** Jetstream endpoint used when the instance field is missing or blank. */
const DEFAULT_JETSTREAM_INSTANCE = "wss://jetstream1.us-east.bsky.network/subscribe";

/** Relay used when the instance field is missing or blank in firehose mode. */
const DEFAULT_FIREHOSE_INSTANCE = "wss://bsky.network";

/** Splits a comma-separated form value into trimmed, non-empty entries. */
const splitList = (raw: string | undefined): string[] =>
  (raw ?? "")
    .split(",")
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);

/**
 * Appends the Jetstream filter and cursor query parameters to `instance`.
 *
 * Values are URL-encoded, blank entries are skipped, and an instance that
 * already carries a query string is extended with `&` instead of a second `?`.
 *
 * @param instance Base WebSocket URL of the Jetstream instance.
 * @param filters Raw (comma-separated) form values to translate into query parameters.
 * @returns The URL to hand to the `WebSocket` constructor.
 */
const buildJetstreamUrl = (
  instance: string,
  filters: { collections?: string; dids?: string; cursor?: string },
): string => {
  const query = new URLSearchParams();
  for (const collection of splitList(filters.collections)) {
    query.append("wantedCollections", collection);
  }
  for (const did of splitList(filters.dids)) {
    query.append("wantedDids", did);
  }
  const cursor = filters.cursor?.trim();
  if (cursor) query.append("cursor", cursor);

  const queryString = query.toString();
  if (!queryString) return instance;

  let separator = "?";
  if (instance.includes("?")) separator = /[?&]$/.test(instance) ? "" : "&";
  return `${instance}${separator}${queryString}`;
};

/** Strips the subscribeRepos XRPC path from a relay URL and defaults the scheme to `wss://`. */
const normalizeRelayUrl = (instance: string): string => {
  const relay = instance.replace("/xrpc/com.atproto.sync.subscribeRepos", "");
  return relay.startsWith("wss://") || relay.startsWith("ws://") ? relay : "wss://" + relay;
};

/**
 * Page component that connects to either a Jetstream instance or a firehose
 * relay (chosen from the current pathname) and renders the most recent
 * `LIMIT` events, newest first.
 *
 * Connection settings are mirrored into the URL search params; when the page
 * is opened with an `instance` search param the stream connects on mount.
 */
const StreamView = () => {
  const [searchParams, setSearchParams] = useSearchParams();
  const [parameters, setParameters] = createSignal<Parameter[]>([]);
  const streamType = useLocation().pathname === "/firehose" ? "firehose" : "jetstream";
  const defaultInstance =
    streamType === "jetstream" ? DEFAULT_JETSTREAM_INSTANCE : DEFAULT_FIREHOSE_INSTANCE;
  const [records, setRecords] = createSignal<Array<any>>([]);
  const [connected, setConnected] = createSignal(false);
  const [notice, setNotice] = createSignal("");
  let socket: WebSocket | undefined;
  let firehose: Firehose | undefined;
  let formRef!: HTMLFormElement;

  /** Appends one event to the buffer, keeping only the newest `LIMIT` entries. */
  const pushRecord = (rec: unknown) => setRecords((prev) => [...prev, rec].slice(-LIMIT));

  /** Closes whichever stream is open and forgets its handle. */
  const closeStreams = () => {
    socket?.close();
    socket = undefined;
    firehose?.close();
    firehose = undefined;
  };

  /** Opens the Jetstream WebSocket and wires its events to component state. */
  const openJetstream = (url: string, showAllEvents: boolean) => {
    const ws = new WebSocket(url);
    socket = ws;
    ws.addEventListener("message", (event) => {
      let rec: any;
      try {
        rec = JSON.parse(event.data);
      } catch (err: unknown) {
        // A frame that is not valid JSON must not break the stream; log and skip it.
        console.error(err);
        return;
      }
      if (showAllEvents || (rec.kind !== "account" && rec.kind !== "identity")) pushRecord(rec);
    });
    ws.addEventListener("error", () => {
      // Ignore events from a socket that has already been replaced or closed by the user.
      if (socket !== ws) return;
      setNotice("Connection error");
      setConnected(false);
    });
    ws.addEventListener("close", () => {
      // The server ended the connection: reflect that in the UI.
      if (socket !== ws) return;
      socket = undefined;
      setConnected(false);
    });
  };

  /** Starts the firehose subscription and wires its events to component state. */
  const openFirehose = (relay: string, cursor: string | undefined) => {
    const stream = new Firehose({
      relay: relay,
      cursor: cursor,
      autoReconnect: false,
    });
    firehose = stream;
    stream.on("error", (err) => {
      console.error(err);
    });
    stream.on("commit", (commit) => {
      // One displayed record per operation, each carrying the commit metadata.
      for (const op of commit.ops) {
        pushRecord({
          $type: commit.$type,
          repo: commit.repo,
          seq: commit.seq,
          time: commit.time,
          rev: commit.rev,
          since: commit.since,
          op: op,
        });
      }
    });
    stream.on("identity", (identity) => {
      pushRecord(identity);
    });
    stream.on("account", (account) => {
      pushRecord(account);
    });
    stream.on("sync", (sync) => {
      pushRecord({
        $type: sync.$type,
        did: sync.did,
        rev: sync.rev,
        seq: sync.seq,
        time: sync.time,
      });
    });
    stream.start();
  };

  /**
   * Toggles the connection: disconnects when connected, otherwise connects
   * using the values in `formData` (instance, collections, dids, cursor,
   * allEvents) and mirrors them into the URL search params.
   */
  const connectSocket = (formData: FormData) => {
    setNotice("");
    if (connected()) {
      closeStreams();
      setConnected(false);
      return;
    }
    setRecords([]);

    const field = (name: string) => formData.get(name)?.toString();
    const instance = field("instance");
    const collections = field("collections");
    const dids = field("dids");
    const cursor = field("cursor");
    const allEvents = field("allEvents");

    // `||` on purpose: a blank instance field must fall back to the default too.
    const target = instance?.trim() || defaultInstance;
    // Collection/DID filters are Jetstream query parameters; they are never added to a relay URL.
    const url =
      streamType === "jetstream" ?
        buildJetstreamUrl(target, { collections, dids, cursor })
      : normalizeRelayUrl(target);

    setSearchParams({ instance, collections, dids, cursor, allEvents });

    setParameters([
      { name: "Instance", param: instance },
      { name: "Collections", param: collections },
      { name: "DIDs", param: dids },
      { name: "Cursor", param: cursor },
      { name: "All Events", param: allEvents },
    ]);

    // Drop any handle left over from a previous, failed connection.
    closeStreams();
    setConnected(true);
    try {
      if (streamType === "jetstream") openJetstream(url, allEvents === "on");
      else openFirehose(url, cursor);
    } catch (err: unknown) {
      // e.g. the WebSocket constructor rejecting a malformed URL.
      console.error(err);
      closeStreams();
      setNotice("Connection error");
      setConnected(false);
    }
  };

  onMount(() => {
    // Rebuild the form values from the URL so a shared link reconnects automatically.
    const formData = new FormData();
    if (searchParams.instance) formData.append("instance", searchParams.instance.toString());
    if (searchParams.collections)
      formData.append("collections", searchParams.collections.toString());
    if (searchParams.dids) formData.append("dids", searchParams.dids.toString());
    if (searchParams.cursor) formData.append("cursor", searchParams.cursor.toString());
    if (searchParams.allEvents) formData.append("allEvents", searchParams.allEvents.toString());
    if (searchParams.instance) connectSocket(formData);
  });

  // Close both stream kinds so neither outlives the component.
  onCleanup(closeStreams);

  return (
    <div class="flex w-full flex-col items-center">
      <div class="flex gap-2 text-sm">
        <A
          class="flex items-center gap-1 border-b-2 p-1"
          inactiveClass="border-transparent hover:border-neutral-400 dark:hover:border-neutral-600"
          href="/jetstream"
        >
          Jetstream
        </A>
        <A
          class="flex items-center gap-1 border-b-2 p-1"
          inactiveClass="border-transparent hover:border-neutral-400 dark:hover:border-neutral-600"
          href="/firehose"
        >
          Firehose
        </A>
      </div>
      <StickyOverlay>
        <form ref={formRef} class="flex w-full flex-col gap-1 text-sm">
          <Show when={!connected()}>
            <label class="flex items-center justify-end gap-x-1">
              <span class="min-w-20">Instance</span>
              <TextInput
                name="instance"
                value={searchParams.instance ?? defaultInstance}
                class="grow"
              />
            </label>
            <Show when={streamType === "jetstream"}>
              <label class="flex items-center justify-end gap-x-1">
                <span class="min-w-20">Collections</span>
                <textarea
                  name="collections"
                  spellcheck={false}
                  placeholder="Comma-separated list of collections"
                  value={searchParams.collections ?? ""}
                  class="dark:bg-dark-100 dark:inset-shadow-dark-200 grow rounded-lg border-[0.5px] border-neutral-300 bg-white px-2 py-1 inset-shadow-xs focus:outline-[1px] focus:outline-neutral-600 dark:border-neutral-600 dark:focus:outline-neutral-400"
                />
              </label>
            </Show>
            <Show when={streamType === "jetstream"}>
              <label class="flex items-center justify-end gap-x-1">
                <span class="min-w-20">DIDs</span>
                <textarea
                  name="dids"
                  spellcheck={false}
                  placeholder="Comma-separated list of DIDs"
                  value={searchParams.dids ?? ""}
                  class="dark:bg-dark-100 dark:inset-shadow-dark-200 grow rounded-lg border-[0.5px] border-neutral-300 bg-white px-2 py-1 inset-shadow-xs focus:outline-[1px] focus:outline-neutral-600 dark:border-neutral-600 dark:focus:outline-neutral-400"
                />
              </label>
            </Show>
            <label class="flex items-center justify-end gap-x-1">
              <span class="min-w-20">Cursor</span>
              <TextInput
                name="cursor"
                placeholder="Leave empty for live-tail"
                value={searchParams.cursor ?? ""}
                class="grow"
              />
            </label>
            <Show when={streamType === "jetstream"}>
              <div class="flex items-center justify-end gap-x-1">
                <input
                  type="checkbox"
                  name="allEvents"
                  id="allEvents"
                  checked={searchParams.allEvents === "on"}
                />
                <label for="allEvents" class="select-none">
                  Show account and identity events
                </label>
              </div>
            </Show>
          </Show>
          <Show when={connected()}>
            <div class="flex flex-col gap-1 wrap-anywhere">
              <For each={parameters()}>
                {(param) => (
                  <Show when={param.param}>
                    <div class="flex">
                      <div class="min-w-24 font-semibold">{param.name}</div>
                      {param.param}
                    </div>
                  </Show>
                )}
              </For>
            </div>
          </Show>
          <div class="flex justify-end">
            <Button onClick={() => connectSocket(new FormData(formRef))}>
              {connected() ? "Disconnect" : "Connect"}
            </Button>
          </div>
        </form>
      </StickyOverlay>
      <Show when={notice().length}>
        <div class="text-red-500 dark:text-red-400">{notice()}</div>
      </Show>
      <div class="flex w-full flex-col gap-2 divide-y-[0.5px] divide-neutral-500 font-mono text-sm wrap-anywhere whitespace-pre-wrap md:w-3xl">
        <For each={records().toReversed()}>
          {(rec) => (
            <div class="pb-2">
              <JSONValue data={rec} repo={rec.did ?? rec.repo} />
            </div>
          )}
        </For>
      </div>
    </div>
  );
};
261
262export { StreamView };