From 870ec98166bbd29825eff74b293543d2626f0c62 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Sun, 12 Oct 2025 17:51:32 -0500 Subject: [PATCH] feat: add jetstream firehose backend with SSE streaming MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit adds rust backend for real-time firehose visualization: - firehose module connects to jetstream and broadcasts all atproto events - uses rocketman crate with lexicon ingester pattern - SSE endpoint at /api/firehose/watch streams filtered events by DID - auto-reconnects on connection drop remaining work: - add UI toggle button and toast notifications in templates - implement particle animation system in static/app.js - test with live events 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- Cargo.lock | 450 +++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 5 + src/firehose.rs | 146 ++++++++++++++++ src/main.rs | 6 + src/routes.rs | 37 ++++ 5 files changed, 640 insertions(+), 4 deletions(-) create mode 100644 src/firehose.rs diff --git a/Cargo.lock b/Cargo.lock index efdf92c..9390142 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -275,6 +275,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -394,6 +406,28 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -412,14 +446,19 @@ dependencies = [ "actix-files", "actix-session", "actix-web", + "anyhow", + "async-stream", + "async-trait", "atrium-api", "atrium-common", "atrium-identity", "atrium-oauth", "env_logger", + "futures-util", "hickory-resolver", "log", "reqwest", + "rocketman", "serde", "serde_json", "tokio", @@ -575,6 +614,12 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "base64" version = "0.22.1" @@ -602,6 +647,31 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bon" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebeb9aaf9329dff6ceb65c689ca3db33dbf15f324909c60e4e5eef5701ce31b1" +dependencies = [ + "bon-macros", + "rustversion", +] + +[[package]] +name = "bon-macros" +version = "3.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77e9d642a7e3a318e37c2c9427b5a6a48aa1ad55dcd986f3034ab2239045a645" +dependencies = [ + "darling 0.21.3", + "ident_case", + "prettyplease", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.106", +] + [[package]] name = "brotli" version = "8.0.2" @@ -629,6 +699,12 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.10.1" @@ -861,6 +937,76 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" +dependencies = [ + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core 0.21.3", + "darling_macro 0.21.3", +] + +[[package]] +name = "darling_core" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d00b9596d185e565c2207a0b01f8bd1a135483d02d9b7b0a54b11da8d53412e" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.106", +] + +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.106", +] + +[[package]] +name = "darling_macro" +version = "0.20.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" +dependencies = [ + "darling_core 0.20.11", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core 0.21.3", + "quote", + "syn 2.0.106", +] + [[package]] name = "dashmap" version = "6.1.0" @@ -920,6 +1066,37 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derive_builder" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" +dependencies = [ + "darling 0.20.11", + "proc-macro2", + "quote", + "syn 2.0.106", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c" +dependencies = [ + "derive_builder_core", + "syn 2.0.106", +] + [[package]] name = "derive_more" version = "1.0.0" @@ -1129,6 +1306,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1217,6 +1406,7 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", "futures-macro", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", @@ -1241,8 +1431,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi 0.11.1+wasi-snapshot-preview1", + "wasm-bindgen", ] [[package]] @@ -1508,10 +1700,10 @@ dependencies = [ "http 1.3.1", "hyper", "hyper-util", - "rustls", + "rustls 0.23.31", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.2", "tower-service", ] @@ -1667,6 +1859,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -1868,6 +2066,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4345964bb142484797b161f473a503a434de77149dd8c7427788c6e13379388" +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.176" @@ -1959,6 +2163,16 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +[[package]] +name = "metrics" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25dea7ac8057892855ec285c440160265225438c3c45072613c25a4b26e98ef5" +dependencies = [ + "ahash", + "portable-atomic", +] + [[package]] name = "mime" version = "0.3.17" @@ -2041,6 +2255,15 @@ dependencies = [ "unsigned-varint", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.16", +] + [[package]] name = "native-tls" version = "0.2.14" @@ -2058,6 +2281,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.1", +] + [[package]] name = "num-conv" version = "0.1.0" @@ -2260,6 +2492,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.106", +] + [[package]] name = "primeorder" version = "0.13.6" @@ -2469,6 +2711,30 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rocketman" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90cfc4ee9daf6e9d0ee217b9709aa3bd6c921e6926aa15c6ff5ba9162c2c649a" +dependencies = [ + "anyhow", + "async-trait", + "bon", + "derive_builder", + "flume", + "futures-util", + "metrics", + "rand 0.8.5", + "serde", + "serde_json", + "tokio", + "tokio-tungstenite", + "tracing", + "tracing-subscriber", + "url", + "zstd", +] + [[package]] name = "rustc-demangle" version = "0.1.26" @@ -2497,6 +2763,18 @@ dependencies = [ "windows-sys 0.61.1", ] +[[package]] +name = "rustls" +version = "0.21.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" +dependencies = [ + "log", + "ring", + "rustls-webpki 0.101.7", + "sct", +] + [[package]] name = "rustls" version = "0.23.31" @@ -2505,11 +2783,32 @@ checksum = "c0ebcbd2f03de0fc1122ad9bb24b127a5a6cd51d72604a3f3c50ac459762b6cc" dependencies = [ "once_cell", "rustls-pki-types", - "rustls-webpki", + "rustls-webpki 0.103.4", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pki-types" version = "1.12.0" @@ -2519,6 +2818,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustls-webpki" version = "0.103.4" @@ -2557,6 +2866,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "sct" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "sec1" version = "0.7.3" @@ -2699,6 +3018,15 @@ dependencies = [ "digest", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -2762,12 +3090,27 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -2876,6 +3219,15 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "time" version = "0.3.44" @@ -2973,14 +3325,40 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ - "rustls", + "rustls 0.23.31", + "tokio", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" +dependencies = [ + "futures-util", + "log", + "rustls 0.21.12", + "rustls-native-certs", "tokio", + "tokio-rustls 0.24.1", + "tungstenite", + "webpki-roots", ] [[package]] @@ -3071,6 +3449,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d12581f227e93f094d3af2ae690a574abb8a2b9b7a96e7cfe9647b2b617678" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2054a14f5307d601f88daf0553e1cbf472acc4f2c51afab632431cdcd72124d5" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", ] [[package]] @@ -3090,6 +3494,26 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 0.2.12", + "httparse", + "log", + "rand 0.8.5", + "rustls 0.21.12", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.19.0" @@ -3148,6 +3572,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -3177,6 +3607,12 @@ version = "0.15.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4e8257fbc510f0a46eb602c10215901938b5c2a7d5e70fc11483b1d3c9b5b18c" +[[package]] +name = "valuable" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" + [[package]] name = "vcpkg" version = "0.2.15" @@ -3314,6 +3750,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.25.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" + [[package]] name = "widestring" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 0b5f254..aadb58e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,8 @@ hickory-resolver = "0.24" env_logger = "0.11" log = "0.4" reqwest = { version = "0.12", features = ["json"] } +rocketman = "0.2.0" +futures-util = "0.3" +anyhow = "1.0" +async-stream = "0.3" +async-trait = "0.1" diff --git a/src/firehose.rs b/src/firehose.rs new file mode 100644 index 0000000..3bb9483 --- /dev/null +++ b/src/firehose.rs @@ -0,0 +1,146 @@ +use anyhow::Result; +use async_trait::async_trait; +use log::{error, info}; +use rocketman::{ + connection::JetstreamConnection, + ingestion::LexiconIngestor, + options::JetstreamOptions, + types::event::{Event, Operation}, +}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; +use tokio::sync::broadcast; + +/// Represents a firehose event that will be sent to the browser +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct FirehoseEvent { + pub did: String, + pub action: String, // "create", "update", or "delete" + pub collection: String, + pub rkey: String, + pub namespace: String, // e.g., "app.bsky" extracted from collection +} + +/// Broadcaster for firehose events +pub type FirehoseBroadcaster = Arc>; + +/// A generic ingester that broadcasts all events +struct BroadcastIngester { + broadcaster: FirehoseBroadcaster, +} + +#[async_trait] +impl LexiconIngestor for BroadcastIngester { + async fn ingest(&self, message: Event) -> Result<()> { + // Only process commit events + let Some(commit) = &message.commit else { + return Ok(()); + }; + + // Extract namespace from collection (e.g., "app.bsky.feed.post" -> "app.bsky") + let collection_parts: Vec<&str> = commit.collection.split('.').collect(); + let namespace = if collection_parts.len() >= 2 { + format!("{}.{}", collection_parts[0], collection_parts[1]) + } else { + commit.collection.clone() + }; + + let action = match commit.operation { + Operation::Create => "create", + Operation::Update => "update", + Operation::Delete => "delete", + }; + + let firehose_event = FirehoseEvent { + did: message.did.clone(), + action: action.to_string(), + collection: commit.collection.clone(), + rkey: commit.rkey.clone(), + namespace, + }; + + // Broadcast the event (ignore if no receivers) + let _ = self.broadcaster.send(firehose_event); + + Ok(()) + } +} + +/// Start the Jetstream ingester that broadcasts events to all listeners +pub async fn start_firehose_broadcaster() -> FirehoseBroadcaster { + // Create a broadcast channel with a buffer of 100 events + let (tx, _rx) = broadcast::channel::(100); + let broadcaster = Arc::new(tx); + + let broadcaster_clone = broadcaster.clone(); + + tokio::spawn(async move { + loop { + info!("Starting Jetstream connection..."); + + // Configure Jetstream to receive all events (no collection filter) + let opts = JetstreamOptions::builder().build(); + let jetstream = JetstreamConnection::new(opts); + + // Create ingesters - we use a wildcard to capture all collections + let mut ingesters: HashMap> = + HashMap::new(); + + // Use "*" as a catch-all for all collections + ingesters.insert( + "*".to_string(), + Box::new(BroadcastIngester { + broadcaster: broadcaster_clone.clone(), + }), + ); + + // Get channels + let msg_rx = jetstream.get_msg_rx(); + let reconnect_tx = jetstream.get_reconnect_tx(); + + // Cursor for tracking last processed message + let cursor: Arc>> = Arc::new(Mutex::new(None)); + let c_cursor = cursor.clone(); + + // Spawn task to process messages + tokio::spawn(async move { + while let Ok(message) = msg_rx.recv_async().await { + if let Err(e) = rocketman::handler::handle_message( + message, + &ingesters, + reconnect_tx.clone(), + c_cursor.clone(), + ) + .await + { + error!("Error processing message: {}", e); + } + } + }); + + // Connect to Jetstream + let failed = { + let connect_result = jetstream.connect(cursor).await; + if let Err(e) = connect_result { + error!("Jetstream connection failed: {}", e); + true + } else { + false + } + }; + + if failed { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + continue; + } + + info!("Jetstream connection dropped, reconnecting in 5 seconds..."); + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + }); + + broadcaster +} diff --git a/src/main.rs b/src/main.rs index 8f28b48..9abf5e0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use actix_session::{SessionMiddleware, config::PersistentSession, storage::Cooki use actix_web::{App, HttpServer, cookie::{Key, time::Duration}, middleware, web}; use actix_files::Files; +mod firehose; mod mst; mod oauth; mod routes; @@ -13,6 +14,9 @@ async fn main() -> std::io::Result<()> { let client = oauth::create_oauth_client(); + // Start the firehose broadcaster + let firehose_broadcaster = firehose::start_firehose_broadcaster().await; + println!("starting server at http://localhost:8080"); HttpServer::new(move || { @@ -31,6 +35,7 @@ async fn main() -> std::io::Result<()> { .build(), ) .app_data(web::Data::new(client.clone())) + .app_data(web::Data::new(firehose_broadcaster.clone())) .service(routes::index) .service(routes::login) .service(routes::callback) @@ -41,6 +46,7 @@ async fn main() -> std::io::Result<()> { .service(routes::init) .service(routes::get_avatar) .service(routes::validate_url) + .service(routes::firehose_watch) .service(routes::favicon) .service(Files::new("/static", "./static")) }) diff --git a/src/routes.rs b/src/routes.rs index e4bf8a4..fa1cb59 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -3,6 +3,7 @@ use actix_web::{get, post, web, HttpResponse, Responder}; use atrium_oauth::{AuthorizeOptions, CallbackParams, KnownScope, Scope}; use serde::Deserialize; +use crate::firehose::FirehoseBroadcaster; use crate::mst; use crate::oauth::OAuthClientType; use crate::templates; @@ -390,3 +391,39 @@ pub async fn validate_url(query: web::Query) -> HttpResponse { "valid": is_valid })) } + +#[derive(Deserialize)] +pub struct FirehoseQuery { + did: String, +} + +#[get("/api/firehose/watch")] +pub async fn firehose_watch( + query: web::Query, + broadcaster: web::Data, +) -> HttpResponse { + let did = query.did.clone(); + let mut rx = broadcaster.subscribe(); + + let stream = async_stream::stream! { + // Send initial connection message + yield Ok::<_, actix_web::Error>( + web::Bytes::from(format!("data: {{\"type\":\"connected\"}}\n\n")) + ); + + // Stream firehose events filtered by DID + while let Ok(event) = rx.recv().await { + // Only send events for this DID + if event.did == did { + let json = serde_json::to_string(&event).unwrap_or_default(); + yield Ok(web::Bytes::from(format!("data: {}\n\n", json))); + } + } + }; + + HttpResponse::Ok() + .content_type("text/event-stream") + .insert_header(("Cache-Control", "no-cache")) + .insert_header(("X-Accel-Buffering", "no")) + .streaming(Box::pin(stream)) +} -- 2.43.0 From c0985ab919c78a5a7097315cd30308062f565178 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Sun, 12 Oct 2025 18:06:15 -0500 Subject: [PATCH] feat: add frontend firehose visualization with particle animations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit implements the visual layer for real-time firehose events: - watch live toggle button in top-right corner - toast notifications showing event actions (create/update/delete) - particle animation system using canvas overlay - colored particles flow from identity to app circles - particles: green for create, blue for update, red for delete - app circles pulse when receiving data - auto-reconnects on connection drop - clean shutdown when toggled off complete implementation: - backend: rust jetstream connector + SSE endpoint (src/firehose.rs, src/routes.rs) - frontend: particle system + event handling (static/app.js) - ui: button, toast, css animations (src/templates.rs) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/templates.rs | 105 +++++++++++++++++ static/app.js | 293 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 398 insertions(+) diff --git a/src/templates.rs b/src/templates.rs index a2bfaa2..66d1601 100644 --- a/src/templates.rs +++ b/src/templates.rs @@ -1179,12 +1179,117 @@ pub fn app_page(did: &str) -> String { .ownership-text strong {{ color: var(--text); }} + + .watch-live-btn {{ + position: fixed; + top: clamp(1rem, 2vmin, 1.5rem); + right: clamp(6rem, 14vmin, 9rem); + font-size: clamp(0.65rem, 1.4vmin, 0.75rem); + color: var(--text-light); + border: 1px solid var(--border); + background: var(--bg); + padding: clamp(0.4rem, 1vmin, 0.5rem) clamp(0.8rem, 2vmin, 1rem); + transition: all 0.2s ease; + z-index: 100; + cursor: pointer; + border-radius: 2px; + display: flex; + align-items: center; + gap: 0.5rem; + }} + + .watch-live-btn:hover {{ + background: var(--surface); + color: var(--text); + border-color: var(--text-light); + }} + + .watch-live-btn.active {{ + background: var(--surface-hover); + color: var(--text); + border-color: var(--text); + }} + + .watch-indicator {{ + width: 8px; + height: 8px; + border-radius: 50%; + background: var(--text-light); + display: none; + }} + + .watch-live-btn.active .watch-indicator {{ + display: block; + animation: pulse 2s ease-in-out infinite; + }} + + @keyframes pulse {{ + 0%, 100% {{ opacity: 1; }} + 50% {{ opacity: 0.3; }} + }} + + .firehose-toast {{ + position: fixed; + top: clamp(4rem, 8vmin, 5rem); + right: clamp(1rem, 2vmin, 1.5rem); + background: var(--surface); + border: 1px solid var(--border); + padding: 0.75rem 1rem; + border-radius: 4px; + font-size: 0.7rem; + color: var(--text); + z-index: 200; + opacity: 0; + transform: translateY(-10px); + transition: all 0.3s ease; + pointer-events: none; + max-width: 300px; + }} + + .firehose-toast.visible {{ + opacity: 1; + transform: translateY(0); + }} + + .firehose-toast-action {{ + font-weight: 600; + color: var(--text); + }} + + .firehose-toast-collection {{ + color: var(--text-light); + font-size: 0.65rem; + margin-top: 0.25rem; + }} + + @media (max-width: 768px) {{ + .watch-live-btn {{ + right: clamp(1rem, 2vmin, 1.5rem); + top: clamp(4rem, 8vmin, 5rem); + }} + + .firehose-toast {{ + top: clamp(7rem, 12vmin, 8rem); + right: clamp(1rem, 2vmin, 1.5rem); + left: clamp(1rem, 2vmin, 1.5rem); + max-width: none; + }} + }}
?
+ logout +
+
+
+
+

@me - your repository

diff --git a/static/app.js b/static/app.js index f074e5d..996c64a 100644 --- a/static/app.js +++ b/static/app.js @@ -731,3 +731,296 @@ function layoutTree(tree, width, height) { traverse(tree, 0, padding, width - padding); return nodes; } + +// ============================================================================ +// FIREHOSE VISUALIZATION +// ============================================================================ + +// Particle class for animating firehose events +class FirehoseParticle { + constructor(startX, startY, endX, endY, color, metadata) { + this.x = startX; + this.y = startY; + this.startX = startX; + this.startY = startY; + this.endX = endX; + this.endY = endY; + this.color = color; + this.metadata = metadata; // {action, collection, namespace} + this.progress = 0; + this.speed = 0.012; // Slower for visibility + this.size = 5; + this.glowSize = 10; + } + + update() { + if (this.progress < 1) { + this.progress += this.speed; + // Cubic ease-in-out + const eased = this.progress < 0.5 + ? 4 * this.progress * this.progress * this.progress + : 1 - Math.pow(-2 * this.progress + 2, 3) / 2; + + this.x = this.startX + (this.endX - this.startX) * eased; + this.y = this.startY + (this.endY - this.startY) * eased; + } + return this.progress < 1; + } + + draw(ctx) { + // Outer glow + ctx.beginPath(); + ctx.arc(this.x, this.y, this.glowSize, 0, Math.PI * 2); + const gradient = ctx.createRadialGradient( + this.x, this.y, 0, + this.x, this.y, this.glowSize + ); + gradient.addColorStop(0, this.color + '80'); + gradient.addColorStop(1, this.color + '00'); + ctx.fillStyle = gradient; + ctx.fill(); + + // Inner particle + ctx.beginPath(); + ctx.arc(this.x, this.y, this.size, 0, Math.PI * 2); + ctx.fillStyle = this.color; + ctx.fill(); + } +} + +// Firehose state +let firehoseParticles = []; +let firehoseCanvas = null; +let firehoseCtx = null; +let firehoseAnimationId = null; +let firehoseEventSource = null; +let isWatchingLive = false; + +function initFirehoseCanvas() { + // Create canvas overlay + firehoseCanvas = document.createElement('canvas'); + firehoseCanvas.id = 'firehoseCanvas'; + firehoseCanvas.style.position = 'fixed'; + firehoseCanvas.style.top = '0'; + firehoseCanvas.style.left = '0'; + firehoseCanvas.style.width = '100%'; + firehoseCanvas.style.height = '100%'; + firehoseCanvas.style.pointerEvents = 'none'; + firehoseCanvas.style.zIndex = '50'; + firehoseCanvas.width = window.innerWidth; + firehoseCanvas.height = window.innerHeight; + + document.body.appendChild(firehoseCanvas); + firehoseCtx = firehoseCanvas.getContext('2d'); + + // Handle window resize + window.addEventListener('resize', () => { + firehoseCanvas.width = window.innerWidth; + firehoseCanvas.height = window.innerHeight; + }); +} + +function animateFirehoseParticles() { + if (!firehoseCtx) return; + + firehoseCtx.clearRect(0, 0, firehoseCanvas.width, firehoseCanvas.height); + + // Update and draw all particles + firehoseParticles = firehoseParticles.filter(particle => { + const alive = particle.update(); + if (alive) { + particle.draw(firehoseCtx); + } else { + // Particle reached destination - pulse the app circle + pulseAppCircle(particle.metadata.namespace); + } + return alive; + }); + + if (isWatchingLive) { + firehoseAnimationId = requestAnimationFrame(animateFirehoseParticles); + } +} + +function pulseAppCircle(namespace) { + const appCircle = document.querySelector(`[data-namespace="${namespace}"]`); + if (appCircle) { + appCircle.style.transition = 'all 0.3s ease'; + appCircle.style.transform = 'scale(1.2)'; + appCircle.style.boxShadow = '0 0 20px rgba(255, 255, 255, 0.5)'; + + setTimeout(() => { + appCircle.style.transform = ''; + appCircle.style.boxShadow = ''; + }, 300); + } +} + +function showFirehoseToast(action, collection) { + const toast = document.getElementById('firehoseToast'); + const actionEl = toast.querySelector('.firehose-toast-action'); + const collectionEl = toast.querySelector('.firehose-toast-collection'); + + const actionText = { + 'create': 'created', + 'update': 'updated', + 'delete': 'deleted' + }[action] || action; + + actionEl.textContent = `${actionText} record`; + collectionEl.textContent = collection; + + toast.classList.add('visible'); + setTimeout(() => { + toast.classList.remove('visible'); + }, 3000); +} + +function getParticleColor(action) { + const colors = { + 'create': '#4ade80', // green + 'update': '#60a5fa', // blue + 'delete': '#f87171' // red + }; + return colors[action] || '#a0a0a0'; +} + +function createFirehoseParticle(event) { + // Get identity circle position + const identity = document.querySelector('.identity'); + if (!identity) return; + + const identityRect = identity.getBoundingClientRect(); + const startX = identityRect.left + identityRect.width / 2; + const startY = identityRect.top + identityRect.height / 2; + + // Get target app circle position + const appCircle = document.querySelector(`[data-namespace="${event.namespace}"]`); + if (!appCircle) return; + + const appRect = appCircle.getBoundingClientRect(); + const endX = appRect.left + appRect.width / 2; + const endY = appRect.top + appRect.height / 2; + + // Create particle + const particle = new FirehoseParticle( + startX, startY, + endX, endY, + getParticleColor(event.action), + { + action: event.action, + collection: event.collection, + namespace: event.namespace + } + ); + + firehoseParticles.push(particle); +} + +function connectFirehose() { + if (!did || firehoseEventSource) return; + + const url = `/api/firehose/watch?did=${encodeURIComponent(did)}`; + console.log('Connecting to firehose:', url); + + firehoseEventSource = new EventSource(url); + + const watchBtn = document.getElementById('watchLiveBtn'); + const watchLabel = watchBtn.querySelector('.watch-label'); + + firehoseEventSource.onopen = () => { + console.log('Firehose connected'); + watchLabel.textContent = 'watching...'; + watchBtn.classList.add('active'); + }; + + firehoseEventSource.onmessage = (e) => { + try { + const data = JSON.parse(e.data); + + // Skip connection message + if (data.type === 'connected') { + console.log('Firehose connection established'); + return; + } + + console.log('Firehose event:', data); + + // Create particle animation + createFirehoseParticle(data); + + // Show toast notification + showFirehoseToast(data.action, data.collection); + } catch (error) { + console.error('Error processing firehose message:', error); + } + }; + + firehoseEventSource.onerror = (error) => { + console.error('Firehose error:', error); + watchLabel.textContent = 'connection error'; + + // Attempt to reconnect after delay + if (isWatchingLive) { + setTimeout(() => { + if (firehoseEventSource) { + firehoseEventSource.close(); + firehoseEventSource = null; + } + if (isWatchingLive) { + watchLabel.textContent = 'reconnecting...'; + connectFirehose(); + } + }, 3000); + } + }; +} + +function disconnectFirehose() { + if (firehoseEventSource) { + firehoseEventSource.close(); + firehoseEventSource = null; + } + + if (firehoseAnimationId) { + cancelAnimationFrame(firehoseAnimationId); + firehoseAnimationId = null; + } + + firehoseParticles = []; + if (firehoseCtx) { + firehoseCtx.clearRect(0, 0, firehoseCanvas.width, firehoseCanvas.height); + } +} + +// Toggle watch live +document.addEventListener('DOMContentLoaded', () => { + const watchBtn = document.getElementById('watchLiveBtn'); + if (!watchBtn) return; + + const watchLabel = watchBtn.querySelector('.watch-label'); + + watchBtn.addEventListener('click', () => { + isWatchingLive = !isWatchingLive; + + if (isWatchingLive) { + // Start watching + watchLabel.textContent = 'connecting...'; + initFirehoseCanvas(); + connectFirehose(); + animateFirehoseParticles(); + } else { + // Stop watching + watchLabel.textContent = 'watch live'; + watchBtn.classList.remove('active'); + disconnectFirehose(); + + // Clean up canvas + if (firehoseCanvas) { + firehoseCanvas.remove(); + firehoseCanvas = null; + firehoseCtx = null; + } + } + }); +}); -- 2.43.0 From eb58414081b30d8a878032711d7d00ccee7c7364 Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Sun, 12 Oct 2025 22:50:17 -0500 Subject: [PATCH] feat: add app-agnostic record links to firehose toasts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add clickable "view record" links to firehose toast notifications that open raw record JSON from user's PDS. Also fix particle animation direction to flow from apps to PDS (correctly showing data writes). Changes: - Add /api/record endpoint to fetch individual records - Refactor firehose to use DID-specific connections via manager - Add toast link element with hover styling - Fetch record details for richer toast messages - Reverse particle flow direction (app → PDS) - Add "Your PDS" label to identity circle 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- src/firehose.rs | 90 ++++++++++++++++++++------ src/main.rs | 7 ++- src/routes.rs | 57 ++++++++++++++--- src/templates.rs | 28 +++++++++ static/app.js | 161 ++++++++++++++++++++++++++++++++++++----------- 5 files changed, 278 insertions(+), 65 deletions(-) diff --git a/src/firehose.rs b/src/firehose.rs index 3bb9483..732f6ca 100644 --- a/src/firehose.rs +++ b/src/firehose.rs @@ -27,6 +27,9 @@ pub struct FirehoseEvent { /// Broadcaster for firehose events pub type FirehoseBroadcaster = Arc>; +/// Manager for DID-specific firehose connections +pub type FirehoseManager = Arc>>; + /// A generic ingester that broadcasts all events struct BroadcastIngester { broadcaster: FirehoseBroadcaster, @@ -59,43 +62,93 @@ impl LexiconIngestor for BroadcastIngester { action: action.to_string(), collection: commit.collection.clone(), rkey: commit.rkey.clone(), - namespace, + namespace: namespace.clone(), }; + info!( + "Received event: {} {} {} (namespace: {})", + action, message.did, commit.collection, namespace + ); + // Broadcast the event (ignore if no receivers) - let _ = self.broadcaster.send(firehose_event); + match self.broadcaster.send(firehose_event) { + Ok(receivers) => { + info!("Broadcast to {} receivers", receivers); + } + Err(_) => { + // No receivers, that's ok + } + } Ok(()) } } -/// Start the Jetstream ingester that broadcasts events to all listeners -pub async fn start_firehose_broadcaster() -> FirehoseBroadcaster { +/// Create a new FirehoseManager +pub fn create_firehose_manager() -> FirehoseManager { + Arc::new(Mutex::new(HashMap::new())) +} + +/// Get or create a firehose broadcaster for a specific DID +pub async fn get_or_create_broadcaster( + manager: &FirehoseManager, + did: String, +) -> FirehoseBroadcaster { + // Check if we already have a broadcaster for this DID + { + let broadcasters = manager.lock().unwrap(); + if let Some(broadcaster) = broadcasters.get(&did) { + info!("Reusing existing firehose connection for DID: {}", did); + return broadcaster.clone(); + } + } + + info!("Creating new firehose connection for DID: {}", did); + // Create a broadcast channel with a buffer of 100 events let (tx, _rx) = broadcast::channel::(100); let broadcaster = Arc::new(tx); + // Store in manager + { + let mut broadcasters = manager.lock().unwrap(); + broadcasters.insert(did.clone(), broadcaster.clone()); + } + + // Clone for the spawn let broadcaster_clone = broadcaster.clone(); + let did_clone = did.clone(); tokio::spawn(async move { loop { - info!("Starting Jetstream connection..."); + info!("Starting Jetstream connection for DID: {}...", did_clone); - // Configure Jetstream to receive all events (no collection filter) - let opts = JetstreamOptions::builder().build(); + // Configure Jetstream to receive events ONLY for this DID + let opts = JetstreamOptions::builder() + .wanted_dids(vec![did_clone.clone()]) + .build(); let jetstream = JetstreamConnection::new(opts); - // Create ingesters - we use a wildcard to capture all collections let mut ingesters: HashMap> = HashMap::new(); - // Use "*" as a catch-all for all collections - ingesters.insert( - "*".to_string(), - Box::new(BroadcastIngester { - broadcaster: broadcaster_clone.clone(), - }), - ); + // Register ingesters for common Bluesky collections + let collections = vec![ + "app.bsky.feed.post", + "app.bsky.feed.like", + "app.bsky.feed.repost", + "app.bsky.graph.follow", + "app.bsky.actor.profile", + ]; + + for collection in collections { + ingesters.insert( + collection.to_string(), + Box::new(BroadcastIngester { + broadcaster: broadcaster_clone.clone(), + }), + ); + } // Get channels let msg_rx = jetstream.get_msg_rx(); @@ -105,8 +158,9 @@ pub async fn start_firehose_broadcaster() -> FirehoseBroadcaster { let cursor: Arc>> = Arc::new(Mutex::new(None)); let c_cursor = cursor.clone(); - // Spawn task to process messages + // Spawn task to process messages using proper handler tokio::spawn(async move { + info!("Starting message processing loop for DID-filtered connection"); while let Ok(message) = msg_rx.recv_async().await { if let Err(e) = rocketman::handler::handle_message( message, @@ -125,7 +179,7 @@ pub async fn start_firehose_broadcaster() -> FirehoseBroadcaster { let failed = { let connect_result = jetstream.connect(cursor).await; if let Err(e) = connect_result { - error!("Jetstream connection failed: {}", e); + error!("Jetstream connection failed for DID {}: {}", did_clone, e); true } else { false @@ -137,7 +191,7 @@ pub async fn start_firehose_broadcaster() -> FirehoseBroadcaster { continue; } - info!("Jetstream connection dropped, reconnecting in 5 seconds..."); + info!("Jetstream connection dropped for DID: {}, reconnecting in 5 seconds...", did_clone); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } }); diff --git a/src/main.rs b/src/main.rs index 9abf5e0..bd86d97 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,8 +14,8 @@ async fn main() -> std::io::Result<()> { let client = oauth::create_oauth_client(); - // Start the firehose broadcaster - let firehose_broadcaster = firehose::start_firehose_broadcaster().await; + // Create the firehose manager (connections created lazily per-DID) + let firehose_manager = firehose::create_firehose_manager(); println!("starting server at http://localhost:8080"); @@ -35,7 +35,7 @@ async fn main() -> std::io::Result<()> { .build(), ) .app_data(web::Data::new(client.clone())) - .app_data(web::Data::new(firehose_broadcaster.clone())) + .app_data(web::Data::new(firehose_manager.clone())) .service(routes::index) .service(routes::login) .service(routes::callback) @@ -46,6 +46,7 @@ async fn main() -> std::io::Result<()> { .service(routes::init) .service(routes::get_avatar) .service(routes::validate_url) + .service(routes::get_record) .service(routes::firehose_watch) .service(routes::favicon) .service(Files::new("/static", "./static")) diff --git a/src/routes.rs b/src/routes.rs index fa1cb59..381cab9 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -3,7 +3,7 @@ use actix_web::{get, post, web, HttpResponse, Responder}; use atrium_oauth::{AuthorizeOptions, CallbackParams, KnownScope, Scope}; use serde::Deserialize; -use crate::firehose::FirehoseBroadcaster; +use crate::firehose::FirehoseManager; use crate::mst; use crate::oauth::OAuthClientType; use crate::templates; @@ -392,6 +392,42 @@ pub async fn validate_url(query: web::Query) -> HttpResponse { })) } +#[derive(Deserialize)] +pub struct RecordQuery { + pds: String, + did: String, + collection: String, + rkey: String, +} + +#[get("/api/record")] +pub async fn get_record(query: web::Query) -> HttpResponse { + let record_url = format!( + "{}/xrpc/com.atproto.repo.getRecord?repo={}&collection={}&rkey={}", + query.pds, query.did, query.collection, query.rkey + ); + + match reqwest::get(&record_url).await { + Ok(response) => { + if !response.status().is_success() { + return HttpResponse::Ok().json(serde_json::json!({ + "error": "record not found" + })); + } + + match response.json::().await { + Ok(data) => HttpResponse::Ok().json(data), + Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ + "error": format!("failed to parse record: {}", e) + })), + } + } + Err(e) => HttpResponse::InternalServerError().json(serde_json::json!({ + "error": format!("failed to fetch record: {}", e) + })), + } +} + #[derive(Deserialize)] pub struct FirehoseQuery { did: String, @@ -400,24 +436,29 @@ pub struct FirehoseQuery { #[get("/api/firehose/watch")] pub async fn firehose_watch( query: web::Query, - broadcaster: web::Data, + manager: web::Data, ) -> HttpResponse { let did = query.did.clone(); + + // Get or create a broadcaster for this DID + let broadcaster = crate::firehose::get_or_create_broadcaster(&manager, did.clone()).await; let mut rx = broadcaster.subscribe(); + log::info!("SSE connection established for DID: {}", did); + let stream = async_stream::stream! { // Send initial connection message yield Ok::<_, actix_web::Error>( web::Bytes::from(format!("data: {{\"type\":\"connected\"}}\n\n")) ); - // Stream firehose events filtered by DID + log::info!("Sent initial connection message to client"); + + // Stream firehose events (already filtered by DID at Jetstream level) while let Ok(event) = rx.recv().await { - // Only send events for this DID - if event.did == did { - let json = serde_json::to_string(&event).unwrap_or_default(); - yield Ok(web::Bytes::from(format!("data: {}\n\n", json))); - } + log::info!("Sending event to client: {} {} {}", event.action, event.did, event.collection); + let json = serde_json::to_string(&event).unwrap_or_default(); + yield Ok(web::Bytes::from(format!("data: {}\n\n", json))); } }; diff --git a/src/templates.rs b/src/templates.rs index 66d1601..b9cd0f2 100644 --- a/src/templates.rs +++ b/src/templates.rs @@ -469,6 +469,15 @@ pub fn app_page(did: &str) -> String { letter-spacing: 0.05em; }} + .identity-pds-label {{ + position: absolute; + bottom: clamp(-1.5rem, -3vmin, -2rem); + font-size: clamp(0.55rem, 1.1vmin, 0.65rem); + color: var(--text-light); + letter-spacing: 0.05em; + font-weight: 500; + }} + .identity-avatar {{ width: clamp(30px, 6vmin, 45px); height: clamp(30px, 6vmin, 45px); @@ -1249,6 +1258,7 @@ pub fn app_page(did: &str) -> String { .firehose-toast.visible {{ opacity: 1; transform: translateY(0); + pointer-events: auto; }} .firehose-toast-action {{ @@ -1262,6 +1272,22 @@ pub fn app_page(did: &str) -> String { margin-top: 0.25rem; }} + .firehose-toast-link {{ + display: inline-block; + color: var(--text-light); + font-size: 0.6rem; + margin-top: 0.5rem; + text-decoration: none; + border-bottom: 1px solid transparent; + transition: all 0.2s ease; + pointer-events: auto; + }} + + .firehose-toast-link:hover {{ + color: var(--text); + border-bottom-color: var(--text); + }} + @media (max-width: 768px) {{ .watch-live-btn {{ right: clamp(1rem, 2vmin, 1.5rem); @@ -1288,6 +1314,7 @@ pub fn app_page(did: &str) -> String {
@@ -1310,6 +1337,7 @@ pub fn app_page(did: &str) -> String {
@
loading...
tap for details
+
Your PDS
loading...
diff --git a/static/app.js b/static/app.js index 996c64a..ac53c62 100644 --- a/static/app.js +++ b/static/app.js @@ -189,7 +189,7 @@ fetch(`/api/init?did=${encodeURIComponent(did)}`) let html = `

${namespace}

-
records stored in your pds:
+
records stored in your PDS:
`; if (collections && collections.length > 0) { @@ -831,8 +831,8 @@ function animateFirehoseParticles() { if (alive) { particle.draw(firehoseCtx); } else { - // Particle reached destination - pulse the app circle - pulseAppCircle(particle.metadata.namespace); + // Particle reached destination - pulse the identity/PDS + pulseIdentity(); } return alive; }); @@ -842,38 +842,114 @@ function animateFirehoseParticles() { } } -function pulseAppCircle(namespace) { - const appCircle = document.querySelector(`[data-namespace="${namespace}"]`); - if (appCircle) { - appCircle.style.transition = 'all 0.3s ease'; - appCircle.style.transform = 'scale(1.2)'; - appCircle.style.boxShadow = '0 0 20px rgba(255, 255, 255, 0.5)'; - +function pulseIdentity() { + const identity = document.querySelector('.identity'); + if (identity) { + identity.style.transition = 'all 0.3s ease'; + identity.style.transform = 'scale(1.15)'; + identity.style.boxShadow = '0 0 25px rgba(255, 255, 255, 0.6)'; + setTimeout(() => { - appCircle.style.transform = ''; - appCircle.style.boxShadow = ''; + identity.style.transform = ''; + identity.style.boxShadow = ''; }, 300); } } -function showFirehoseToast(action, collection) { - const toast = document.getElementById('firehoseToast'); - const actionEl = toast.querySelector('.firehose-toast-action'); - const collectionEl = toast.querySelector('.firehose-toast-collection'); +async function fetchRecordDetails(pds, did, collection, rkey) { + try { + const response = await fetch( + `/api/record?pds=${encodeURIComponent(pds)}&did=${encodeURIComponent(did)}&collection=${encodeURIComponent(collection)}&rkey=${encodeURIComponent(rkey)}` + ); + const data = await response.json(); + if (data.error) return null; + return data.value; + } catch (e) { + console.error('Error fetching record:', e); + return null; + } +} +function formatToastMessage(action, collection, record) { const actionText = { 'create': 'created', 'update': 'updated', 'delete': 'deleted' }[action] || action; - actionEl.textContent = `${actionText} record`; - collectionEl.textContent = collection; + // If we don't have record details, fall back to basic message + if (!record) { + return { + action: `${actionText} record`, + details: collection + }; + } + + // Format based on collection type + if (collection === 'app.bsky.feed.post') { + const text = record.text || ''; + const preview = text.length > 50 ? text.substring(0, 50) + '...' : text; + return { + action: `${actionText} post`, + details: preview || 'no text' + }; + } else if (collection === 'app.bsky.feed.like') { + return { + action: `${actionText} like`, + details: '' + }; + } else if (collection === 'app.bsky.feed.repost') { + return { + action: `${actionText} repost`, + details: '' + }; + } else if (collection === 'app.bsky.graph.follow') { + return { + action: `${actionText} follow`, + details: '' + }; + } else if (collection === 'app.bsky.actor.profile') { + const displayName = record.displayName || ''; + return { + action: `${actionText} profile`, + details: displayName || 'updated profile' + }; + } + + // Default for unknown collections + return { + action: `${actionText} record`, + details: collection + }; +} + +async function showFirehoseToast(event) { + const toast = document.getElementById('firehoseToast'); + const actionEl = toast.querySelector('.firehose-toast-action'); + const collectionEl = toast.querySelector('.firehose-toast-collection'); + const linkEl = document.getElementById('firehoseToastLink'); + + // Build PDS link for the record + if (globalPds && event.did && event.collection && event.rkey) { + const recordUrl = `${globalPds}/xrpc/com.atproto.repo.getRecord?repo=${encodeURIComponent(event.did)}&collection=${encodeURIComponent(event.collection)}&rkey=${encodeURIComponent(event.rkey)}`; + linkEl.href = recordUrl; + } + + // Fetch record details if available (skip for deletes) + let record = null; + if (event.action !== 'delete' && event.rkey && globalPds) { + record = await fetchRecordDetails(globalPds, event.did, event.collection, event.rkey); + } + + const formatted = formatToastMessage(event.action, event.collection, record); + + actionEl.textContent = formatted.action; + collectionEl.textContent = formatted.details; toast.classList.add('visible'); setTimeout(() => { toast.classList.remove('visible'); - }, 3000); + }, 4000); // Slightly longer to read details } function getParticleColor(action) { @@ -886,23 +962,23 @@ function getParticleColor(action) { } function createFirehoseParticle(event) { - // Get identity circle position - const identity = document.querySelector('.identity'); - if (!identity) return; - - const identityRect = identity.getBoundingClientRect(); - const startX = identityRect.left + identityRect.width / 2; - const startY = identityRect.top + identityRect.height / 2; - - // Get target app circle position + // Get source app circle position (where the action happened) const appCircle = document.querySelector(`[data-namespace="${event.namespace}"]`); if (!appCircle) return; const appRect = appCircle.getBoundingClientRect(); - const endX = appRect.left + appRect.width / 2; - const endY = appRect.top + appRect.height / 2; + const startX = appRect.left + appRect.width / 2; + const startY = appRect.top + appRect.height / 2; + + // Get target identity/PDS position (where data is written) + const identity = document.querySelector('.identity'); + if (!identity) return; + + const identityRect = identity.getBoundingClientRect(); + const endX = identityRect.left + identityRect.width / 2; + const endY = identityRect.top + identityRect.height / 2; - // Create particle + // Create particle (flows from app TO PDS) const particle = new FirehoseParticle( startX, startY, endX, endY, @@ -918,10 +994,14 @@ function createFirehoseParticle(event) { } function connectFirehose() { - if (!did || firehoseEventSource) return; + console.log('[Firehose] connectFirehose called, did =', did, 'existing connection?', !!firehoseEventSource); + if (!did || firehoseEventSource) { + console.warn('[Firehose] Exiting early - did:', did, 'firehoseEventSource:', firehoseEventSource); + return; + } const url = `/api/firehose/watch?did=${encodeURIComponent(did)}`; - console.log('Connecting to firehose:', url); + console.log('[Firehose] Connecting to:', url); firehoseEventSource = new EventSource(url); @@ -948,9 +1028,9 @@ function connectFirehose() { // Create particle animation createFirehoseParticle(data); - + // Show toast notification - showFirehoseToast(data.action, data.collection); + showFirehoseToast(data); } catch (error) { console.error('Error processing firehose message:', error); } @@ -995,22 +1075,31 @@ function disconnectFirehose() { // Toggle watch live document.addEventListener('DOMContentLoaded', () => { + console.log('[Firehose] DOMContentLoaded fired, setting up watch button'); const watchBtn = document.getElementById('watchLiveBtn'); - if (!watchBtn) return; + if (!watchBtn) { + console.error('[Firehose] Watch button not found!'); + return; + } + console.log('[Firehose] Watch button found, attaching click handler'); const watchLabel = watchBtn.querySelector('.watch-label'); watchBtn.addEventListener('click', () => { + console.log('[Firehose] Watch button clicked! isWatchingLive was:', isWatchingLive); isWatchingLive = !isWatchingLive; + console.log('[Firehose] isWatchingLive now:', isWatchingLive); if (isWatchingLive) { // Start watching + console.log('[Firehose] Starting watch mode'); watchLabel.textContent = 'connecting...'; initFirehoseCanvas(); connectFirehose(); animateFirehoseParticles(); } else { // Stop watching + console.log('[Firehose] Stopping watch mode'); watchLabel.textContent = 'watch live'; watchBtn.classList.remove('active'); disconnectFirehose(); -- 2.43.0