mod common; use chrono::{Duration, Utc}; use reqwest::StatusCode; use serde_json::{json, Value}; use prism::records::{NewAccount, NewRecord, CHANNEL_COLLECTION}; use std::sync::atomic::{AtomicU64, Ordering}; static TEST_COUNTER: AtomicU64 = AtomicU64::new(0); fn unique_suffix() -> String { let ts = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos(); let count = TEST_COUNTER.fetch_add(1, Ordering::SeqCst); format!("{}_{}", ts, count) } async fn insert_test_account(pool: &sqlx::PgPool, did: &str) { prism::db::upsert_account( pool, &NewAccount { did: did.to_string(), handle: did.to_string(), created_at: Utc::now(), }, ) .await .unwrap(); } async fn insert_channel_at_time( pool: &sqlx::PgPool, cid: &str, creator_did: &str, name: &str, indexed_at: chrono::DateTime, ) { let uri = format!("at://{}/systems.gmstn.development.channel/{}", creator_did, cid); prism::db::insert_record( pool, &NewRecord { uri, cid: cid.to_string(), collection: CHANNEL_COLLECTION.to_string(), creator_did: creator_did.to_string(), created_at: indexed_at, indexed_at, data: json!({"name": name, "topic": "test topic"}), target_did: None, ref_cids: vec![], }, ) .await .unwrap(); } #[tokio::test] async fn test_pagination_first_page_returns_latest() { let base_url = common::base_url().await; let pool = common::get_db_pool().await; let client = common::client(); let suffix = unique_suffix(); let test_did = format!("did:plc:pagination_first_{}", suffix); insert_test_account(&pool, &test_did).await; let now = Utc::now(); insert_channel_at_time(&pool, &format!("bafypagfirst001_{}", suffix), &test_did, "Oldest", now - Duration::hours(3)) .await; insert_channel_at_time(&pool, &format!("bafypagfirst002_{}", suffix), &test_did, "Middle", now - Duration::hours(2)) .await; insert_channel_at_time(&pool, &format!("bafypagfirst003_{}", suffix), &test_did, "Newest", now - Duration::hours(1)) .await; let res = client .get(format!( "{}/xrpc/systems.gmstn.development.channel.listChannels?author={}&limit=2", base_url, test_did )) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); let body: Value = res.json().await.unwrap(); let channels = body["channels"].as_array().unwrap(); assert_eq!(channels.len(), 2); assert_eq!(channels[0]["displayName"].as_str().unwrap(), "Newest"); assert_eq!(channels[1]["displayName"].as_str().unwrap(), "Middle"); assert!(body["cursor"].is_string()); } #[tokio::test] async fn test_pagination_with_cursor_excludes_newer() { let base_url = common::base_url().await; let pool = common::get_db_pool().await; let client = common::client(); let suffix = unique_suffix(); let test_did = format!("did:plc:pagination_cursor_{}", suffix); insert_test_account(&pool, &test_did).await; let now = Utc::now(); let oldest_time = now - Duration::hours(3); let middle_time = now - Duration::hours(2); let newest_time = now - Duration::hours(1); insert_channel_at_time(&pool, &format!("bafypagcur001_{}", suffix), &test_did, "Oldest", oldest_time).await; insert_channel_at_time(&pool, &format!("bafypagcur002_{}", suffix), &test_did, "Middle", middle_time).await; insert_channel_at_time(&pool, &format!("bafypagcur003_{}", suffix), &test_did, "Newest", newest_time).await; let cursor = middle_time.to_rfc3339(); let res = client .get(format!( "{}/xrpc/systems.gmstn.development.channel.listChannels", base_url )) .query(&[("author", test_did.as_str()), ("cursor", cursor.as_str()), ("limit", "10")]) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); let body: Value = res.json().await.unwrap(); let channels = body["channels"].as_array().unwrap(); assert_eq!(channels.len(), 1); assert_eq!(channels[0]["displayName"].as_str().unwrap(), "Oldest"); } #[tokio::test] async fn test_pagination_returns_cursor_for_continuation() { let base_url = common::base_url().await; let pool = common::get_db_pool().await; let client = common::client(); let suffix = unique_suffix(); let test_did = format!("did:plc:pagination_single_{}", suffix); insert_test_account(&pool, &test_did).await; let now = Utc::now(); insert_channel_at_time(&pool, &format!("bafypagsingle001_{}", suffix), &test_did, "Only", now - Duration::hours(1)) .await; let res = client .get(format!( "{}/xrpc/systems.gmstn.development.channel.listChannels?author={}&limit=10", base_url, test_did )) .send() .await .unwrap(); assert_eq!(res.status(), StatusCode::OK); let body: Value = res.json().await.unwrap(); let channels = body["channels"].as_array().unwrap(); assert_eq!(channels.len(), 1); assert!(body["cursor"].is_string()); } #[tokio::test] async fn test_pagination_full_traversal() { let base_url = common::base_url().await; let pool = common::get_db_pool().await; let client = common::client(); let suffix = unique_suffix(); let test_did = format!("did:plc:pagination_full_{}", suffix); insert_test_account(&pool, &test_did).await; let now = Utc::now(); for i in 0..5 { insert_channel_at_time( &pool, &format!("bafypagfull{:03}_{}", i, suffix), &test_did, &format!("Channel {}", i), now - Duration::hours(5 - i as i64), ) .await; } let mut all_names: Vec = Vec::new(); let mut cursor: Option = None; loop { let mut req = client .get(format!( "{}/xrpc/systems.gmstn.development.channel.listChannels", base_url )) .query(&[("author", test_did.as_str()), ("limit", "2")]); if let Some(c) = &cursor { req = req.query(&[("cursor", c.as_str())]); } let res = req.send().await.unwrap(); assert_eq!(res.status(), StatusCode::OK); let body: Value = res.json().await.unwrap(); let channels = body["channels"].as_array().unwrap(); if channels.is_empty() { break; } for channel in channels { all_names.push(channel["displayName"].as_str().unwrap().to_string()); } cursor = body["cursor"].as_str().map(String::from); if cursor.is_none() { break; } } assert_eq!(all_names.len(), 5); assert_eq!(all_names[0], "Channel 4"); assert_eq!(all_names[4], "Channel 0"); }