feat: implement Discord webhook integration for scrobble events #1

merged
opened by tsiry-sandratraina.com targeting main from feat/discord-webhook
Changed files
+52 -18
crates
jetstream
+10 -8
Cargo.lock
···
[[package]]
name = "deranged"
-
version = "0.4.0"
+
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e"
+
checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc"
dependencies = [
"powerfmt",
"serde",
···
"dotenv",
"futures-util",
"owo-colors",
+
"redis 0.29.5",
"reqwest",
"serde",
"serde_json",
"sha256",
"sqlx",
+
"time",
"tokio",
"tokio-stream",
"tokio-tungstenite",
···
[[package]]
name = "time"
-
version = "0.3.41"
+
version = "0.3.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40"
+
checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d"
dependencies = [
"deranged",
"itoa",
···
[[package]]
name = "time-core"
-
version = "0.1.4"
+
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c"
+
checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b"
[[package]]
name = "time-macros"
-
version = "0.2.22"
+
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49"
+
checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3"
dependencies = [
"num-conv",
"time-core",
+2
crates/jetstream/Cargo.toml
···
"json",
], default-features = false }
sha256 = "1.6.0"
+
time = { version = "0.3.44", features = ["formatting", "macros"] }
+
redis = { version = "0.29.0", features = ["aio", "tokio-comp"] }
+16 -3
crates/jetstream/src/lib.rs
···
use anyhow::Error;
-
use std::env;
-
+
use std::{env, sync::Arc};
use subscriber::ScrobbleSubscriber;
+
use tokio::sync::Mutex;
+
+
use crate::webhook_worker::{start_worker, AppState};
pub mod profile;
pub mod repo;
pub mod subscriber;
pub mod types;
+
pub mod webhook;
+
pub mod webhook_worker;
pub mod xata;
pub async fn subscribe() -> Result<(), Error> {
+
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
+
let redis = redis::Client::open(redis_url)?;
+
let queue_key =
+
env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string());
+
+
let state = Arc::new(Mutex::new(AppState { redis, queue_key }));
+
+
start_worker(state.clone()).await?;
+
let jetstream_server = env::var("JETSTREAM_SERVER")
.unwrap_or_else(|_| "wss://jetstream2.us-west.bsky.network".to_string());
let url = format!(
···
);
let subscriber = ScrobbleSubscriber::new(&url);
-
subscriber.run().await?;
+
subscriber.run(state).await?;
Ok(())
}
+14 -2
crates/jetstream/src/main.rs
···
-
use std::env;
+
use std::{env, sync::Arc};
use dotenv::dotenv;
use subscriber::ScrobbleSubscriber;
+
use tokio::sync::Mutex;
+
+
use crate::webhook_worker::AppState;
pub mod profile;
pub mod repo;
pub mod subscriber;
pub mod types;
+
pub mod webhook;
+
pub mod webhook_worker;
pub mod xata;
#[tokio::main]
···
);
let subscriber = ScrobbleSubscriber::new(&url);
-
subscriber.run().await?;
+
let redis_url = env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
+
let redis = redis::Client::open(redis_url)?;
+
let queue_key =
+
env::var("WEBHOOK_QUEUE_KEY").unwrap_or_else(|_| "rocksky:webhook_queue".to_string());
+
+
let state = Arc::new(Mutex::new(AppState { redis, queue_key }));
+
+
subscriber.run(state).await?;
Ok(())
}
+9 -5
crates/jetstream/src/subscriber.rs
···
use tokio::sync::Mutex;
use tokio_tungstenite::{connect_async, tungstenite::Message};
-
use crate::{repo::save_scrobble, types::Root};
+
use crate::{repo::save_scrobble, types::Root, webhook_worker::AppState};
pub const SCROBBLE_NSID: &str = "app.rocksky.scrobble";
pub const ARTIST_NSID: &str = "app.rocksky.artist";
···
}
}
-
pub async fn run(&self) -> Result<(), Error> {
+
pub async fn run(&self, state: Arc<Mutex<AppState>>) -> Result<(), Error> {
// Get the connection string outside of the task
let db_url = env::var("XATA_POSTGRES_URL")
.context("Failed to get XATA_POSTGRES_URL environment variable")?;
···
while let Some(msg) = ws_stream.next().await {
match msg {
Ok(msg) => {
-
if let Err(e) = handle_message(pool.clone(), msg).await {
+
if let Err(e) = handle_message(state.clone(), pool.clone(), msg).await {
eprintln!("Error handling message: {}", e);
}
}
···
}
}
-
async fn handle_message(pool: Arc<Mutex<sqlx::PgPool>>, msg: Message) -> Result<(), Error> {
+
async fn handle_message(
+
state: Arc<Mutex<AppState>>,
+
pool: Arc<Mutex<sqlx::PgPool>>,
+
msg: Message,
+
) -> Result<(), Error> {
tokio::spawn(async move {
if let Message::Text(text) = msg {
let message: Root = serde_json::from_str(&text)?;
···
println!("Received message: {:#?}", message);
if let Some(commit) = message.commit {
-
match save_scrobble(pool, &message.did, commit).await {
+
match save_scrobble(state, pool, &message.did, commit).await {
Ok(_) => {
println!("Scrobble saved successfully");
}
+1
crates/jetstream/src/webhook/mod.rs
···
+
pub mod discord;