fix(spotify): refactor run function and improve track change detection logic #6

merged
opened by tsiry-sandratraina.com targeting main from fix/spotify
Changed files
+182 -332
crates
spotify
src
+182 -332
crates/spotify/src/lib.rs
···
+
use anyhow::Error;
+
use async_nats::connect;
+
use owo_colors::OwoColorize;
+
use reqwest::Client;
+
use serde::{Deserialize, Serialize};
+
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use std::{
collections::HashMap,
env,
sync::{atomic::AtomicBool, Arc, Mutex},
thread,
+
time::{SystemTime, UNIX_EPOCH},
};
-
-
use anyhow::Error;
-
use async_nats::connect;
-
use owo_colors::OwoColorize;
-
use reqwest::Client;
-
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use tokio_stream::StreamExt;
use crate::{
···
pub mod types;
pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1";
+
pub const MAX_USERS: usize = 100;
+
+
#[derive(Serialize, Deserialize, Debug, Clone)]
+
struct TrackState {
+
track_id: String,
+
progress_ms: u64,
+
scrobbled: bool,
+
last_updated: u64,
+
}
pub async fn run() -> Result<(), Error> {
let cache = Cache::new()?;
···
let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
-
let users = find_spotify_users(&pool, 0, 100).await?;
+
let users = find_spotify_users(&pool, 0, MAX_USERS).await?;
println!("Found {} users", users.len().bright_green());
-
// Shared HashMap to manage threads and their stop flags
let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
Arc::new(Mutex::new(HashMap::new()));
-
// Start threads for all users
for user in users {
let email = user.0.clone();
let token = user.1.clone();
···
email.bright_green(),
e.to_string().bright_red()
);
-
// If there's an error, publish a message to restart the thread
match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
Ok(_) => {
···
});
}
-
// Handle subscription messages
while let Some(message) = sub.next().await {
let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
println!(
···
let mut thread_map = thread_map.lock().unwrap();
-
// Check if the user exists in the thread map
if let Some(stop_flag) = thread_map.get(&user_id) {
-
// Stop the existing thread
stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
-
// Create a new stop flag and restart the thread
let new_stop_flag = Arc::new(AtomicBool::new(false));
thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
let user = find_spotify_user(&pool, &user_id).await?;
-
if user.is_none() {
println!(
"Spotify user not found: {}, skipping",
···
}
let user = user.unwrap();
-
let email = user.0.clone();
let token = user.1.clone();
let did = user.2.clone();
···
}
}
});
-
println!("Restarted thread for user: {}", user_id.bright_green());
} else {
println!(
···
let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
let client = Client::new();
-
let response = client
.post("https://accounts.spotify.com/api/token")
.basic_auth(&client_id, Some(client_secret))
···
])
.send()
.await?;
+
let token = response.json::<AccessToken>().await?;
Ok(token)
}
···
user_id: &str,
token: &str,
) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
+
// Check if we have cached data
if let Ok(Some(data)) = cache.get(user_id) {
println!(
"{} {}",
···
if data == "No content" {
return Ok(None);
}
-
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
+
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
if decoded_data.is_err() {
println!(
"{} {} {}",
···
);
cache.setex(user_id, "No content", 10)?;
cache.del(&format!("{}:current", user_id))?;
+
cache.del(&format!("{}:track_state", user_id))?;
return Ok(None);
}
let data: CurrentlyPlaying = decoded_data.unwrap();
-
// detect if the song has changed
-
let previous = cache.get(&format!("{}:previous", user_id));
-
if previous.is_err() {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
previous.unwrap_err().to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
-
let previous = previous.unwrap();
-
-
let changed = match previous {
-
Some(previous) => {
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
-
println!(
-
"{} {} {}",
-
format!("[{}]", user_id).bright_green(),
-
"Previous cache is invalid",
-
previous
-
);
-
return Ok(None);
-
}
-
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
-
if previous.item.is_none() && data.item.is_some() {
-
return Ok(Some((data, true)));
-
}
-
-
if previous.item.is_some() && data.item.is_none() {
-
return Ok(Some((data, false)));
-
}
-
-
if previous.item.is_none() && data.item.is_none() {
-
return Ok(Some((data, false)));
-
}
-
-
let previous_item = previous.item.unwrap();
-
let data_item = data.clone().item.unwrap();
-
previous_item.id != data_item.id
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
-
}
-
_ => true,
-
};
+
let changed = detect_track_change(&cache, user_id, &data)?;
return Ok(Some((data, changed)));
}
···
if status == 429 {
println!(
-
"{} Too many requests, retry-after {}",
+
"{} Too many requests, retry-after {}",
format!("[{}]", user_id).bright_green(),
headers
.get("retry-after")
···
return Ok(None);
}
-
let previous = cache.get(&format!("{}:previous", user_id));
-
if previous.is_err() {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
previous.unwrap_err().to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
-
let previous = previous.unwrap();
-
-
// check if status code is 204
if status == 204 {
println!("No content");
-
match cache.setex(
-
user_id,
-
"No content",
-
match previous.is_none() {
-
true => 30,
-
false => 10,
-
},
-
) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
-
match cache.del(&format!("{}:current", user_id)) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
+
// Clear track state when nothing is playing
+
cache.del(&format!("{}:track_state", user_id))?;
+
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
+
30
+
} else {
+
10
+
};
+
+
cache.setex(user_id, "No content", ttl)?;
+
cache.del(&format!("{}:current", user_id))?;
return Ok(None);
}
···
"Invalid data received".red(),
data
);
-
match cache.setex(user_id, "No content", 10) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
-
match cache.del(&format!("{}:current", user_id)) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
+
cache.setex(user_id, "No content", 10)?;
+
cache.del(&format!("{}:current", user_id))?;
+
cache.del(&format!("{}:track_state", user_id))?;
return Ok(None);
}
-
let data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
-
-
match cache.setex(
-
user_id,
-
&serde_json::to_string(&data)?,
-
match previous.is_none() {
-
true => 30,
-
false => 15,
-
},
-
) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
-
match cache.del(&format!("{}:current", user_id)) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
+
let currently_playing_data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
-
// detect if the song has changed
-
let previous = cache.get(&format!("{}:previous", user_id));
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
+
30
+
} else {
+
15
+
};
-
if previous.is_err() {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
previous.unwrap_err().to_string().bright_red()
-
);
-
return Ok(None);
-
}
+
cache.setex(
+
user_id,
+
&serde_json::to_string(&currently_playing_data)?,
+
ttl,
+
)?;
+
cache.del(&format!("{}:current", user_id))?;
-
let previous = previous.unwrap();
-
let changed = match previous {
-
Some(previous) => {
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
-
println!(
-
"{} {} {}",
-
format!("[{}]", user_id).bright_green(),
-
"Previous cache is invalid",
-
previous
-
);
-
return Ok(None);
-
}
+
// Detect track change and update track state
+
let changed = detect_track_change(&cache, user_id, &currently_playing_data)?;
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
-
if previous.item.is_none() || data.item.is_none() {
-
return Ok(Some((data, false)));
-
}
+
// Update previous song cache
+
cache.setex(
+
&format!("{}:previous", user_id),
+
&serde_json::to_string(&currently_playing_data)?,
+
600,
+
)?;
-
let previous_item = previous.item.unwrap();
-
let data_item = data.clone().item.unwrap();
+
Ok(Some((currently_playing_data, changed)))
+
}
-
previous_item.id != data_item.id
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
+
fn detect_track_change(
+
cache: &Cache,
+
user_id: &str,
+
current: &CurrentlyPlaying,
+
) -> Result<bool, Error> {
+
let track_state_key = format!("{}:track_state", user_id);
+
+
let now = SystemTime::now()
+
.duration_since(UNIX_EPOCH)
+
.unwrap()
+
.as_secs();
+
+
let current_item = match &current.item {
+
Some(item) => item,
+
None => {
+
let _ = cache.del(&track_state_key);
+
return Ok(false);
}
-
_ => false,
};
-
// save as previous song
-
match cache.setex(
-
&format!("{}:previous", user_id),
-
&serde_json::to_string(&data)?,
-
600,
-
) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", user_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
+
let previous_state = cache.get(&track_state_key)?;
+
+
let changed = match previous_state {
+
Some(state_str) => {
+
if let Ok(prev_state) = serde_json::from_str::<TrackState>(&state_str) {
+
if prev_state.track_id != current_item.id {
+
true
+
} else {
+
// Same track - check if we should scrobble based on progress and time
+
let progress_diff =
+
current.progress_ms.unwrap_or(0) as i64 - prev_state.progress_ms as i64;
+
let time_diff = now - prev_state.last_updated;
+
+
// Only consider it changed if:
+
// 1. We haven't scrobbled this track yet
+
// 2. Significant progress was made (more than 10 seconds or reasonable time passed)
+
// 3. Track is actually playing
+
!prev_state.scrobbled
+
&& current.is_playing
+
&& (progress_diff > 10000 || (time_diff > 30 && progress_diff > 0))
+
}
+
} else {
+
// Invalid previous state, treat as changed
+
true
+
}
}
-
}
+
None => {
+
// No previous state, treat as new track
+
current.is_playing
+
}
+
};
+
+
let new_state = TrackState {
+
track_id: current_item.id.clone(),
+
progress_ms: current.progress_ms.unwrap_or(0),
+
scrobbled: changed, // Mark as scrobbled if we're reporting a change
+
last_updated: now,
+
};
-
Ok(Some((data, changed)))
+
cache.setex(&track_state_key, &serde_json::to_string(&new_state)?, 300)?;
+
+
Ok(changed)
}
pub async fn get_artist(
···
return Ok(None);
}
-
match cache.setex(artist_id, &data, 20) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", artist_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
-
+
cache.setex(artist_id, &data, 20)?;
Ok(Some(serde_json::from_str(&data)?))
}
···
return Ok(None);
}
-
match cache.setex(album_id, &data, 20) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", album_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
return Ok(None);
-
}
-
}
-
+
cache.setex(album_id, &data, 20)?;
Ok(Some(serde_json::from_str(&data)?))
}
···
let headers = response.headers().clone();
let data = response.text().await?;
+
if data == "Too many requests" {
println!(
"> retry-after {}",
···
}
let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
-
if album_tracks.items.is_empty() {
break;
}
···
}
let all_tracks_json = serde_json::to_string(&all_tracks)?;
-
match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", album_id).bright_green(),
-
e.to_string().bright_red()
-
);
-
}
-
}
+
cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?;
Ok(AlbumTracks {
items: all_tracks,
···
) -> Result<Vec<(String, String, String, String)>, Error> {
let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
r#"
-
SELECT * FROM spotify_tokens
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
-
LIMIT $1 OFFSET $2
-
"#,
+
SELECT * FROM spotify_tokens
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
+
LIMIT $1 OFFSET $2
+
"#,
)
.bind(limit as i64)
.bind(offset as i64)
···
.await?;
let mut user_tokens = vec![];
-
for result in &results {
let token = decrypt_aes_256_ctr(
&result.refresh_token,
···
) -> Result<Option<(String, String, String)>, Error> {
let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
r#"
-
SELECT * FROM spotify_tokens
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
-
WHERE spotify_accounts.email = $1
-
"#,
+
SELECT * FROM spotify_tokens
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
+
WHERE spotify_accounts.email = $1
+
"#,
)
.bind(email)
.fetch_all(pool)
···
"Checking currently playing".cyan()
);
-
let stop_flag_clone = stop_flag.clone();
-
let spotify_email_clone = spotify_email.clone();
-
let cache_clone = cache.clone();
-
thread::spawn(move || {
-
loop {
-
if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
-
println!(
-
"{} Stopping Thread",
-
format!("[{}]", spotify_email_clone).bright_green()
-
);
-
break;
-
}
-
if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) {
-
if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() {
-
thread::sleep(std::time::Duration::from_millis(800));
-
continue;
-
}
-
-
let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?;
-
-
if let Some(item) = current_song.item.clone() {
-
if current_song.is_playing
-
&& current_song.progress_ms.unwrap_or(0) < item.duration_ms.into()
-
{
-
current_song.progress_ms =
-
Some(current_song.progress_ms.unwrap_or(0) + 800);
-
match cache_clone.setex(
-
&format!("{}:current", spotify_email_clone),
-
&serde_json::to_string(&current_song)?,
-
16,
-
) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", spotify_email_clone).bright_green(),
-
e.to_string().bright_red()
-
);
-
}
-
}
-
thread::sleep(std::time::Duration::from_millis(800));
-
continue;
-
}
-
}
-
continue;
-
}
-
-
if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) {
-
if cached == "No content" {
-
thread::sleep(std::time::Duration::from_millis(800));
-
continue;
-
}
-
match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) {
-
Ok(_) => {}
-
Err(e) => {
-
println!(
-
"{} redis error: {}",
-
format!("[{}]", spotify_email_clone).bright_green(),
-
e.to_string().bright_red()
-
);
-
}
-
}
-
}
-
-
thread::sleep(std::time::Duration::from_millis(800));
-
}
-
Ok::<(), Error>(())
-
});
+
// Remove the separate progress tracking thread - it was causing race conditions
+
// and unnecessary complexity
loop {
if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
···
);
break;
}
+
let spotify_email = spotify_email.clone();
let token = token.clone();
let did = did.clone();
···
format!("[{}]", spotify_email).bright_green(),
e.to_string().bright_red()
);
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
continue;
}
};
···
format!("[{}]", spotify_email).bright_green(),
"No song playing".yellow()
);
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
continue;
}
+
let data_item = data.item.unwrap();
println!(
"{} {} is_playing: {} changed: {}",
···
changed
);
-
if changed {
-
scrobble(cache.clone(), &spotify_email, &did, &token).await?;
+
// Only scrobble if there's a genuine track change and the track is playing
+
if changed && data.is_playing {
+
// Add a small delay to prevent rapid duplicate scrobbles
+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
+
+
match scrobble(cache.clone(), &spotify_email, &did, &token).await {
+
Ok(_) => {
+
println!(
+
"{} {}",
+
format!("[{}]", spotify_email).bright_green(),
+
"Scrobbled successfully".green()
+
);
+
}
+
Err(e) => {
+
println!(
+
"{} Scrobble failed: {}",
+
format!("[{}]", spotify_email).bright_green(),
+
e.to_string().bright_red()
+
);
+
}
+
}
+
+
// Spawn background task for library updates
+
let cache_clone = cache.clone();
+
let token_clone = token.clone();
+
let spotify_email_clone = spotify_email.clone();
+
let did_clone = did.clone();
+
let album_id = data_item.album.id.clone();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
match rt.block_on(async {
-
get_album_tracks(cache.clone(), &data_item.album.id, &token).await?;
-
get_album(cache.clone(), &data_item.album.id, &token).await?;
-
update_library(cache.clone(), &spotify_email, &did, &token).await?;
+
get_album_tracks(cache_clone.clone(), &album_id, &token_clone).await?;
+
get_album(cache_clone.clone(), &album_id, &token_clone).await?;
+
update_library(
+
cache_clone.clone(),
+
&spotify_email_clone,
+
&did_clone,
+
&token_clone,
+
)
+
.await?;
Ok::<(), Error>(())
}) {
-
Ok(_) => {}
+
Ok(_) => {
+
println!(
+
"{} Library updated successfully",
+
format!("[{}]", spotify_email_clone).bright_green()
+
);
+
}
Err(e) => {
println!(
-
"{} {}",
-
format!("[{}]", spotify_email).bright_green(),
+
"{} Library update failed: {}",
+
format!("[{}]", spotify_email_clone).bright_green(),
e.to_string().bright_red()
);
}
···
}
}
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
}
Ok(())