feat: Enhance MusicBrainz integration and improve data handling #10

merged
opened by tsiry-sandratraina.com targeting main from feat/musicbrainz
Changed files
+789 -87
crates
scrobbler
webscrobbler
+57 -4
Cargo.lock
···
"windows-sys 0.59.0",
+
[[package]]
+
name = "nanoid"
+
version = "0.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "3ffa00dec017b5b1a8b7cf5e2c008bfda1aa7e0697ac1508b491fdf2622fb4d8"
+
dependencies = [
+
"rand 0.8.5",
+
]
+
[[package]]
name = "nkeys"
version = "0.4.4"
···
[[package]]
name = "regex"
-
version = "1.11.1"
+
version = "1.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
+
checksum = "8b5288124840bee7b386bc413c487869b360b2b4ec421ea56425128692f2a82c"
dependencies = [
"aho-corasick",
"memchr",
···
[[package]]
name = "regex-automata"
-
version = "0.4.9"
+
version = "0.4.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
+
checksum = "833eb9ce86d40ef33cb1306d8accf7bc8ec2bfea4355cbdebb3df68b40925cad"
dependencies = [
"aho-corasick",
"memchr",
···
"hex",
"jsonwebtoken",
"md5",
+
"nanoid",
"owo-colors",
"quick-xml 0.37.5",
"rand 0.9.2",
···
"reqwest",
"serde",
"serde_json",
+
"serial_test",
"sqlx",
"tokio",
"tokio-stream",
···
"hex",
"jsonwebtoken",
"md5",
+
"nanoid",
"owo-colors",
"rand 0.9.2",
"redis 0.29.5",
"reqwest",
"serde",
"serde_json",
+
"serial_test",
"sqlx",
"tokio",
"tokio-stream",
···
"winapi-util",
+
[[package]]
+
name = "scc"
+
version = "2.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "46e6f046b7fef48e2660c57ed794263155d713de679057f2d0c169bfc6e756cc"
+
dependencies = [
+
"sdd",
+
]
+
[[package]]
name = "schannel"
version = "0.1.27"
···
"untrusted",
+
[[package]]
+
name = "sdd"
+
version = "3.0.10"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "490dcfcbfef26be6800d11870ff2df8774fa6e86d047e3e8c8a76b25655e41ca"
+
[[package]]
name = "seahash"
version = "4.1.0"
···
"serde",
+
[[package]]
+
name = "serial_test"
+
version = "3.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9"
+
dependencies = [
+
"futures",
+
"log",
+
"once_cell",
+
"parking_lot",
+
"scc",
+
"serial_test_derive",
+
]
+
+
[[package]]
+
name = "serial_test_derive"
+
version = "3.2.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef"
+
dependencies = [
+
"proc-macro2",
+
"quote",
+
"syn 2.0.101",
+
]
+
[[package]]
name = "sha1"
version = "0.10.6"
+5 -1
crates/scrobbler/Cargo.toml
···
dotenv = "0.15.0"
anyhow = "1.0.96"
actix-web = "4.9.0"
-
redis = "0.29.0"
+
redis = { version = "0.29.0", features = ["tokio-comp"] }
hex = "0.4.3"
jsonwebtoken = "9.3.1"
md5 = "0.7.0"
···
actix-session = "0.10.1"
tokio-stream = { version = "0.1.17", features = ["full"] }
tracing = "0.1.41"
+
nanoid = "0.4.0"
+
+
[dev-dependencies]
+
serial_test = "3.0.0"
+15 -5
crates/scrobbler/src/handlers/mod.rs
···
use v1::submission::submission;
use crate::cache::Cache;
+
use crate::musicbrainz::client::MusicbrainzClient;
use crate::BANNER;
pub mod scrobble;
···
pub async fn handle_submission(
data: web::Data<Arc<Pool<Postgres>>>,
cache: web::Data<Cache>,
+
mb_client: web::Data<Arc<MusicbrainzClient>>,
form: web::Form<BTreeMap<String, String>>,
) -> impl Responder {
-
submission(form.into_inner(), cache.get_ref(), data.get_ref())
-
.await
-
.map_err(actix_web::error::ErrorInternalServerError)
+
submission(
+
form.into_inner(),
+
cache.get_ref(),
+
data.get_ref(),
+
mb_client.get_ref(),
+
)
+
.await
+
.map_err(actix_web::error::ErrorInternalServerError)
}
#[get("/2.0")]
···
data: web::Data<Arc<Pool<Postgres>>>,
cache: web::Data<Cache>,
form: web::Form<BTreeMap<String, String>>,
+
mb_client: web::Data<Arc<MusicbrainzClient>>,
) -> impl Responder {
let conn = data.get_ref();
let cache = cache.get_ref();
+
let mb_client = mb_client.get_ref();
let method = form.get("method").unwrap_or(&"".to_string()).to_string();
-
call_method(&method, conn, cache, form.into_inner())
+
call_method(&method, conn, cache, mb_client, form.into_inner())
.await
.map_err(actix_web::error::ErrorInternalServerError)
}
···
method: &str,
pool: &Arc<Pool<Postgres>>,
cache: &Cache,
+
mb_client: &Arc<MusicbrainzClient>,
form: BTreeMap<String, String>,
) -> Result<HttpResponse, Error> {
match method {
-
"track.scrobble" => handle_scrobble(form, pool, cache).await,
+
"track.scrobble" => handle_scrobble(form, pool, cache, mb_client).await,
_ => Err(Error::msg(format!("Unsupported method: {}", method))),
}
}
+4 -3
crates/scrobbler/src/handlers/scrobble.rs
···
use std::collections::BTreeMap;
use crate::{
-
auth::authenticate, cache::Cache, params::validate_scrobble_params, response::build_response,
-
scrobbler::scrobble,
+
auth::authenticate, cache::Cache, musicbrainz::client::MusicbrainzClient,
+
params::validate_scrobble_params, response::build_response, scrobbler::scrobble,
};
pub async fn handle_scrobble(
form: BTreeMap<String, String>,
conn: &Pool<sqlx::Postgres>,
cache: &Cache,
+
mb_client: &MusicbrainzClient,
) -> Result<HttpResponse, Error> {
let params = match validate_scrobble_params(&form, &["api_key", "api_sig", "sk", "method"]) {
Ok(params) => params,
···
})));
}
-
match scrobble(&conn, cache, &form).await {
+
match scrobble(&conn, cache, mb_client, &form).await {
Ok(scrobbles) => Ok(HttpResponse::Ok().json(build_response(scrobbles))),
Err(e) => {
if e.to_string().contains("Timestamp") {
+4 -2
crates/scrobbler/src/handlers/v1/submission.rs
···
use std::{collections::BTreeMap, sync::Arc};
use crate::{
-
auth::verify_session_id, cache::Cache, params::validate_required_params, scrobbler::scrobble_v1,
+
auth::verify_session_id, cache::Cache, musicbrainz::client::MusicbrainzClient,
+
params::validate_required_params, scrobbler::scrobble_v1,
};
pub async fn submission(
form: BTreeMap<String, String>,
cache: &Cache,
pool: &Arc<sqlx::Pool<sqlx::Postgres>>,
+
mb_client: &Arc<MusicbrainzClient>,
) -> Result<HttpResponse, Error> {
match validate_required_params(&form, &["s", "a[0]", "t[0]", "i[0]"]) {
Ok(_) => {
···
let user_id = user_id.unwrap();
tracing::info!(artist = %a, track = %t, timestamp = %i, user_id = %user_id, "Submission");
-
match scrobble_v1(pool, cache, &form).await {
+
match scrobble_v1(pool, cache, mb_client, &form).await {
Ok(_) => Ok(HttpResponse::Ok().body("OK\n")),
Err(e) => Ok(HttpResponse::BadRequest().json(json!({
"error": 4,
+5 -1
crates/scrobbler/src/lib.rs
···
use owo_colors::OwoColorize;
use sqlx::postgres::PgPoolOptions;
-
use crate::cache::Cache;
+
use crate::{cache::Cache, musicbrainz::client::MusicbrainzClient};
pub const BANNER: &str = r#"
___ ___ _____ __ __ __
···
.unwrap(),
);
+
let mb_client = MusicbrainzClient::new().await?;
+
let mb_client = Arc::new(mb_client);
+
HttpServer::new(move || {
App::new()
.wrap(RateLimiter::default())
.app_data(limiter.clone())
.app_data(Data::new(conn.clone()))
.app_data(Data::new(cache.clone()))
+
.app_data(Data::new(mb_client.clone()))
.service(handlers::handle_methods)
.service(handlers::handle_nowplaying)
.service(handlers::handle_submission)
+3 -1
crates/scrobbler/src/listenbrainz/core/submit.rs
···
use std::sync::Arc;
use crate::auth::decode_token;
+
use crate::musicbrainz::client::MusicbrainzClient;
use crate::repo;
use crate::{cache::Cache, scrobbler::scrobble_listenbrainz};
···
payload: SubmitListensRequest,
cache: &Cache,
pool: &Arc<sqlx::Pool<sqlx::Postgres>>,
+
mb_client: &Arc<MusicbrainzClient>,
token: &str,
) -> Result<HttpResponse, Error> {
if payload.listen_type != "playing_now" {
···
const RETRIES: usize = 15;
for attempt in 1..=RETRIES {
-
match scrobble_listenbrainz(pool, cache, &payload, token).await {
+
match scrobble_listenbrainz(pool, cache, mb_client, &payload, token).await {
Ok(_) => {
return Ok(HttpResponse::Ok().json(json!({
"status": "ok",
+4 -1
crates/scrobbler/src/listenbrainz/handlers.rs
···
},
types::SubmitListensRequest,
},
+
musicbrainz::client::MusicbrainzClient,
read_payload, repo,
};
use tokio_stream::StreamExt;
···
req: HttpRequest,
data: web::Data<Arc<Pool<Postgres>>>,
cache: web::Data<Cache>,
+
mb_client: web::Data<Arc<MusicbrainzClient>>,
mut payload: web::Payload,
) -> impl Responder {
let token = match req.headers().get("Authorization") {
···
})
.map_err(actix_web::error::ErrorBadRequest)?;
-
submit_listens(req, cache.get_ref(), data.get_ref(), token)
+
let mb_client = mb_client.get_ref();
+
submit_listens(req, cache.get_ref(), data.get_ref(), &mb_client, token)
.await
.map_err(actix_web::error::ErrorInternalServerError)
}
+2 -2
crates/scrobbler/src/musicbrainz/artist.rs
···
use serde::{Deserialize, Serialize};
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Artist {
pub name: String,
#[serde(rename = "sort-name")]
···
pub aliases: Option<Vec<Alias>>,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct ArtistCredit {
pub joinphrase: Option<String>,
pub name: String,
+310 -24
crates/scrobbler/src/musicbrainz/client.rs
···
+
use std::env;
+
use super::recording::{Recording, Recordings};
-
use anyhow::Error;
+
use anyhow::{anyhow, Context, Error};
+
use redis::aio::MultiplexedConnection;
+
use redis::AsyncCommands;
+
use serde::{Deserialize, Serialize};
+
use tokio::time::{timeout, Duration, Instant};
pub const BASE_URL: &str = "https://musicbrainz.org/ws/2";
-
pub const USER_AGENT: &str = "Rocksky/0.1.0";
+
pub const USER_AGENT: &str = "Rocksky/0.1.0 (+https://rocksky.app)";
+
+
const Q_QUEUE: &str = "mb:queue:v1";
+
const CACHE_SEARCH_PREFIX: &str = "mb:cache:search:";
+
const CACHE_REC_PREFIX: &str = "mb:cache:rec:";
+
const INFLIGHT_PREFIX: &str = "mb:inflight:";
+
const RESP_PREFIX: &str = "mb:resp:";
-
pub struct MusicbrainzClient {}
+
const CACHE_TTL_SECS: u64 = 60 * 60 * 24;
+
const WAIT_TIMEOUT_SECS: u64 = 20;
+
const INFLIGHT_TTL_SECS: i64 = 30; // de-dup window while the worker is fetching
+
+
#[derive(Clone)]
+
pub struct MusicbrainzClient {
+
http: reqwest::Client,
+
redis: MultiplexedConnection,
+
cache_ttl: u64,
+
}
+
+
#[derive(Debug, Serialize, Deserialize)]
+
#[serde(tag = "kind", rename_all = "snake_case")]
+
enum Job {
+
Search { id: String, query: String },
+
GetRecording { id: String, mbid: String },
+
}
impl MusicbrainzClient {
-
pub fn new() -> Self {
-
MusicbrainzClient {}
+
pub async fn new() -> Result<Self, Error> {
+
let client =
+
redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into()))?;
+
let redis = client.get_multiplexed_tokio_connection().await?;
+
let http = reqwest::Client::builder()
+
.user_agent(USER_AGENT)
+
.build()
+
.context("build http client")?;
+
let me = MusicbrainzClient {
+
http,
+
redis,
+
cache_ttl: CACHE_TTL_SECS,
+
};
+
+
let mut worker_conn = client.get_multiplexed_async_connection().await?;
+
+
let http = me.http.clone();
+
tokio::spawn(async move { worker_loop(http, &mut worker_conn).await });
+
+
Ok(me)
}
pub async fn search(&self, query: &str) -> Result<Recordings, Error> {
-
let url = format!("{}/recording", BASE_URL);
-
let client = reqwest::Client::new();
-
let response = client
-
.get(&url)
-
.header("Accept", "application/json")
-
.header("User-Agent", USER_AGENT)
-
.query(&[("query", query), ("inc", "artist-credits+releases")])
-
.send()
+
if let Some(h) = self.get_cache(&cache_key_search(query)).await? {
+
return Ok(serde_json::from_str(&h).context("decode cached search")?);
+
}
+
let id = nanoid::nanoid!();
+
let job = Job::Search {
+
id: id.clone(),
+
query: query.to_string(),
+
};
+
self.enqueue_if_needed(&job, &infl_key_search(query))
.await?;
-
Ok(response.json().await?)
+
let raw = self.wait_for_response(&id).await?;
+
let parsed: Recordings = serde_json::from_str(&raw).context("decode search response")?;
+
+
self.set_cache(&cache_key_search(query), &raw).await?;
+
Ok(parsed)
}
pub async fn get_recording(&self, mbid: &str) -> Result<Recording, Error> {
-
let url = format!("{}/recording/{}", BASE_URL, mbid);
-
let client = reqwest::Client::new();
-
let response = client
-
.get(&url)
-
.header("Accept", "application/json")
-
.header("User-Agent", USER_AGENT)
-
.query(&[("inc", "artist-credits+releases")])
-
.send()
-
.await?;
+
if let Some(h) = self.get_cache(&cache_key_rec(mbid)).await? {
+
return Ok(serde_json::from_str(&h).context("decode cached recording")?);
+
}
+
let id = nanoid::nanoid!();
+
let job = Job::GetRecording {
+
id: id.clone(),
+
mbid: mbid.to_string(),
+
};
+
self.enqueue_if_needed(&job, &infl_key_rec(mbid)).await?;
+
let raw = self.wait_for_response(&id).await?;
+
let parsed: Recording = serde_json::from_str(&raw).context("decode recording response")?;
+
self.set_cache(&cache_key_rec(mbid), &raw).await?;
+
Ok(parsed)
+
}
+
+
// ---------- Redis helpers ----------
+
+
async fn get_cache(&self, key: &str) -> Result<Option<String>, Error> {
+
let mut r = self.redis.clone();
+
let val: Option<String> = r.get(key).await?;
+
Ok(val)
+
}
+
+
async fn set_cache(&self, key: &str, json: &str) -> Result<(), Error> {
+
let mut r = self.redis.clone();
+
let _: () = r
+
.set_ex(key, json, self.cache_ttl)
+
.await
+
.with_context(|| format!("cache set {key}"))?;
+
Ok(())
+
}
+
+
async fn enqueue_if_needed(&self, job: &Job, inflight_key: &str) -> Result<(), Error> {
+
let mut r = self.redis.clone();
+
+
// set NX to avoid duplicate work; short TTL
+
let set: bool = r.set_nx(inflight_key, "1").await.context("set in-flight")?;
+
if set {
+
let _: () = r
+
.expire(inflight_key, INFLIGHT_TTL_SECS)
+
.await
+
.context("expire inflight")?;
+
let payload = serde_json::to_string(job).expect("serialize job");
+
let _: () = r.rpush(Q_QUEUE, payload).await.context("enqueue job")?;
+
}
+
Ok(())
+
}
+
+
async fn wait_for_response(&self, id: &str) -> Result<String, Error> {
+
let mut r = self.redis.clone();
+
let resp_q = resp_key(id);
+
+
let fut = async {
+
loop {
+
let popped: Option<(String, String)> = r.brpop(&resp_q, 2.0).await?;
+
if let Some((_key, json)) = popped {
+
return Ok::<String, Error>(json);
+
}
+
}
+
};
+
+
match timeout(Duration::from_secs(WAIT_TIMEOUT_SECS), fut).await {
+
Ok(res) => res,
+
Err(_) => Err(anyhow!("timed out waiting for MusicBrainz response")),
+
}
+
}
+
}
+
+
async fn worker_loop(
+
http: reqwest::Client,
+
redis: &mut MultiplexedConnection,
+
) -> Result<(), Error> {
+
// pacing ticker: strictly 1 request/second
+
let mut next_allowed = Instant::now();
+
+
loop {
+
tokio::select! {
+
res = async {
+
+
// finite timeout pop
+
let v: Option<Vec<String>> = redis.blpop(Q_QUEUE, 2.0).await.ok();
+
Ok::<_, Error>(v)
+
} => {
+
let Some(mut v) = res? else {
+
continue };
+
if v.len() != 2 {
+
continue; }
+
let payload = v.pop().unwrap();
+
+
// 1 rps pacing
+
let now = Instant::now();
+
if now < next_allowed { tokio::time::sleep(next_allowed - now).await; }
+
next_allowed = Instant::now() + Duration::from_secs(1);
+
+
let payload: Job = match serde_json::from_str(&payload) {
+
Ok(j) => j,
+
Err(e) => {
+
tracing::error!(%e, "invalid job payload");
+
continue;
+
}
+
};
+
if let Err(e) = process_job(&http, redis, payload).await {
+
tracing::error!(%e, "job failed");
+
}
+
}
+
}
+
}
+
}
+
+
async fn process_job(
+
http: &reqwest::Client,
+
redis: &mut MultiplexedConnection,
+
job: Job,
+
) -> Result<(), Error> {
+
match job {
+
Job::Search { id, query } => {
+
let url = format!("{}/recording", BASE_URL);
+
let resp = http
+
.get(&url)
+
.header("Accept", "application/json")
+
.query(&[
+
("query", query.as_str()),
+
("fmt", "json"),
+
("inc", "artists+releases+isrcs"),
+
])
+
.send()
+
.await
+
.context("http search")?;
+
+
if !resp.status().is_success() {
+
// Push an error payload so waiters don鈥檛 hang forever
+
let _ = push_response(
+
redis,
+
&id,
+
&format!(r#"{{"error":"http {}"}}"#, resp.status()),
+
)
+
.await;
+
return Err(anyhow!("musicbrainz search http {}", resp.status()));
+
}
+
+
let text = resp.text().await.context("read body")?;
+
push_response(redis, &id, &text).await?;
+
}
+
Job::GetRecording { id, mbid } => {
+
let url = format!("{}/recording/{}", BASE_URL, mbid);
+
let resp = http
+
.get(&url)
+
.header("Accept", "application/json")
+
.query(&[("fmt", "json"), ("inc", "artists+releases+isrcs")])
+
.send()
+
.await
+
.context("http get_recording")?;
+
+
if !resp.status().is_success() {
+
let _ = push_response(
+
redis,
+
&id,
+
&format!(r#"{{"error":"http {}"}}"#, resp.status()),
+
)
+
.await;
+
return Err(anyhow!("musicbrainz get_recording http {}", resp.status()));
+
}
+
+
let text = resp.text().await.context("read body")?;
+
push_response(redis, &id, &text).await?;
+
}
+
}
+
Ok(())
+
}
+
+
async fn push_response(
+
redis: &mut MultiplexedConnection,
+
id: &str,
+
json: &str,
+
) -> Result<(), Error> {
+
let q = resp_key(id);
+
// RPUSH then EXPIRE to avoid leaks if a client never BRPOPs
+
let _: () = redis.rpush(&q, json).await?;
+
let _: () = redis.expire(&q, WAIT_TIMEOUT_SECS as i64 + 5).await?;
+
Ok(())
+
}
+
+
fn cache_key_search(query: &str) -> String {
+
format!("{}{}", CACHE_SEARCH_PREFIX, fast_hash(query))
+
}
+
fn cache_key_rec(mbid: &str) -> String {
+
format!("{}{}", CACHE_REC_PREFIX, mbid)
+
}
+
fn infl_key_search(query: &str) -> String {
+
format!("{}search:{}", INFLIGHT_PREFIX, fast_hash(query))
+
}
+
fn infl_key_rec(mbid: &str) -> String {
+
format!("{}rec:{}", INFLIGHT_PREFIX, mbid)
+
}
+
fn resp_key(id: &str) -> String {
+
format!("{}{}", RESP_PREFIX, id)
+
}
+
+
fn fast_hash(s: &str) -> u64 {
+
use std::hash::{Hash, Hasher};
+
let mut h = std::collections::hash_map::DefaultHasher::new();
+
s.hash(&mut h);
+
h.finish()
+
}
+
+
#[cfg(test)]
+
mod tests {
+
use super::*;
+
use serial_test::serial;
+
+
#[test]
+
fn test_fast_hash() {
+
let h1 = fast_hash("hello");
+
let h2 = fast_hash("hello");
+
let h3 = fast_hash("world");
+
assert_eq!(h1, h2);
+
assert_ne!(h1, h3);
+
}
+
+
#[test]
+
fn test_cache_keys() {
+
let q = "test query";
+
let mbid = "some-mbid";
+
assert!(cache_key_search(q).starts_with(CACHE_SEARCH_PREFIX));
+
assert!(cache_key_rec(mbid).starts_with(CACHE_REC_PREFIX));
+
assert!(infl_key_search(q).starts_with(INFLIGHT_PREFIX));
+
assert!(infl_key_rec(mbid).starts_with(INFLIGHT_PREFIX));
+
assert!(resp_key("id").starts_with(RESP_PREFIX));
+
}
+
+
#[tokio::test]
+
#[serial]
+
async fn test_musicbrainz_client() -> Result<(), Error> {
+
let client = MusicbrainzClient::new().await?;
+
let query = format!(
+
r#"recording:"{}" AND artist:"{}" AND status:Official"#,
+
"Come As You Are", "Nirvana"
+
);
+
let search_res = client.search(&query).await?;
-
Ok(response.json().await?)
+
assert!(!search_res.recordings.is_empty());
+
let rec = &search_res.recordings[0];
+
let mbid = &rec.id;
+
let rec_res = client.get_recording(mbid).await?;
+
assert_eq!(rec_res.id, *mbid);
+
Ok(())
}
}
+26 -7
crates/scrobbler/src/musicbrainz/release.rs
···
recording::Recording,
};
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Release {
#[serde(rename = "release-events")]
pub release_events: Option<Vec<ReleaseEvent>>,
···
#[serde(rename = "cover-art-archive")]
pub cover_art_archive: Option<CoverArtArchive>,
#[serde(rename = "artist-credit")]
-
pub artist_credit: Vec<ArtistCredit>,
+
pub artist_credit: Option<Vec<ArtistCredit>>,
#[serde(rename = "status-id")]
pub status_id: Option<String>,
#[serde(rename = "label-info")]
···
pub date: Option<String>,
pub country: Option<String>,
pub asin: Option<String>,
+
#[serde(rename = "track-count")]
+
pub track_count: Option<u32>,
+
#[serde(rename = "release-group")]
+
pub release_group: Option<ReleaseGroup>,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct CoverArtArchive {
pub back: bool,
pub artwork: bool,
···
pub darkened: bool,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct ReleaseEvent {
pub area: Option<Area>,
pub date: String,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct TextRepresentation {
pub language: Option<String>,
pub script: Option<String>,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Media {
#[serde(rename = "format-id")]
pub format_id: Option<String>,
···
pub title: String,
pub recording: Recording,
#[serde(rename = "artist-credit")]
-
pub artist_credit: Vec<ArtistCredit>,
+
pub artist_credit: Option<Vec<ArtistCredit>>,
pub number: String,
}
+
+
#[derive(Debug, Deserialize, Clone, Default)]
+
pub struct ReleaseGroup {
+
pub id: String,
+
pub title: String,
+
#[serde(rename = "primary-type")]
+
pub primary_type: Option<String>,
+
#[serde(rename = "secondary-types")]
+
pub secondary_types: Option<Vec<String>>,
+
pub disambiguation: Option<String>,
+
#[serde(rename = "first-release-date")]
+
pub first_release_date: Option<String>,
+
#[serde(rename = "artist-credit")]
+
pub artist_credit: Option<Vec<ArtistCredit>>,
+
}
+4
crates/webscrobbler/Cargo.toml
···
actix-session = "0.10.1"
actix-limitation = "0.5.1"
tracing = "0.1.41"
+
nanoid = "0.4.0"
+
+
[dev-dependencies]
+
serial_test = "3.0.0"
+7 -2
crates/webscrobbler/src/handlers.rs
···
-
use crate::{cache::Cache, consts::BANNER, repo, scrobbler::scrobble, types::ScrobbleRequest};
+
use crate::{
+
cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient, repo,
+
scrobbler::scrobble, types::ScrobbleRequest,
+
};
use actix_web::{get, post, web, HttpRequest, HttpResponse, Responder};
use owo_colors::OwoColorize;
use sqlx::{Pool, Postgres};
···
async fn handle_scrobble(
data: web::Data<Arc<Pool<Postgres>>>,
cache: web::Data<Cache>,
+
mb_client: web::Data<Arc<MusicbrainzClient>>,
mut payload: web::Payload,
req: HttpRequest,
) -> Result<impl Responder, actix_web::Error> {
···
}
}
-
scrobble(&pool, &cache, params, &user.did)
+
let mb_client = mb_client.get_ref().as_ref();
+
scrobble(&pool, &cache, mb_client, params, &user.did)
.await
.map_err(|err| {
actix_web::error::ErrorInternalServerError(format!("Failed to scrobble: {}", err))
+5 -1
crates/webscrobbler/src/lib.rs
···
use owo_colors::OwoColorize;
use sqlx::postgres::PgPoolOptions;
-
use crate::{cache::Cache, consts::BANNER};
+
use crate::{cache::Cache, consts::BANNER, musicbrainz::client::MusicbrainzClient};
pub mod auth;
pub mod cache;
···
let conn = Arc::new(pool);
+
let mb_client = MusicbrainzClient::new().await?;
+
let mb_client = Arc::new(mb_client);
+
let host = env::var("WEBSCROBBLER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
let port = env::var("WEBSCROBBLER_PORT")
.unwrap_or_else(|_| "7883".to_string())
···
.app_data(limiter.clone())
.app_data(Data::new(conn.clone()))
.app_data(Data::new(cache.clone()))
+
.app_data(Data::new(mb_client.clone()))
.service(handlers::index)
.service(handlers::handle_scrobble)
})
+2 -2
crates/webscrobbler/src/musicbrainz/artist.rs
···
use serde::{Deserialize, Serialize};
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Artist {
pub name: String,
#[serde(rename = "sort-name")]
···
pub aliases: Option<Vec<Alias>>,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct ArtistCredit {
pub joinphrase: Option<String>,
pub name: String,
+310 -24
crates/webscrobbler/src/musicbrainz/client.rs
···
+
use std::env;
+
use super::recording::{Recording, Recordings};
-
use anyhow::Error;
+
use anyhow::{anyhow, Context, Error};
+
use redis::aio::MultiplexedConnection;
+
use redis::AsyncCommands;
+
use serde::{Deserialize, Serialize};
+
use tokio::time::{timeout, Duration, Instant};
pub const BASE_URL: &str = "https://musicbrainz.org/ws/2";
-
pub const USER_AGENT: &str = "Rocksky/0.1.0";
+
pub const USER_AGENT: &str = "Rocksky/0.1.0 (+https://rocksky.app)";
+
+
const Q_QUEUE: &str = "mb:queue:v1";
+
const CACHE_SEARCH_PREFIX: &str = "mb:cache:search:";
+
const CACHE_REC_PREFIX: &str = "mb:cache:rec:";
+
const INFLIGHT_PREFIX: &str = "mb:inflight:";
+
const RESP_PREFIX: &str = "mb:resp:";
-
pub struct MusicbrainzClient {}
+
const CACHE_TTL_SECS: u64 = 60 * 60 * 24;
+
const WAIT_TIMEOUT_SECS: u64 = 20;
+
const INFLIGHT_TTL_SECS: i64 = 30; // de-dup window while the worker is fetching
+
+
#[derive(Clone)]
+
pub struct MusicbrainzClient {
+
http: reqwest::Client,
+
redis: MultiplexedConnection,
+
cache_ttl: u64,
+
}
+
+
#[derive(Debug, Serialize, Deserialize)]
+
#[serde(tag = "kind", rename_all = "snake_case")]
+
enum Job {
+
Search { id: String, query: String },
+
GetRecording { id: String, mbid: String },
+
}
impl MusicbrainzClient {
-
pub fn new() -> Self {
-
MusicbrainzClient {}
+
pub async fn new() -> Result<Self, Error> {
+
let client =
+
redis::Client::open(env::var("REDIS_URL").unwrap_or("redis://127.0.0.1".into()))?;
+
let redis = client.get_multiplexed_tokio_connection().await?;
+
let http = reqwest::Client::builder()
+
.user_agent(USER_AGENT)
+
.build()
+
.context("build http client")?;
+
let me = MusicbrainzClient {
+
http,
+
redis,
+
cache_ttl: CACHE_TTL_SECS,
+
};
+
+
let mut worker_conn = client.get_multiplexed_async_connection().await?;
+
+
let http = me.http.clone();
+
tokio::spawn(async move { worker_loop(http, &mut worker_conn).await });
+
+
Ok(me)
}
pub async fn search(&self, query: &str) -> Result<Recordings, Error> {
-
let url = format!("{}/recording", BASE_URL);
-
let client = reqwest::Client::new();
-
let response = client
-
.get(&url)
-
.header("Accept", "application/json")
-
.header("User-Agent", USER_AGENT)
-
.query(&[("query", query), ("inc", "artist-credits+releases")])
-
.send()
+
if let Some(h) = self.get_cache(&cache_key_search(query)).await? {
+
return Ok(serde_json::from_str(&h).context("decode cached search")?);
+
}
+
let id = nanoid::nanoid!();
+
let job = Job::Search {
+
id: id.clone(),
+
query: query.to_string(),
+
};
+
self.enqueue_if_needed(&job, &infl_key_search(query))
.await?;
-
Ok(response.json().await?)
+
let raw = self.wait_for_response(&id).await?;
+
let parsed: Recordings = serde_json::from_str(&raw).context("decode search response")?;
+
+
self.set_cache(&cache_key_search(query), &raw).await?;
+
Ok(parsed)
}
pub async fn get_recording(&self, mbid: &str) -> Result<Recording, Error> {
-
let url = format!("{}/recording/{}", BASE_URL, mbid);
-
let client = reqwest::Client::new();
-
let response = client
-
.get(&url)
-
.header("Accept", "application/json")
-
.header("User-Agent", USER_AGENT)
-
.query(&[("inc", "artist-credits+releases")])
-
.send()
-
.await?;
+
if let Some(h) = self.get_cache(&cache_key_rec(mbid)).await? {
+
return Ok(serde_json::from_str(&h).context("decode cached recording")?);
+
}
+
let id = nanoid::nanoid!();
+
let job = Job::GetRecording {
+
id: id.clone(),
+
mbid: mbid.to_string(),
+
};
+
self.enqueue_if_needed(&job, &infl_key_rec(mbid)).await?;
+
let raw = self.wait_for_response(&id).await?;
+
let parsed: Recording = serde_json::from_str(&raw).context("decode recording response")?;
+
self.set_cache(&cache_key_rec(mbid), &raw).await?;
+
Ok(parsed)
+
}
+
+
// ---------- Redis helpers ----------
+
+
async fn get_cache(&self, key: &str) -> Result<Option<String>, Error> {
+
let mut r = self.redis.clone();
+
let val: Option<String> = r.get(key).await?;
+
Ok(val)
+
}
+
+
async fn set_cache(&self, key: &str, json: &str) -> Result<(), Error> {
+
let mut r = self.redis.clone();
+
let _: () = r
+
.set_ex(key, json, self.cache_ttl)
+
.await
+
.with_context(|| format!("cache set {key}"))?;
+
Ok(())
+
}
+
+
async fn enqueue_if_needed(&self, job: &Job, inflight_key: &str) -> Result<(), Error> {
+
let mut r = self.redis.clone();
+
+
// set NX to avoid duplicate work; short TTL
+
let set: bool = r.set_nx(inflight_key, "1").await.context("set in-flight")?;
+
if set {
+
let _: () = r
+
.expire(inflight_key, INFLIGHT_TTL_SECS)
+
.await
+
.context("expire inflight")?;
+
let payload = serde_json::to_string(job).expect("serialize job");
+
let _: () = r.rpush(Q_QUEUE, payload).await.context("enqueue job")?;
+
}
+
Ok(())
+
}
+
+
async fn wait_for_response(&self, id: &str) -> Result<String, Error> {
+
let mut r = self.redis.clone();
+
let resp_q = resp_key(id);
+
+
let fut = async {
+
loop {
+
let popped: Option<(String, String)> = r.brpop(&resp_q, 2.0).await?;
+
if let Some((_key, json)) = popped {
+
return Ok::<String, Error>(json);
+
}
+
}
+
};
+
+
match timeout(Duration::from_secs(WAIT_TIMEOUT_SECS), fut).await {
+
Ok(res) => res,
+
Err(_) => Err(anyhow!("timed out waiting for MusicBrainz response")),
+
}
+
}
+
}
+
+
async fn worker_loop(
+
http: reqwest::Client,
+
redis: &mut MultiplexedConnection,
+
) -> Result<(), Error> {
+
// pacing ticker: strictly 1 request/second
+
let mut next_allowed = Instant::now();
+
+
loop {
+
tokio::select! {
+
res = async {
+
+
// finite timeout pop
+
let v: Option<Vec<String>> = redis.blpop(Q_QUEUE, 2.0).await.ok();
+
Ok::<_, Error>(v)
+
} => {
+
let Some(mut v) = res? else {
+
continue };
+
if v.len() != 2 {
+
continue; }
+
let payload = v.pop().unwrap();
+
+
// 1 rps pacing
+
let now = Instant::now();
+
if now < next_allowed { tokio::time::sleep(next_allowed - now).await; }
+
next_allowed = Instant::now() + Duration::from_secs(1);
+
+
let payload: Job = match serde_json::from_str(&payload) {
+
Ok(j) => j,
+
Err(e) => {
+
tracing::error!(%e, "invalid job payload");
+
continue;
+
}
+
};
+
if let Err(e) = process_job(&http, redis, payload).await {
+
tracing::error!(%e, "job failed");
+
}
+
}
+
}
+
}
+
}
+
+
async fn process_job(
+
http: &reqwest::Client,
+
redis: &mut MultiplexedConnection,
+
job: Job,
+
) -> Result<(), Error> {
+
match job {
+
Job::Search { id, query } => {
+
let url = format!("{}/recording", BASE_URL);
+
let resp = http
+
.get(&url)
+
.header("Accept", "application/json")
+
.query(&[
+
("query", query.as_str()),
+
("fmt", "json"),
+
("inc", "artists+releases+isrcs"),
+
])
+
.send()
+
.await
+
.context("http search")?;
+
+
if !resp.status().is_success() {
+
// Push an error payload so waiters don鈥檛 hang forever
+
let _ = push_response(
+
redis,
+
&id,
+
&format!(r#"{{"error":"http {}"}}"#, resp.status()),
+
)
+
.await;
+
return Err(anyhow!("musicbrainz search http {}", resp.status()));
+
}
+
+
let text = resp.text().await.context("read body")?;
+
push_response(redis, &id, &text).await?;
+
}
+
Job::GetRecording { id, mbid } => {
+
let url = format!("{}/recording/{}", BASE_URL, mbid);
+
let resp = http
+
.get(&url)
+
.header("Accept", "application/json")
+
.query(&[("fmt", "json"), ("inc", "artists+releases+isrcs")])
+
.send()
+
.await
+
.context("http get_recording")?;
+
+
if !resp.status().is_success() {
+
let _ = push_response(
+
redis,
+
&id,
+
&format!(r#"{{"error":"http {}"}}"#, resp.status()),
+
)
+
.await;
+
return Err(anyhow!("musicbrainz get_recording http {}", resp.status()));
+
}
+
+
let text = resp.text().await.context("read body")?;
+
push_response(redis, &id, &text).await?;
+
}
+
}
+
Ok(())
+
}
+
+
async fn push_response(
+
redis: &mut MultiplexedConnection,
+
id: &str,
+
json: &str,
+
) -> Result<(), Error> {
+
let q = resp_key(id);
+
// RPUSH then EXPIRE to avoid leaks if a client never BRPOPs
+
let _: () = redis.rpush(&q, json).await?;
+
let _: () = redis.expire(&q, WAIT_TIMEOUT_SECS as i64 + 5).await?;
+
Ok(())
+
}
+
+
fn cache_key_search(query: &str) -> String {
+
format!("{}{}", CACHE_SEARCH_PREFIX, fast_hash(query))
+
}
+
fn cache_key_rec(mbid: &str) -> String {
+
format!("{}{}", CACHE_REC_PREFIX, mbid)
+
}
+
fn infl_key_search(query: &str) -> String {
+
format!("{}search:{}", INFLIGHT_PREFIX, fast_hash(query))
+
}
+
fn infl_key_rec(mbid: &str) -> String {
+
format!("{}rec:{}", INFLIGHT_PREFIX, mbid)
+
}
+
fn resp_key(id: &str) -> String {
+
format!("{}{}", RESP_PREFIX, id)
+
}
+
+
fn fast_hash(s: &str) -> u64 {
+
use std::hash::{Hash, Hasher};
+
let mut h = std::collections::hash_map::DefaultHasher::new();
+
s.hash(&mut h);
+
h.finish()
+
}
+
+
#[cfg(test)]
+
mod tests {
+
use super::*;
+
use serial_test::serial;
+
+
#[test]
+
fn test_fast_hash() {
+
let h1 = fast_hash("hello");
+
let h2 = fast_hash("hello");
+
let h3 = fast_hash("world");
+
assert_eq!(h1, h2);
+
assert_ne!(h1, h3);
+
}
+
+
#[test]
+
fn test_cache_keys() {
+
let q = "test query";
+
let mbid = "some-mbid";
+
assert!(cache_key_search(q).starts_with(CACHE_SEARCH_PREFIX));
+
assert!(cache_key_rec(mbid).starts_with(CACHE_REC_PREFIX));
+
assert!(infl_key_search(q).starts_with(INFLIGHT_PREFIX));
+
assert!(infl_key_rec(mbid).starts_with(INFLIGHT_PREFIX));
+
assert!(resp_key("id").starts_with(RESP_PREFIX));
+
}
+
+
#[tokio::test]
+
#[serial]
+
async fn test_musicbrainz_client() -> Result<(), Error> {
+
let client = MusicbrainzClient::new().await?;
+
let query = format!(
+
r#"recording:"{}" AND artist:"{}" AND status:Official"#,
+
"Come As You Are", "Nirvana"
+
);
+
let search_res = client.search(&query).await?;
-
Ok(response.json().await?)
+
assert!(!search_res.recordings.is_empty());
+
let rec = &search_res.recordings[0];
+
let mbid = &rec.id;
+
let rec_res = client.get_recording(mbid).await?;
+
assert_eq!(rec_res.id, *mbid);
+
Ok(())
}
}
+26 -7
crates/webscrobbler/src/musicbrainz/release.rs
···
recording::Recording,
};
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Release {
#[serde(rename = "release-events")]
pub release_events: Option<Vec<ReleaseEvent>>,
···
#[serde(rename = "cover-art-archive")]
pub cover_art_archive: Option<CoverArtArchive>,
#[serde(rename = "artist-credit")]
-
pub artist_credit: Vec<ArtistCredit>,
+
pub artist_credit: Option<Vec<ArtistCredit>>,
#[serde(rename = "status-id")]
pub status_id: Option<String>,
#[serde(rename = "label-info")]
···
pub date: Option<String>,
pub country: Option<String>,
pub asin: Option<String>,
+
#[serde(rename = "track-count")]
+
pub track_count: Option<u32>,
+
#[serde(rename = "release-group")]
+
pub release_group: Option<ReleaseGroup>,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct CoverArtArchive {
pub back: bool,
pub artwork: bool,
···
pub darkened: bool,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct ReleaseEvent {
pub area: Option<Area>,
pub date: String,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct TextRepresentation {
pub language: Option<String>,
pub script: Option<String>,
}
-
#[derive(Debug, Deserialize, Clone)]
+
#[derive(Debug, Deserialize, Clone, Default)]
pub struct Media {
#[serde(rename = "format-id")]
pub format_id: Option<String>,
···
pub title: String,
pub recording: Recording,
#[serde(rename = "artist-credit")]
-
pub artist_credit: Vec<ArtistCredit>,
+
pub artist_credit: Option<Vec<ArtistCredit>>,
pub number: String,
}
+
+
#[derive(Debug, Deserialize, Clone, Default)]
+
pub struct ReleaseGroup {
+
pub id: String,
+
pub title: String,
+
#[serde(rename = "primary-type")]
+
pub primary_type: Option<String>,
+
#[serde(rename = "secondary-types")]
+
pub secondary_types: Option<Vec<String>>,
+
pub disambiguation: Option<String>,
+
#[serde(rename = "first-release-date")]
+
pub first_release_date: Option<String>,
+
#[serde(rename = "artist-credit")]
+
pub artist_credit: Option<Vec<ArtistCredit>>,
+
}