From f2120ffc885d59bceded642db5a772b5324e4108 Mon Sep 17 00:00:00 2001 From: Tsiry Sandratraina Date: Mon, 22 Sep 2025 09:45:43 +0300 Subject: [PATCH] feat: implement Discord webhook integration for scrobble events --- Cargo.lock | 18 ++- crates/jetstream/Cargo.toml | 2 + crates/jetstream/src/lib.rs | 19 ++- crates/jetstream/src/main.rs | 16 +- crates/jetstream/src/repo.rs | 61 ++++++++ crates/jetstream/src/subscriber.rs | 14 +- crates/jetstream/src/webhook/discord/mod.rs | 54 +++++++ crates/jetstream/src/webhook/discord/model.rs | 77 ++++++++++ crates/jetstream/src/webhook/mod.rs | 1 + crates/jetstream/src/webhook_worker.rs | 139 ++++++++++++++++++ 10 files changed, 383 insertions(+), 18 deletions(-) create mode 100644 crates/jetstream/src/webhook/discord/mod.rs create mode 100644 crates/jetstream/src/webhook/discord/model.rs create mode 100644 crates/jetstream/src/webhook/mod.rs create mode 100644 crates/jetstream/src/webhook_worker.rs diff --git a/Cargo.lock b/Cargo.lock index ac1f99a..e3a5af0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1509,9 +1509,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.4.0" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc" dependencies = [ "powerfmt", "serde", @@ -4931,11 +4931,13 @@ dependencies = [ "dotenv", "futures-util", "owo-colors", + "redis 0.29.5", "reqwest", "serde", "serde_json", "sha256", "sqlx", + "time", "tokio", "tokio-stream", "tokio-tungstenite", @@ -6440,9 +6442,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.41" +version = "0.3.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", "itoa", @@ -6455,15 +6457,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.4" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" [[package]] name = "time-macros" -version = "0.2.22" +version = "0.2.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" dependencies = [ "num-conv", "time-core", diff --git a/crates/jetstream/Cargo.toml b/crates/jetstream/Cargo.toml index 3c74a4e..3b54511 100644 --- a/crates/jetstream/Cargo.toml +++ b/crates/jetstream/Cargo.toml @@ -36,3 +36,5 @@ reqwest = { version = "0.12.12", features = [ "json", ], default-features = false } sha256 = "1.6.0" +time = { version = "0.3.44", features = ["formatting", "macros"] } +redis = { version = "0.29.0", features = ["aio", "tokio-comp"] } diff --git a/crates/jetstream/src/lib.rs b/crates/jetstream/src/lib.rs index 56e29d4..a6bf767 100644 --- a/crates/jetstream/src/lib.rs +++ b/crates/jetstream/src/lib.rs @@ -1,15 +1,28 @@ use anyhow::Error; -use std::env; - +use std::{env, sync::Arc}; use subscriber::ScrobbleSubscriber; +use tokio::sync::Mutex; + +use crate::webhook_worker::{start_worker, AppState}; pub mod profile; pub mod repo; pub mod subscriber; pub mod types; +pub mod webhook; +pub mod webhook_worker; pub mod xata; pub async fn subscribe() -> Result<(), Error> { + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); + let redis = redis::Client::open(redis_url)?; + let queue_key = + env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string()); + + let state = Arc::new(Mutex::new(AppState { redis, queue_key })); + + start_worker(state.clone()).await?; + let jetstream_server = env::var("JETSTREAM_SERVER") .unwrap_or_else(|_| "wss://jetstream2.us-west.bsky.network".to_string()); let url = format!( @@ -18,7 +31,7 @@ pub async fn subscribe() -> Result<(), Error> { ); let subscriber = ScrobbleSubscriber::new(&url); - subscriber.run().await?; + subscriber.run(state).await?; Ok(()) } diff --git a/crates/jetstream/src/main.rs b/crates/jetstream/src/main.rs index efdb0da..d4c5aee 100644 --- a/crates/jetstream/src/main.rs +++ b/crates/jetstream/src/main.rs @@ -1,12 +1,17 @@ -use std::env; +use std::{env, sync::Arc}; use dotenv::dotenv; use subscriber::ScrobbleSubscriber; +use tokio::sync::Mutex; + +use crate::webhook_worker::AppState; pub mod profile; pub mod repo; pub mod subscriber; pub mod types; +pub mod webhook; +pub mod webhook_worker; pub mod xata; #[tokio::main] @@ -20,6 +25,13 @@ async fn main() -> Result<(), anyhow::Error> { ); let subscriber = ScrobbleSubscriber::new(&url); - subscriber.run().await?; + let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string()); + let redis = redis::Client::open(redis_url)?; + let queue_key = + env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string()); + + let state = Arc::new(Mutex::new(AppState { redis, queue_key })); + + subscriber.run(state).await?; Ok(()) } diff --git a/crates/jetstream/src/repo.rs b/crates/jetstream/src/repo.rs index 7410225..4cf07ec 100644 --- a/crates/jetstream/src/repo.rs +++ b/crates/jetstream/src/repo.rs @@ -10,6 +10,11 @@ use crate::{ profile::did_to_profile, subscriber::{ALBUM_NSID, ARTIST_NSID, SCROBBLE_NSID, SONG_NSID}, types::{AlbumRecord, ArtistRecord, Commit, ScrobbleRecord, SongRecord}, + webhook::discord::{ + self, + model::{ScrobbleData, WebhookEnvelope}, + }, + webhook_worker::{push_to_queue, AppState}, xata::{ album::Album, album_track::AlbumTrack, artist::Artist, artist_album::ArtistAlbum, artist_track::ArtistTrack, track::Track, user::User, user_album::UserAlbum, @@ -18,6 +23,7 @@ use crate::{ }; pub async fn save_scrobble( + state: Arc>, pool: Arc>>, did: &str, commit: Commit, @@ -85,6 +91,61 @@ pub async fn save_scrobble( .await?; tx.commit().await?; + + let users: Vec = + sqlx::query_as::<_, User>("SELECT * FROM users WHERE did = $1") + .bind(did) + .fetch_all(&*pool) + .await?; + + if users.is_empty() { + return Err(anyhow::anyhow!( + "User with DID {} not found in database", + did + )); + } + + // Push to webhook queue (Discord) + match push_to_queue( + state, + &WebhookEnvelope { + r#type: "scrobble.created".to_string(), + id: Some(commit.rkey.clone()), + data: ScrobbleData { + user: discord::model::User { + did: did.to_string(), + display_name: users[0].display_name.clone(), + handle: users[0].handle.clone(), + }, + track: discord::model::Track { + title: scrobble_record.title.clone(), + artist: scrobble_record.artist.clone(), + album: scrobble_record.album.clone(), + duration: scrobble_record.duration, + artwork_url: scrobble_record.album_art.clone().map(|x| { + format!( + "https://cdn.bsky.app/img/feed_thumbnail/plain/{}/{}@{}", + did, + x.r#ref.link, + x.mime_type.split('/').last().unwrap_or("jpeg") + ) + }), + spotify_url: scrobble_record.spotify_link.clone(), + tidal_url: scrobble_record.tidal_link.clone(), + youtube_url: scrobble_record.youtube_link.clone(), + }, + played_at: scrobble_record.created_at.clone(), + }, + delivered_at: Some(chrono::Utc::now().to_rfc3339()), + }, + ) + .await + { + Ok(_) => {} + Err(e) => { + eprintln!("Failed to push to webhook queue: {}", e); + } + } } if commit.collection == ARTIST_NSID { diff --git a/crates/jetstream/src/subscriber.rs b/crates/jetstream/src/subscriber.rs index 9698b35..4250c49 100644 --- a/crates/jetstream/src/subscriber.rs +++ b/crates/jetstream/src/subscriber.rs @@ -7,7 +7,7 @@ use sqlx::postgres::PgPoolOptions; use tokio::sync::Mutex; use tokio_tungstenite::{connect_async, tungstenite::Message}; -use crate::{repo::save_scrobble, types::Root}; +use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState}; pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble"; pub const ARTIST_NSID: &str = "app.rocksky.artist"; @@ -28,7 +28,7 @@ impl ScrobbleSubscriber { } } - pub async fn run(&self) -> Result<(), Error> { + pub async fn run(&self, state: Arc>) -> Result<(), Error> { // Get the connection string outside of the task let db_url = env::var("XATA_POSTGRES_URL") .context("Failed to get XATA_POSTGRES_URL environment variable")?; @@ -48,7 +48,7 @@ impl ScrobbleSubscriber { while let Some(msg) = ws_stream.next().await { match msg { Ok(msg) => { - if let Err(e) = handle_message(pool.clone(), msg).await { + if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await { eprintln!("Error handling message: {}", e); } } @@ -63,7 +63,11 @@ impl ScrobbleSubscriber { } } -async fn handle_message(pool: Arc>, msg: Message) -> Result<(), Error> { +async fn handle_message( + state: Arc>, + pool: Arc>, + msg: Message, +) -> Result<(), Error> { tokio::spawn(async move { if let Message::Text(text) = msg { let message: Root = serde_json::from_str(&text)?; @@ -74,7 +78,7 @@ async fn handle_message(pool: Arc>, msg: Message) -> Result< println!("Received message: {:#?}", message); if let Some(commit) = message.commit { - match save_scrobble(pool, &message.did, commit).await { + match save_scrobble(state, pool, &message.did, commit).await { Ok(_) => { println!("Scrobble saved successfully"); } diff --git a/crates/jetstream/src/webhook/discord/mod.rs b/crates/jetstream/src/webhook/discord/mod.rs new file mode 100644 index 0000000..e4f983b --- /dev/null +++ b/crates/jetstream/src/webhook/discord/mod.rs @@ -0,0 +1,54 @@ +pub mod model; + +use crate::webhook::discord::model::*; +use reqwest::Client; + +pub fn embed_from_scrobble(s: &ScrobbleData) -> DiscordEmbed { + let url = s + .track + .spotify_url + .as_ref() + .or(s.track.tidal_url.as_ref()) + .or(s.track.youtube_url.as_ref()) + .cloned(); + + let mut desc = format!("**{}**\nby {}", esc(&s.track.title), esc(&s.track.artist)); + desc.push_str(&format!("\non *{}*", esc(&s.track.album))); + + DiscordEmbed { + title: s.user.display_name.clone(), + url, + description: Some(desc), + timestamp: Some(s.played_at.clone()), + thumbnail: s.track.artwork_url.clone().map(|u| DiscordThumb { url: u }), + footer: Some(DiscordFooter { + text: format!("Rocksky • {}", s.user.handle.clone()), + }), + } +} + +pub async fn post_embeds( + http: &Client, + discord_webhook_url: &str, + embeds: Vec, +) -> reqwest::Result<()> { + if discord_webhook_url.is_empty() { + println!("DISCORD_WEBHOOK_URL is not set, skipping webhook post"); + return Ok(()); + } + + let body = DiscordWebhookPayload { + content: String::new(), + embeds, + }; + let res = http.post(discord_webhook_url).json(&body).send().await?; + if !res.status().is_success() { + let text = res.text().await.unwrap_or_default(); + eprintln!("Failed to post to Discord webhook: {}", text); + } + Ok(()) +} + +fn esc(s: &str) -> String { + s.replace(['*', '_', '~', '`', '>'], "\\$0") +} diff --git a/crates/jetstream/src/webhook/discord/model.rs b/crates/jetstream/src/webhook/discord/model.rs new file mode 100644 index 0000000..9bf3ded --- /dev/null +++ b/crates/jetstream/src/webhook/discord/model.rs @@ -0,0 +1,77 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct WebhookEnvelope { + #[serde(default)] + pub r#type: String, + #[serde(default)] + pub id: Option, + #[serde(default)] + pub delivered_at: Option, + pub data: ScrobbleData, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct ScrobbleData { + pub user: User, + pub track: Track, + pub played_at: String, // ISO 8601 +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct User { + pub did: String, + pub display_name: String, + pub handle: String, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct Track { + pub title: String, + pub artist: String, + pub album: String, + pub duration: i32, + #[serde(default)] + pub artwork_url: Option, + #[serde(default)] + pub spotify_url: Option, + #[serde(default)] + pub tidal_url: Option, + #[serde(default)] + pub youtube_url: Option, +} + +/* ---------- Discord payloads ---------- */ + +#[derive(Debug, Serialize)] +pub struct DiscordWebhookPayload { + #[serde(default)] + pub content: String, + #[serde(default)] + pub embeds: Vec, +} + +#[derive(Debug, Serialize)] +pub struct DiscordEmbed { + pub title: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub url: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub thumbnail: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub footer: Option, +} + +#[derive(Debug, Serialize)] +pub struct DiscordThumb { + pub url: String, +} + +#[derive(Debug, Serialize)] +pub struct DiscordFooter { + pub text: String, +} diff --git a/crates/jetstream/src/webhook/mod.rs b/crates/jetstream/src/webhook/mod.rs new file mode 100644 index 0000000..3228a62 --- /dev/null +++ b/crates/jetstream/src/webhook/mod.rs @@ -0,0 +1 @@ +pub mod discord; diff --git a/crates/jetstream/src/webhook_worker.rs b/crates/jetstream/src/webhook_worker.rs new file mode 100644 index 0000000..2835396 --- /dev/null +++ b/crates/jetstream/src/webhook_worker.rs @@ -0,0 +1,139 @@ +use crate::webhook::discord::{self, model::WebhookEnvelope}; +use anyhow::Error; +use std::{ + env, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::{sync::Mutex, time::interval}; + +#[derive(Clone)] +pub struct AppState { + pub redis: redis::Client, + pub queue_key: String, +} + +pub async fn start_worker(state: Arc>) -> Result<(), Error> { + let max_rps: u32 = env::var("MAX_REQUESTS_PER_SEC") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(5); + let max_embeds_per: usize = env::var("MAX_EMBEDS_PER_REQUEST") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(10); + let batch_window_ms: u64 = env::var("BATCH_WINDOW_MS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(400); + let discord_webhook_url = env::var("DISCORD_WEBHOOK_URL").unwrap_or(String::new()); + + tokio::spawn(run_worker( + state.clone(), + discord_webhook_url, + max_rps, + max_embeds_per, + Duration::from_millis(batch_window_ms), + )); + + Ok(()) +} + +async fn run_worker( + st: Arc>, + discord_webhook_url: String, + max_rps: u32, + max_embeds_per: usize, + batch_window: Duration, +) { + let http = reqwest::Client::builder() + .user_agent("rocksky-discord-bridge/0.1") + .build() + .expect("http client"); + + let mut tokens = max_rps as i32; + let mut refill = interval(Duration::from_secs(1)); + refill.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + loop { + tokio::select! { + _ = refill.tick() => { + tokens = (tokens + max_rps as i32).min(max_rps as i32); + } + _ = tokio::time::sleep(Duration::from_millis(10)) => { /* tick */ } + } + + if tokens <= 0 { + continue; + } + + let start = Instant::now(); + let mut embeds = Vec::with_capacity(max_embeds_per); + + while embeds.len() < max_embeds_per && start.elapsed() < batch_window { + match brpop_once(st.clone(), 1).await { + Ok(Some(json_str)) => { + if let Ok(env) = serde_json::from_str::(&json_str) { + embeds.push(discord::embed_from_scrobble(&env.data)); + } + } + Ok(None) => break, + Err(e) => { + eprintln!("Failed to pop from Redis: {}", e); + break; + } + } + } + + if embeds.is_empty() { + tokio::time::sleep(Duration::from_millis(50)).await; + continue; + } + + tokens -= 1; + + if let Err(e) = discord::post_embeds(&http, &discord_webhook_url, embeds).await { + eprintln!("Failed to post to Discord webhook: {}", e); + } + } +} + +async fn brpop_once( + state: Arc>, + timeout_secs: u64, +) -> redis::RedisResult> { + let AppState { + redis: client, + queue_key: key, + } = &*state.lock().await; + let mut conn = client.get_multiplexed_async_connection().await?; + let res: Option<(String, String)> = redis::cmd("BRPOP") + .arg(key) + .arg(timeout_secs as usize) + .query_async(&mut conn) + .await?; + Ok(res.map(|(_, v)| v)) +} + +pub async fn push_to_queue( + state: Arc>, + item: &WebhookEnvelope, +) -> redis::RedisResult<()> { + let payload = serde_json::to_string(item).unwrap(); + let AppState { + redis: client, + queue_key: key, + } = &*state.lock().await; + let mut conn = client.get_multiplexed_async_connection().await?; + let _: () = redis::pipe() + .cmd("RPUSH") + .arg(key) + .arg(payload) + .ignore() + .cmd("EXPIRE") + .arg(key) + .arg(60 * 60 * 24) // 24h + .query_async(&mut conn) + .await?; + Ok(()) +} -- 2.43.0 From 52b51438f838194403eb3bd64f7606ae2d3d0a7b Mon Sep 17 00:00:00 2001 From: Tsiry Sandratraina Date: Mon, 22 Sep 2025 09:55:57 +0300 Subject: [PATCH] fix: update Discord webhook integration to use non-optional IDs and improve embed generation --- crates/jetstream/src/repo.rs | 2 +- crates/jetstream/src/webhook/discord/mod.rs | 10 ++-------- crates/jetstream/src/webhook/discord/model.rs | 6 ++---- crates/jetstream/src/webhook_worker.rs | 2 +- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git a/crates/jetstream/src/repo.rs b/crates/jetstream/src/repo.rs index 4cf07ec..37baa89 100644 --- a/crates/jetstream/src/repo.rs +++ b/crates/jetstream/src/repo.rs @@ -110,7 +110,7 @@ pub async fn save_scrobble( state, &WebhookEnvelope { r#type: "scrobble.created".to_string(), - id: Some(commit.rkey.clone()), + id: commit.rkey.clone(), data: ScrobbleData { user: discord::model::User { did: did.to_string(), diff --git a/crates/jetstream/src/webhook/discord/mod.rs b/crates/jetstream/src/webhook/discord/mod.rs index e4f983b..5ba67b1 100644 --- a/crates/jetstream/src/webhook/discord/mod.rs +++ b/crates/jetstream/src/webhook/discord/mod.rs @@ -3,14 +3,8 @@ pub mod model; use crate::webhook::discord::model::*; use reqwest::Client; -pub fn embed_from_scrobble(s: &ScrobbleData) -> DiscordEmbed { - let url = s - .track - .spotify_url - .as_ref() - .or(s.track.tidal_url.as_ref()) - .or(s.track.youtube_url.as_ref()) - .cloned(); +pub fn embed_from_scrobble(s: &ScrobbleData, rkey: &str) -> DiscordEmbed { + let url = format!("https://rocksky.app/{}/scrobble/{}", s.user.did, rkey); let mut desc = format!("**{}**\nby {}", esc(&s.track.title), esc(&s.track.artist)); desc.push_str(&format!("\non *{}*", esc(&s.track.album))); diff --git a/crates/jetstream/src/webhook/discord/model.rs b/crates/jetstream/src/webhook/discord/model.rs index 9bf3ded..7678835 100644 --- a/crates/jetstream/src/webhook/discord/model.rs +++ b/crates/jetstream/src/webhook/discord/model.rs @@ -4,8 +4,7 @@ use serde::{Deserialize, Serialize}; pub struct WebhookEnvelope { #[serde(default)] pub r#type: String, - #[serde(default)] - pub id: Option, + pub id: String, #[serde(default)] pub delivered_at: Option, pub data: ScrobbleData, @@ -54,8 +53,7 @@ pub struct DiscordWebhookPayload { #[derive(Debug, Serialize)] pub struct DiscordEmbed { pub title: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub url: Option, + pub url: String, #[serde(skip_serializing_if = "Option::is_none")] pub description: Option, #[serde(skip_serializing_if = "Option::is_none")] diff --git a/crates/jetstream/src/webhook_worker.rs b/crates/jetstream/src/webhook_worker.rs index 2835396..0c072b0 100644 --- a/crates/jetstream/src/webhook_worker.rs +++ b/crates/jetstream/src/webhook_worker.rs @@ -74,7 +74,7 @@ async fn run_worker( match brpop_once(st.clone(), 1).await { Ok(Some(json_str)) => { if let Ok(env) = serde_json::from_str::(&json_str) { - embeds.push(discord::embed_from_scrobble(&env.data)); + embeds.push(discord::embed_from_scrobble(&env.data, &env.id)); } } Ok(None) => break, -- 2.43.0