···
2
+
use async_nats::connect;
3
+
use owo_colors::OwoColorize;
5
+
use serde::{Deserialize, Serialize};
6
+
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
sync::{atomic::AtomicBool, Arc, Mutex},
12
+
time::{SystemTime, UNIX_EPOCH},
9
-
use async_nats::connect;
10
-
use owo_colors::OwoColorize;
11
-
use reqwest::Client;
12
-
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
use tokio_stream::StreamExt;
···
pub const BASE_URL: &str = "https://spotify-api.rocksky.app/v1";
35
+
pub const MAX_USERS: usize = 100;
37
+
#[derive(Serialize, Deserialize, Debug, Clone)]
pub async fn run() -> Result<(), Error> {
let cache = Cache::new()?;
···
let mut sub = nc.subscribe("rocksky.spotify.user".to_string()).await?;
println!("Subscribed to {}", "rocksky.spotify.user".bright_green());
49
-
let users = find_spotify_users(&pool, 0, 100).await?;
59
+
let users = find_spotify_users(&pool, 0, MAX_USERS).await?;
println!("Found {} users", users.len().bright_green());
52
-
// Shared HashMap to manage threads and their stop flags
let thread_map: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>> =
Arc::new(Mutex::new(HashMap::new()));
56
-
// Start threads for all users
let email = user.0.clone();
let token = user.1.clone();
···
e.to_string().bright_red()
// If there's an error, publish a message to restart the thread
match rt.block_on(nc.publish("rocksky.spotify.user", email.clone().into())) {
···
109
-
// Handle subscription messages
while let Some(message) = sub.next().await {
let user_id = String::from_utf8(message.payload.to_vec()).unwrap();
···
let mut thread_map = thread_map.lock().unwrap();
119
-
// Check if the user exists in the thread map
if let Some(stop_flag) = thread_map.get(&user_id) {
121
-
// Stop the existing thread
stop_flag.store(true, std::sync::atomic::Ordering::Relaxed);
124
-
// Create a new stop flag and restart the thread
let new_stop_flag = Arc::new(AtomicBool::new(false));
thread_map.insert(user_id.clone(), Arc::clone(&new_stop_flag));
let user = find_spotify_user(&pool, &user_id).await?;
"Spotify user not found: {}, skipping",
···
let user = user.unwrap();
let email = user.0.clone();
let token = user.1.clone();
let did = user.2.clone();
···
println!("Restarted thread for user: {}", user_id.bright_green());
···
let client_secret = env::var("SPOTIFY_CLIENT_SECRET")?;
let client = Client::new();
.post("https://accounts.spotify.com/api/token")
.basic_auth(&client_id, Some(client_secret))
···
let token = response.json::<AccessToken>().await?;
···
) -> Result<Option<(CurrentlyPlaying, bool)>, Error> {
259
+
// Check if we have cached data
if let Ok(Some(data)) = cache.get(user_id) {
···
if data == "No content" {
268
-
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
270
+
let decoded_data = serde_json::from_str::<CurrentlyPlaying>(&data);
if decoded_data.is_err() {
···
cache.setex(user_id, "No content", 10)?;
cache.del(&format!("{}:current", user_id))?;
280
+
cache.del(&format!("{}:track_state", user_id))?;
let data: CurrentlyPlaying = decoded_data.unwrap();
283
-
// detect if the song has changed
284
-
let previous = cache.get(&format!("{}:previous", user_id));
286
-
if previous.is_err() {
288
-
"{} redis error: {}",
289
-
format!("[{}]", user_id).bright_green(),
290
-
previous.unwrap_err().to_string().bright_red()
295
-
let previous = previous.unwrap();
297
-
let changed = match previous {
298
-
Some(previous) => {
299
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
302
-
format!("[{}]", user_id).bright_green(),
303
-
"Previous cache is invalid",
309
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
310
-
if previous.item.is_none() && data.item.is_some() {
311
-
return Ok(Some((data, true)));
314
-
if previous.item.is_some() && data.item.is_none() {
315
-
return Ok(Some((data, false)));
318
-
if previous.item.is_none() && data.item.is_none() {
319
-
return Ok(Some((data, false)));
322
-
let previous_item = previous.item.unwrap();
323
-
let data_item = data.clone().item.unwrap();
324
-
previous_item.id != data_item.id
325
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
286
+
let changed = detect_track_change(&cache, user_id, &data)?;
return Ok(Some((data, changed)));
···
346
-
"{} Too many requests, retry-after {}",
304
+
"{} Too many requests, retry-after {}",
format!("[{}]", user_id).bright_green(),
···
358
-
let previous = cache.get(&format!("{}:previous", user_id));
359
-
if previous.is_err() {
361
-
"{} redis error: {}",
362
-
format!("[{}]", user_id).bright_green(),
363
-
previous.unwrap_err().to_string().bright_red()
368
-
let previous = previous.unwrap();
370
-
// check if status code is 204
376
-
match previous.is_none() {
384
-
"{} redis error: {}",
385
-
format!("[{}]", user_id).bright_green(),
386
-
e.to_string().bright_red()
391
-
match cache.del(&format!("{}:current", user_id)) {
395
-
"{} redis error: {}",
396
-
format!("[{}]", user_id).bright_green(),
397
-
e.to_string().bright_red()
318
+
// Clear track state when nothing is playing
319
+
cache.del(&format!("{}:track_state", user_id))?;
321
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
327
+
cache.setex(user_id, "No content", ttl)?;
328
+
cache.del(&format!("{}:current", user_id))?;
···
"Invalid data received".red(),
412
-
match cache.setex(user_id, "No content", 10) {
416
-
"{} redis error: {}",
417
-
format!("[{}]", user_id).bright_green(),
418
-
e.to_string().bright_red()
423
-
match cache.del(&format!("{}:current", user_id)) {
427
-
"{} redis error: {}",
428
-
format!("[{}]", user_id).bright_green(),
429
-
e.to_string().bright_red()
339
+
cache.setex(user_id, "No content", 10)?;
340
+
cache.del(&format!("{}:current", user_id))?;
341
+
cache.del(&format!("{}:track_state", user_id))?;
437
-
let data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
441
-
&serde_json::to_string(&data)?,
442
-
match previous.is_none() {
450
-
"{} redis error: {}",
451
-
format!("[{}]", user_id).bright_green(),
452
-
e.to_string().bright_red()
457
-
match cache.del(&format!("{}:current", user_id)) {
461
-
"{} redis error: {}",
462
-
format!("[{}]", user_id).bright_green(),
463
-
e.to_string().bright_red()
345
+
let currently_playing_data = serde_json::from_str::<CurrentlyPlaying>(&data)?;
469
-
// detect if the song has changed
470
-
let previous = cache.get(&format!("{}:previous", user_id));
347
+
let ttl = if cache.get(&format!("{}:previous", user_id))?.is_none() {
472
-
if previous.is_err() {
474
-
"{} redis error: {}",
475
-
format!("[{}]", user_id).bright_green(),
476
-
previous.unwrap_err().to_string().bright_red()
355
+
&serde_json::to_string(¤tly_playing_data)?,
358
+
cache.del(&format!("{}:current", user_id))?;
481
-
let previous = previous.unwrap();
482
-
let changed = match previous {
483
-
Some(previous) => {
484
-
if serde_json::from_str::<CurrentlyPlaying>(&previous).is_err() {
487
-
format!("[{}]", user_id).bright_green(),
488
-
"Previous cache is invalid",
360
+
// Detect track change and update track state
361
+
let changed = detect_track_change(&cache, user_id, ¤tly_playing_data)?;
494
-
let previous: CurrentlyPlaying = serde_json::from_str(&previous)?;
495
-
if previous.item.is_none() || data.item.is_none() {
496
-
return Ok(Some((data, false)));
363
+
// Update previous song cache
365
+
&format!("{}:previous", user_id),
366
+
&serde_json::to_string(¤tly_playing_data)?,
499
-
let previous_item = previous.item.unwrap();
500
-
let data_item = data.clone().item.unwrap();
370
+
Ok(Some((currently_playing_data, changed)))
502
-
previous_item.id != data_item.id
503
-
&& previous.progress_ms.unwrap_or(0) != data.progress_ms.unwrap_or(0)
373
+
fn detect_track_change(
376
+
current: &CurrentlyPlaying,
377
+
) -> Result<bool, Error> {
378
+
let track_state_key = format!("{}:track_state", user_id);
380
+
let now = SystemTime::now()
381
+
.duration_since(UNIX_EPOCH)
385
+
let current_item = match ¤t.item {
386
+
Some(item) => item,
388
+
let _ = cache.del(&track_state_key);
508
-
// save as previous song
510
-
&format!("{}:previous", user_id),
511
-
&serde_json::to_string(&data)?,
517
-
"{} redis error: {}",
518
-
format!("[{}]", user_id).bright_green(),
519
-
e.to_string().bright_red()
393
+
let previous_state = cache.get(&track_state_key)?;
395
+
let changed = match previous_state {
396
+
Some(state_str) => {
397
+
if let Ok(prev_state) = serde_json::from_str::<TrackState>(&state_str) {
398
+
if prev_state.track_id != current_item.id {
401
+
// Same track - check if we should scrobble based on progress and time
402
+
let progress_diff =
403
+
current.progress_ms.unwrap_or(0) as i64 - prev_state.progress_ms as i64;
404
+
let time_diff = now - prev_state.last_updated;
406
+
// Only consider it changed if:
407
+
// 1. We haven't scrobbled this track yet
408
+
// 2. Significant progress was made (more than 10 seconds or reasonable time passed)
409
+
// 3. Track is actually playing
410
+
!prev_state.scrobbled
411
+
&& current.is_playing
412
+
&& (progress_diff > 10000 || (time_diff > 30 && progress_diff > 0))
415
+
// Invalid previous state, treat as changed
420
+
// No previous state, treat as new track
425
+
let new_state = TrackState {
426
+
track_id: current_item.id.clone(),
427
+
progress_ms: current.progress_ms.unwrap_or(0),
428
+
scrobbled: changed, // Mark as scrobbled if we're reporting a change
525
-
Ok(Some((data, changed)))
432
+
cache.setex(&track_state_key, &serde_json::to_string(&new_state)?, 300)?;
···
557
-
match cache.setex(artist_id, &data, 20) {
561
-
"{} redis error: {}",
562
-
format!("[{}]", artist_id).bright_green(),
563
-
e.to_string().bright_red()
466
+
cache.setex(artist_id, &data, 20)?;
Ok(Some(serde_json::from_str(&data)?))
···
597
-
match cache.setex(album_id, &data, 20) {
601
-
"{} redis error: {}",
602
-
format!("[{}]", album_id).bright_green(),
603
-
e.to_string().bright_red()
495
+
cache.setex(album_id, &data, 20)?;
Ok(Some(serde_json::from_str(&data)?))
···
let headers = response.headers().clone();
let data = response.text().await?;
if data == "Too many requests" {
···
let album_tracks: AlbumTracks = serde_json::from_str(&data)?;
if album_tracks.items.is_empty() {
···
let all_tracks_json = serde_json::to_string(&all_tracks)?;
660
-
match cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20) {
664
-
"{} redis error: {}",
665
-
format!("[{}]", album_id).bright_green(),
666
-
e.to_string().bright_red()
547
+
cache.setex(&format!("{}:tracks", album_id), &all_tracks_json, 20)?;
···
) -> Result<Vec<(String, String, String, String)>, Error> {
let results: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
684
-
SELECT * FROM spotify_tokens
685
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
686
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
562
+
SELECT * FROM spotify_tokens
563
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
564
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
···
let mut user_tokens = vec![];
let token = decrypt_aes_256_ctr(
···
) -> Result<Option<(String, String, String)>, Error> {
let result: Vec<SpotifyTokenWithEmail> = sqlx::query_as(
719
-
SELECT * FROM spotify_tokens
720
-
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
721
-
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
722
-
WHERE spotify_accounts.email = $1
596
+
SELECT * FROM spotify_tokens
597
+
LEFT JOIN spotify_accounts ON spotify_tokens.user_id = spotify_accounts.user_id
598
+
LEFT JOIN users ON spotify_accounts.user_id = users.xata_id
599
+
WHERE spotify_accounts.email = $1
···
"Checking currently playing".cyan()
754
-
let stop_flag_clone = stop_flag.clone();
755
-
let spotify_email_clone = spotify_email.clone();
756
-
let cache_clone = cache.clone();
757
-
thread::spawn(move || {
759
-
if stop_flag_clone.load(std::sync::atomic::Ordering::Relaxed) {
761
-
"{} Stopping Thread",
762
-
format!("[{}]", spotify_email_clone).bright_green()
766
-
if let Ok(Some(cached)) = cache_clone.get(&format!("{}:current", spotify_email_clone)) {
767
-
if serde_json::from_str::<CurrentlyPlaying>(&cached).is_err() {
768
-
thread::sleep(std::time::Duration::from_millis(800));
772
-
let mut current_song = serde_json::from_str::<CurrentlyPlaying>(&cached)?;
774
-
if let Some(item) = current_song.item.clone() {
775
-
if current_song.is_playing
776
-
&& current_song.progress_ms.unwrap_or(0) < item.duration_ms.into()
778
-
current_song.progress_ms =
779
-
Some(current_song.progress_ms.unwrap_or(0) + 800);
780
-
match cache_clone.setex(
781
-
&format!("{}:current", spotify_email_clone),
782
-
&serde_json::to_string(¤t_song)?,
788
-
"{} redis error: {}",
789
-
format!("[{}]", spotify_email_clone).bright_green(),
790
-
e.to_string().bright_red()
794
-
thread::sleep(std::time::Duration::from_millis(800));
801
-
if let Ok(Some(cached)) = cache_clone.get(&spotify_email_clone) {
802
-
if cached == "No content" {
803
-
thread::sleep(std::time::Duration::from_millis(800));
806
-
match cache_clone.setex(&format!("{}:current", spotify_email_clone), &cached, 16) {
810
-
"{} redis error: {}",
811
-
format!("[{}]", spotify_email_clone).bright_green(),
812
-
e.to_string().bright_red()
818
-
thread::sleep(std::time::Duration::from_millis(800));
820
-
Ok::<(), Error>(())
631
+
// Remove the separate progress tracking thread - it was causing race conditions
632
+
// and unnecessary complexity
if stop_flag.load(std::sync::atomic::Ordering::Relaxed) {
···
let spotify_email = spotify_email.clone();
let token = token.clone();
···
format!("[{}]", spotify_email).bright_green(),
e.to_string().bright_red()
845
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
657
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
···
format!("[{}]", spotify_email).bright_green(),
"No song playing".yellow()
857
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
669
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
let data_item = data.item.unwrap();
"{} {} is_playing: {} changed: {}",
···
870
-
scrobble(cache.clone(), &spotify_email, &did, &token).await?;
682
+
// Only scrobble if there's a genuine track change and the track is playing
683
+
if changed && data.is_playing {
684
+
// Add a small delay to prevent rapid duplicate scrobbles
685
+
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
687
+
match scrobble(cache.clone(), &spotify_email, &did, &token).await {
691
+
format!("[{}]", spotify_email).bright_green(),
692
+
"Scrobbled successfully".green()
697
+
"{} Scrobble failed: {}",
698
+
format!("[{}]", spotify_email).bright_green(),
699
+
e.to_string().bright_red()
704
+
// Spawn background task for library updates
705
+
let cache_clone = cache.clone();
706
+
let token_clone = token.clone();
707
+
let spotify_email_clone = spotify_email.clone();
708
+
let did_clone = did.clone();
709
+
let album_id = data_item.album.id.clone();
let rt = tokio::runtime::Runtime::new().unwrap();
match rt.block_on(async {
875
-
get_album_tracks(cache.clone(), &data_item.album.id, &token).await?;
876
-
get_album(cache.clone(), &data_item.album.id, &token).await?;
877
-
update_library(cache.clone(), &spotify_email, &did, &token).await?;
714
+
get_album_tracks(cache_clone.clone(), &album_id, &token_clone).await?;
715
+
get_album(cache_clone.clone(), &album_id, &token_clone).await?;
717
+
cache_clone.clone(),
718
+
&spotify_email_clone,
727
+
"{} Library updated successfully",
728
+
format!("[{}]", spotify_email_clone).bright_green()
884
-
format!("[{}]", spotify_email).bright_green(),
733
+
"{} Library update failed: {}",
734
+
format!("[{}]", spotify_email_clone).bright_green(),
e.to_string().bright_red()
···
893
-
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
743
+
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;