From 8abde9f63282abd4fb44728128c26a6d1473dc0a Mon Sep 17 00:00:00 2001 From: Tsiry Sandratraina Date: Sat, 27 Sep 2025 10:12:17 +0300 Subject: [PATCH] feat(feed): implement feed service with configuration and handler - Added `rocksky-feed` crate with initial setup and dependencies. - Created `Config` struct for loading environment variables. - Implemented `Feed` trait for managing feed server operations. - Defined `FeedHandler` trait for handling feed-related actions. - Introduced `RecentlyPlayedFeed` and its handler for managing scrobbles. - Added types for requests, feed results, and DID documents. - Established routes for feed server using Warp framework. --- Cargo.lock | 548 +++++++++++++++++++++++++++++++- crates/feed/Cargo.toml | 35 ++ crates/feed/src/config.rs | 35 ++ crates/feed/src/feed.rs | 129 ++++++++ crates/feed/src/feed_handler.rs | 10 + crates/feed/src/lib.rs | 57 ++++ crates/feed/src/types.rs | 54 ++++ 7 files changed, 855 insertions(+), 13 deletions(-) create mode 100644 crates/feed/Cargo.toml create mode 100644 crates/feed/src/config.rs create mode 100644 crates/feed/src/feed.rs create mode 100644 crates/feed/src/feed_handler.rs create mode 100644 crates/feed/src/lib.rs create mode 100644 crates/feed/src/types.rs diff --git a/Cargo.lock b/Cargo.lock index 1ce9643..5c01297 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -417,9 +417,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.98" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "arc-swap" @@ -606,6 +606,30 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "async-compression" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9611ec0b6acea03372540509035db2f7f1e9f04da5d27728436fa994033c00a0" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "async-lock" +version = "3.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd03604047cee9b6ce9de9f70c6cd540a0520c813cbd49bae61f33ab80ed1dc" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-nats" version = "0.39.0" @@ -706,6 +730,66 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "atrium-api" +version = "0.25.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bceed6a87be6213040945254aa94191490fea22b7f49dec584e349f336cc17a" +dependencies = [ + "atrium-common", + "atrium-xrpc", + "chrono", + "http 1.3.1", + "ipld-core", + "langtag", + "regex", + "serde", + "serde_bytes", + "serde_json", + "thiserror 1.0.69", + "tokio", + "trait-variant", +] + +[[package]] +name = "atrium-common" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed5610654043faa396a5a15afac0ac646d76aebe45aebd7cef4f8b96b0ab7f4" +dependencies = [ + "dashmap", + "lru", + "moka", + "thiserror 1.0.69", + "tokio", + "trait-variant", + "web-time", +] + +[[package]] +name = "atrium-xrpc" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0216ad50ce34e9ff982e171c3659e65dedaa2ed5ac2994524debdc9a9647ffa8" +dependencies = [ + "http 1.3.1", + "serde", + "serde_html_form", + "serde_json", + "thiserror 1.0.69", + "trait-variant", +] + +[[package]] +name = "atrium-xrpc-client" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e099e5171f79faef52364ef0657a4cab086a71b384a779a29597a91b780de0d5" +dependencies = [ + "atrium-xrpc", + "reqwest", +] + [[package]] name = "attohttpc" version = "0.28.5" @@ -768,6 +852,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base-x" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" + [[package]] name = "base64" version = "0.20.0" @@ -1119,6 +1209,20 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "cid" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a" +dependencies = [ + "core2", + "multibase", + "multihash", + "serde", + "serde_bytes", + "unsigned-varint", +] + [[package]] name = "cipher" version = "0.4.4" @@ -1246,6 +1350,23 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "compression-codecs" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "485abf41ac0c8047c07c87c72c8fb3eb5197f6e9d7ded615dfd1a00ae00a0f64" +dependencies = [ + "compression-core", + "flate2", + "memchr", +] + +[[package]] +name = "compression-core" +version = "0.4.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -1340,6 +1461,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -1490,12 +1620,46 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "data-encoding-macro" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47ce6c96ea0102f01122a185683611bd5ac8d99e62bc59dd12e6bda344ee673d" +dependencies = [ + "data-encoding", + "data-encoding-macro-internal", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" +dependencies = [ + "data-encoding", + "syn 2.0.101", +] + [[package]] name = "der" version = "0.7.10" @@ -1779,6 +1943,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "extended" version = "0.1.0" @@ -1867,6 +2041,7 @@ checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" dependencies = [ "futures-core", "futures-sink", + "nanorand", "spin", ] @@ -1882,6 +2057,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -2228,6 +2418,30 @@ dependencies = [ "hashbrown 0.15.3", ] +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "heapless" version = "0.8.0" @@ -2378,6 +2592,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -2443,6 +2658,22 @@ dependencies = [ "webpki-roots 1.0.0", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.12" @@ -2626,6 +2857,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "ipld-core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "104718b1cc124d92a6d01ca9c9258a7df311405debb3408c445a36452f9bf8db" +dependencies = [ + "cid", + "serde", + "serde_bytes", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -2670,6 +2912,28 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jetstream-oxide" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a267b4f381f1db945e9eb6b9ff0f3070950cc01521fef6265556b6a683e41e13" +dependencies = [ + "async-trait", + "atrium-api", + "chrono", + "flume", + "futures-util", + "log", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tokio-tungstenite 0.24.0", + "tokio-util", + "url", + "zstd", +] + [[package]] name = "jni" version = "0.21.1" @@ -2854,6 +3118,15 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "langtag" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed60c85f254d6ae8450cec15eedd921efbc4d1bdf6fcf6202b9a58b403f6f805" +dependencies = [ + "serde", +] + [[package]] name = "language-tags" version = "0.3.2" @@ -3068,6 +3341,15 @@ version = "0.4.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.3", +] + [[package]] name = "lru-slab" version = "0.1.2" @@ -3396,12 +3678,60 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" +[[package]] +name = "openssl" +version = "0.10.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "openssl-probe" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" +[[package]] +name = "openssl-src" +version = "300.5.2+3.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d270b79e2926f5150189d475bc7e9d2c69f9c4697b185fa917d5a32b792d21b4" +dependencies = [ + "cc", +] + +[[package]] +name = "openssl-sys" +version = "0.9.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90096e2e47630d78b7d1c20952dc621f957103f8bc2c8359ec81290d75238571" +dependencies = [ + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -4754,6 +5084,7 @@ version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d19c46a6fdd48bc4dab94b6103fccc55d34c67cc0ad04653aad4ea2a07cd7bbb" dependencies = [ + "async-compression", "base64 0.22.1", "bytes", "futures-core", @@ -4763,12 +5094,14 @@ dependencies = [ "http-body-util", "hyper 1.6.0", "hyper-rustls 0.27.6", + "hyper-tls", "hyper-util", "ipnet", "js-sys", "log", "mime", "mime_guess", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -4781,6 +5114,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", + "tokio-native-tls", "tokio-rustls 0.26.2", "tokio-util", "tower", @@ -4875,8 +5209,8 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tokio-tungstenite", - "tungstenite", + "tokio-tungstenite 0.26.2", + "tungstenite 0.26.2", ] [[package]] @@ -4910,6 +5244,27 @@ dependencies = [ "tracing", ] +[[package]] +name = "rocksky-feed" +version = "0.1.0" +dependencies = [ + "anyhow", + "atrium-api", + "atrium-xrpc-client", + "chrono", + "dotenv", + "duckdb", + "jetstream-oxide", + "owo-colors", + "reqwest", + "serde", + "serde_json", + "sqlx", + "tokio", + "tracing", + "warp", +] + [[package]] name = "rocksky-googledrive" version = "0.1.0" @@ -4961,9 +5316,9 @@ dependencies = [ "time", "tokio", "tokio-stream", - "tokio-tungstenite", + "tokio-tungstenite 0.26.2", "tracing", - "tungstenite", + "tungstenite 0.26.2", "url", ] @@ -5469,6 +5824,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "scoped-tls" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" + [[package]] name = "scopeguard" version = "1.2.0" @@ -5550,13 +5911,24 @@ checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" [[package]] name = "serde" -version = "1.0.219" +version = "1.0.227" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +checksum = "80ece43fc6fbed4eb5392ab50c07334d3e577cbf40997ee896fe7af40bba4245" dependencies = [ + "serde_core", "serde_derive", ] +[[package]] +name = "serde_bytes" +version = "0.11.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d440709e79d88e51ac01c4b72fc6cb7314017bb7da9eeff678aa94c10e3ea8" +dependencies = [ + "serde", + "serde_core", +] + [[package]] name = "serde_cbor" version = "0.11.2" @@ -5567,27 +5939,50 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_core" +version = "1.0.227" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a576275b607a2c86ea29e410193df32bc680303c82f31e275bbfcafe8b33be5" +dependencies = [ + "serde_derive", +] + [[package]] name = "serde_derive" -version = "1.0.219" +version = "1.0.227" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +checksum = "51e694923b8824cf0e9b382adf0f60d4e05f348f357b38833a3fa5ed7c2ede04" dependencies = [ "proc-macro2", "quote", "syn 2.0.101", ] +[[package]] +name = "serde_html_form" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2f2d7ff8a2140333718bb329f5c40fc5f0865b84c426183ce14c97d2ab8154f" +dependencies = [ + "form_urlencoded", + "indexmap", + "itoa", + "ryu", + "serde_core", +] + [[package]] name = "serde_json" -version = "1.0.140" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ "itoa", "memchr", "ryu", "serde", + "serde_core", ] [[package]] @@ -6455,6 +6850,12 @@ dependencies = [ "windows", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tap" version = "1.0.1" @@ -6628,6 +7029,16 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-postgres" version = "0.7.13" @@ -6686,6 +7097,32 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "tokio-tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c83b561d025642014097b66e6c1bb422783339e0909e4429cde4749d1990bc38" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.21.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "native-tls", + "tokio", + "tokio-native-tls", + "tungstenite 0.24.0", +] + [[package]] name = "tokio-tungstenite" version = "0.26.2" @@ -6698,7 +7135,7 @@ dependencies = [ "rustls-pki-types", "tokio", "tokio-rustls 0.26.2", - "tungstenite", + "tungstenite 0.26.2", "webpki-roots 0.26.11", ] @@ -6902,6 +7339,17 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "trait-variant" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70977707304198400eb4835a78f6a9f928bf41bba420deb8fdb175cd965d77a7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "trim-in-place" version = "0.1.7" @@ -6925,6 +7373,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "tungstenite" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ef1a641ea34f399a848dea702823bbecfb4c486f911735368f1f137cb8257e1" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http 1.3.1", + "httparse", + "log", + "native-tls", + "rand 0.8.5", + "sha1", + "thiserror 1.0.69", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.26.2" @@ -7032,6 +7519,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "unsigned-varint" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" + [[package]] name = "untrusted" version = "0.9.0" @@ -7124,6 +7617,35 @@ dependencies = [ "try-lock", ] +[[package]] +name = "warp" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4378d202ff965b011c64817db11d5829506d3404edeadb61f190d111da3f231c" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "headers", + "http 0.2.12", + "hyper 0.14.32", + "log", + "mime", + "mime_guess", + "multer", + "percent-encoding", + "pin-project", + "scoped-tls", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tungstenite 0.21.0", + "tokio-util", + "tower-service", + "tracing", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/crates/feed/Cargo.toml b/crates/feed/Cargo.toml new file mode 100644 index 0000000..a44191b --- /dev/null +++ b/crates/feed/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "rocksky-feed" +version = "0.1.0" +authors.workspace = true +edition.workspace = true +license.workspace = true +repository.workspace = true + +[dependencies] +warp = "0.3.7" +owo-colors = "4.1.0" +anyhow = "1.0.100" +atrium-api = "0.25.5" +atrium-xrpc-client = "0.5.14" +chrono = { version = "= 0.4.39", features = ["serde"] } +duckdb = { version = "1.2.0", features = ["chrono"] } +jetstream-oxide = "=0.1.2" +reqwest = { version = "0.12.12", features = [ + "rustls-tls", + "json", + "multipart", +], default-features = false } +serde = "1.0.227" +serde_json = "1.0.145" +tokio = { version = "1.43.0", features = ["full"] } +tracing = "0.1.41" +dotenv = "0.15.0" +sqlx = { version = "0.8.3", features = [ + "runtime-tokio", + "tls-rustls", + "postgres", + "chrono", + "derive", + "macros", +] } diff --git a/crates/feed/src/config.rs b/crates/feed/src/config.rs new file mode 100644 index 0000000..fa3e253 --- /dev/null +++ b/crates/feed/src/config.rs @@ -0,0 +1,35 @@ +use std::env; + +use dotenv::dotenv; + +#[derive(Debug, Clone)] +/// Configuration values for a Feed service +pub struct Config { + /// Your account's decentralized identifier (DID) + /// A DID is a persistent, long-term identifier for every account. Usually look like did:plc:ewvi7nxzyoun6zhxrhs64oiz. + pub publisher_did: String, + /// The host name for your feed generator. + /// + /// For example: if github were to host a feed generator service at their domain they would set this value to `github.com` + /// + /// You can develop your feed locally without setting this to a real value. However, when publishing, this value must be a domain that: + /// - Points to your service. + /// - Is secured with SSL (HTTPS). + /// - Is accessible on the public internet. + pub feed_generator_hostname: String, +} + +impl Config { + /// Loads the config from a local .env file containing these variables + /// PUBLISHER_DID + /// FEED_GENERATOR_HOSTNAME + pub fn load_env_config() -> Self { + dotenv().expect("Missing .env"); + Config { + publisher_did: env::var("PUBLISHER_DID") + .expect(".env file is missing an entry for PUBLISHER_DID"), + feed_generator_hostname: env::var("FEED_GENERATOR_HOSTNAME") + .expect(".env file is missing an entry for FEED_GENERATOR_HOSTNAME"), + } + } +} diff --git a/crates/feed/src/feed.rs b/crates/feed/src/feed.rs new file mode 100644 index 0000000..008c999 --- /dev/null +++ b/crates/feed/src/feed.rs @@ -0,0 +1,129 @@ +use crate::config::Config; +use crate::types::{DidDocument, FeedSkeletonParameters, Service}; +use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery}; +use std::fmt::Debug; +use std::net::SocketAddr; +use warp::Filter; + +/// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods. +pub trait Feed { + fn handler(&mut self) -> Handler; + /// Starts the feed generator server & connects to the firehose. + /// + /// + /// - name: The identifying name of your feed. This value is used in the feed URL & when identifying which feed to *unpublish*. This is a separate value from the display name. + /// - address: The address to bind the server to + /// + /// # Panics + /// + /// Panics if unable to bind to the provided address. + fn start( + &mut self, + name: impl AsRef, + address: impl Into + Debug + Clone + Send, + ) -> impl std::future::Future + Send { + self.start_with_config(name, Config::load_env_config(), address) + } + + /// Starts the feed generator server & connects to the firehose. + /// + /// - name: The identifying name of your feed. This value is used in the feed URL & when identifying which feed to *unpublish*. This is a separate value from the display name. + /// - config: Configuration values, see `Config` + /// - address: The address to bind the server to + /// + /// # Panics + /// + /// Panics if unable to bind to the provided address. + fn start_with_config( + &mut self, + name: impl AsRef, + config: Config, + address: impl Into + Debug + Clone + Send, + ) -> impl std::future::Future + Send { + let handler = self.handler(); + let address = address.clone(); + let feed_name = name.as_ref().to_string(); + + async move { + let config = config; + + let did_config = config.clone(); + let did_json = warp::path(".well-known") + .and(warp::path("did.json")) + .and(warp::get()) + .and_then(move || did_json(did_config.clone())); + + let describe_feed_generator = warp::path("xrpc") + .and(warp::path("app.rocksky.feed.describeFeedGenerator")) + .and(warp::get()) + .and_then(move || describe_feed_generator(feed_name.clone())); + + let get_feed_handler = handler.clone(); + let get_feed_skeleton = warp::path("xrpc") + .and(warp::path("app.rocksky.feed.getFeedSkeleton")) + .and(warp::get()) + .and(warp::query::()) + .and_then(move |query: FeedSkeletonParameters| { + get_feed_skeleton::(query.into(), get_feed_handler.clone()) + }); + + let api = did_json.or(describe_feed_generator).or(get_feed_skeleton); + + tracing::info!("Serving feed on {}", format!("{:?}", address)); + + let routes = api.with(warp::log::custom(|info| { + let method = info.method(); + let path = info.path(); + let status = info.status(); + let elapsed = info.elapsed().as_millis(); + + if status.is_success() { + tracing::info!( + "Method: {}, Path: {}, Status: {}, Elapsed Time: {}ms", + method, + path, + status, + elapsed + ); + } else { + tracing::error!( + "Method: {}, Path: {}, Status: {}, Elapsed Time: {}ms", + method, + path, + status, + elapsed, + ); + } + })); + let feed_server = warp::serve(routes); + let firehose_listener = tokio::spawn(async move {}); + + tokio::join!(feed_server.run(address), firehose_listener) + .1 + .expect("Couldn't await tasks"); + } + } +} + +async fn did_json(config: Config) -> Result { + Ok(warp::reply::json(&DidDocument { + context: vec!["https://www.w3.org/ns/did/v1".to_owned()], + id: format!("did:web:{}", config.feed_generator_hostname), + service: vec![Service { + id: "#rsky_fg".to_owned(), + type_: "RskyFeedGenerator".to_owned(), + service_endpoint: format!("https://{}", config.feed_generator_hostname), + }], + })) +} + +async fn describe_feed_generator(feed_name: String) -> Result { + Ok(warp::reply::json(&serde_json::json!({}))) +} + +async fn get_feed_skeleton( + query: FeedSkeletonQuery, + handler: Handler, +) -> Result { + Ok(warp::reply::json(&serde_json::json!({}))) +} diff --git a/crates/feed/src/feed_handler.rs b/crates/feed/src/feed_handler.rs new file mode 100644 index 0000000..f7b7945 --- /dev/null +++ b/crates/feed/src/feed_handler.rs @@ -0,0 +1,10 @@ +use crate::types::{FeedResult, Request, Scrobble, Uri}; + +/// A feed handler is responsible for +/// - Storing and managing firehose input. +/// - Serving responses to feed requests with `serve_feed` +pub trait FeedHandler { + fn insert_scrobble(&self, scrobble: Scrobble) -> impl std::future::Future + Send; + fn delete_scrobble(&self, uri: Uri) -> impl std::future::Future + Send; + fn serve_feed(&self, request: Request) -> impl std::future::Future + Send; +} diff --git a/crates/feed/src/lib.rs b/crates/feed/src/lib.rs new file mode 100644 index 0000000..00106ff --- /dev/null +++ b/crates/feed/src/lib.rs @@ -0,0 +1,57 @@ +use std::{env, net::SocketAddr, sync::Arc}; + +use tokio::sync::Mutex; + +use crate::{ + feed::Feed, + feed_handler::FeedHandler, + types::{FeedResult, Scrobble}, +}; + +pub mod config; +pub mod feed; +pub mod feed_handler; +pub mod types; + +pub struct RecentlyPlayedFeed { + handler: RecentlyPlayedFeedHandler, +} + +impl Feed for RecentlyPlayedFeed { + fn handler(&mut self) -> RecentlyPlayedFeedHandler { + self.handler.clone() + } +} + +#[derive(Clone)] +pub struct RecentlyPlayedFeedHandler { + pub scrobbles: Arc>>, +} + +impl FeedHandler for RecentlyPlayedFeedHandler { + async fn insert_scrobble(&self, scrobble: Scrobble) { + todo!() + } + + async fn delete_scrobble(&self, uri: types::Uri) { + todo!() + } + + async fn serve_feed(&self, request: types::Request) -> FeedResult { + todo!() + } +} + +pub async fn run() { + let mut feed = RecentlyPlayedFeed { + handler: RecentlyPlayedFeedHandler { + scrobbles: Arc::new(Mutex::new(Vec::new())), + }, + }; + let host = env::var("FEED_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); + let port = env::var("FEED_PORT").unwrap_or_else(|_| "7885".to_string()); + let addr_str = format!("{}:{}", host, port); + let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); + + feed.start("recently-played", addr).await; +} diff --git a/crates/feed/src/types.rs b/crates/feed/src/types.rs new file mode 100644 index 0000000..2fd5669 --- /dev/null +++ b/crates/feed/src/types.rs @@ -0,0 +1,54 @@ +#[derive(Debug, Clone)] +pub struct Request { + pub cursor: Option, + pub feed: String, + pub limit: Option, +} + +#[derive(Debug, Clone)] +pub struct Cid(pub String); + +#[derive(Debug, Clone)] +pub struct Did(pub String); + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct Uri(pub String); + +#[derive(Debug, Clone)] +pub struct FeedResult { + pub cursor: Option, + pub feed: Vec, +} + +pub struct FeedSkeletonQuery {} + +#[derive(Deserialize)] +pub struct FeedSkeletonParameters {} + +impl Into for FeedSkeletonParameters { + fn into(self) -> FeedSkeletonQuery { + FeedSkeletonQuery {} + } +} + +#[derive(Debug, Clone)] +pub struct Scrobble {} + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize)] +pub(crate) struct DidDocument { + #[serde(rename = "@context")] + pub(crate) context: Vec, + pub(crate) id: String, + pub(crate) service: Vec, +} + +#[derive(Serialize)] +pub(crate) struct Service { + pub(crate) id: String, + #[serde(rename = "type")] + pub(crate) type_: String, + #[serde(rename = "serviceEndpoint")] + pub(crate) service_endpoint: String, +} -- 2.43.0 From fa870f15acf4e917d1dfbb26313e4c475b1c06c5 Mon Sep 17 00:00:00 2001 From: Tsiry Sandratraina Date: Sat, 27 Sep 2025 20:36:41 +0300 Subject: [PATCH] feat(feed): add feed command with serve subcommand and integrate feed module --- Cargo.lock | 1 + crates/feed/src/feed.rs | 4 ++-- crates/feed/src/lib.rs | 2 +- crates/rockskyd/Cargo.toml | 1 + crates/rockskyd/src/cmd/feed.rs | 6 ++++++ crates/rockskyd/src/cmd/mod.rs | 1 + crates/rockskyd/src/main.rs | 9 +++++++++ 7 files changed, 21 insertions(+), 3 deletions(-) create mode 100644 crates/rockskyd/src/cmd/feed.rs diff --git a/Cargo.lock b/Cargo.lock index 5c01297..9cb2e8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5479,6 +5479,7 @@ dependencies = [ "owo-colors", "rocksky-analytics", "rocksky-dropbox", + "rocksky-feed", "rocksky-googledrive", "rocksky-jetstream", "rocksky-playlists", diff --git a/crates/feed/src/feed.rs b/crates/feed/src/feed.rs index 008c999..4a596a1 100644 --- a/crates/feed/src/feed.rs +++ b/crates/feed/src/feed.rs @@ -110,8 +110,8 @@ async fn did_json(config: Config) -> Result { context: vec!["https://www.w3.org/ns/did/v1".to_owned()], id: format!("did:web:{}", config.feed_generator_hostname), service: vec![Service { - id: "#rsky_fg".to_owned(), - type_: "RskyFeedGenerator".to_owned(), + id: "#rocksky_fg".to_owned(), + type_: "RockskyFeedGenerator".to_owned(), service_endpoint: format!("https://{}", config.feed_generator_hostname), }], })) diff --git a/crates/feed/src/lib.rs b/crates/feed/src/lib.rs index 00106ff..37a941f 100644 --- a/crates/feed/src/lib.rs +++ b/crates/feed/src/lib.rs @@ -53,5 +53,5 @@ pub async fn run() { let addr_str = format!("{}:{}", host, port); let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); - feed.start("recently-played", addr).await; + feed.start("RecentlyPlayed", addr).await; } diff --git a/crates/rockskyd/Cargo.toml b/crates/rockskyd/Cargo.toml index a4071da..0517cf5 100644 --- a/crates/rockskyd/Cargo.toml +++ b/crates/rockskyd/Cargo.toml @@ -20,6 +20,7 @@ rocksky-scrobbler = { path = "../scrobbler" } rocksky-spotify = { path = "../spotify" } rocksky-tracklist = { path = "../tracklist" } rocksky-webscrobbler = { path = "../webscrobbler" } +rocksky-feed = { path = "../feed" } tracing = "0.1.41" tracing-subscriber = "0.3.20" tracing-log = "0.2.0" diff --git a/crates/rockskyd/src/cmd/feed.rs b/crates/rockskyd/src/cmd/feed.rs new file mode 100644 index 0000000..2ace58c --- /dev/null +++ b/crates/rockskyd/src/cmd/feed.rs @@ -0,0 +1,6 @@ +use anyhow::Error; + +pub async fn serve() -> Result<(), Error> { + rocksky_feed::run().await; + Ok(()) +} diff --git a/crates/rockskyd/src/cmd/mod.rs b/crates/rockskyd/src/cmd/mod.rs index 74b52f7..cb4ebda 100644 --- a/crates/rockskyd/src/cmd/mod.rs +++ b/crates/rockskyd/src/cmd/mod.rs @@ -1,5 +1,6 @@ pub mod analytics; pub mod dropbox; +pub mod feed; pub mod googledrive; pub mod jetstream; pub mod playlist; diff --git a/crates/rockskyd/src/main.rs b/crates/rockskyd/src/main.rs index 20b6310..a5b3983 100644 --- a/crates/rockskyd/src/main.rs +++ b/crates/rockskyd/src/main.rs @@ -32,6 +32,11 @@ fn cli() -> Command { .subcommand(Command::new("spotify").about("Start Spotify Listener Service")) .subcommand(Command::new("tracklist").about("Start User Current Track Queue Service")) .subcommand(Command::new("webscrobbler").about("Start Webscrobbler API")) + .subcommand( + Command::new("feed") + .about("Feed related commands") + .subcommand(Command::new("serve").about("Serve the Rocksky Feed API")), + ) } #[tokio::main] @@ -85,6 +90,10 @@ async fn main() -> Result<(), Box> { Some(("webscrobbler", _)) => { cmd::webscrobbler::start_webscrobbler_service().await?; } + Some(("feed", sub_m)) => match sub_m.subcommand() { + Some(("serve", _)) => cmd::feed::serve().await?, + _ => println!("Unknown feed command"), + }, _ => { println!("No valid subcommand was used. Use --help to see available commands."); } -- 2.43.0 From b81a0cf6001b78bcf96689038861dcaadb68a336 Mon Sep 17 00:00:00 2001 From: Tsiry Sandratraina Date: Sun, 28 Sep 2025 04:54:11 +0300 Subject: [PATCH] fix: update feed and lib modules to return Result types for better error handling --- crates/feed/src/feed.rs | 31 +++++++++++++++++++++++++------ crates/feed/src/lib.rs | 7 ++++--- crates/rockskyd/src/cmd/feed.rs | 2 +- 3 files changed, 30 insertions(+), 10 deletions(-) diff --git a/crates/feed/src/feed.rs b/crates/feed/src/feed.rs index 4a596a1..b134805 100644 --- a/crates/feed/src/feed.rs +++ b/crates/feed/src/feed.rs @@ -1,8 +1,13 @@ use crate::config::Config; use crate::types::{DidDocument, FeedSkeletonParameters, Service}; use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery}; +use anyhow::Error; +use sqlx::postgres::PgPoolOptions; +use sqlx::{Pool, Postgres}; +use std::env; use std::fmt::Debug; use std::net::SocketAddr; +use std::sync::Arc; use warp::Filter; /// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods. @@ -21,7 +26,7 @@ pub trait Feed { &mut self, name: impl AsRef, address: impl Into + Debug + Clone + Send, - ) -> impl std::future::Future + Send { + ) -> impl std::future::Future> + Send { self.start_with_config(name, Config::load_env_config(), address) } @@ -39,13 +44,19 @@ pub trait Feed { name: impl AsRef, config: Config, address: impl Into + Debug + Clone + Send, - ) -> impl std::future::Future + Send { + ) -> impl std::future::Future> + Send { let handler = self.handler(); let address = address.clone(); let feed_name = name.as_ref().to_string(); async move { let config = config; + let pool = PgPoolOptions::new() + .max_connections(5) + .connect(&env::var("XATA_POSTGRES_URL")?) + .await?; + let pool = Arc::new(pool); + let db_filter = warp::any().map(move || pool.clone()); let did_config = config.clone(); let did_json = warp::path(".well-known") @@ -56,16 +67,22 @@ pub trait Feed { let describe_feed_generator = warp::path("xrpc") .and(warp::path("app.rocksky.feed.describeFeedGenerator")) .and(warp::get()) - .and_then(move || describe_feed_generator(feed_name.clone())); + .and(db_filter.clone()) + .and_then(move |_pool: Arc>| { + describe_feed_generator(feed_name.clone()) + }); let get_feed_handler = handler.clone(); let get_feed_skeleton = warp::path("xrpc") .and(warp::path("app.rocksky.feed.getFeedSkeleton")) .and(warp::get()) .and(warp::query::()) - .and_then(move |query: FeedSkeletonParameters| { - get_feed_skeleton::(query.into(), get_feed_handler.clone()) - }); + .and(db_filter.clone()) + .and_then( + move |query: FeedSkeletonParameters, _pool: Arc>| { + get_feed_skeleton::(query.into(), get_feed_handler.clone()) + }, + ); let api = did_json.or(describe_feed_generator).or(get_feed_skeleton); @@ -101,6 +118,8 @@ pub trait Feed { tokio::join!(feed_server.run(address), firehose_listener) .1 .expect("Couldn't await tasks"); + + Ok::<(), Error>(()) } } } diff --git a/crates/feed/src/lib.rs b/crates/feed/src/lib.rs index 37a941f..4de28dd 100644 --- a/crates/feed/src/lib.rs +++ b/crates/feed/src/lib.rs @@ -1,5 +1,5 @@ +use anyhow::Error; use std::{env, net::SocketAddr, sync::Arc}; - use tokio::sync::Mutex; use crate::{ @@ -42,7 +42,7 @@ impl FeedHandler for RecentlyPlayedFeedHandler { } } -pub async fn run() { +pub async fn run() -> Result<(), Error> { let mut feed = RecentlyPlayedFeed { handler: RecentlyPlayedFeedHandler { scrobbles: Arc::new(Mutex::new(Vec::new())), @@ -53,5 +53,6 @@ pub async fn run() { let addr_str = format!("{}:{}", host, port); let addr: SocketAddr = addr_str.parse().expect("Invalid address format"); - feed.start("RecentlyPlayed", addr).await; + feed.start("RecentlyPlayed", addr).await?; + Ok(()) } diff --git a/crates/rockskyd/src/cmd/feed.rs b/crates/rockskyd/src/cmd/feed.rs index 2ace58c..fde2ab7 100644 --- a/crates/rockskyd/src/cmd/feed.rs +++ b/crates/rockskyd/src/cmd/feed.rs @@ -1,6 +1,6 @@ use anyhow::Error; pub async fn serve() -> Result<(), Error> { - rocksky_feed::run().await; + rocksky_feed::run().await?; Ok(()) } -- 2.43.0 From cb9c357360cb1d667fdf434246160e278b588baa Mon Sep 17 00:00:00 2001 From: Tsiry Sandratraina Date: Sun, 28 Sep 2025 16:14:38 +0300 Subject: [PATCH] feat: add new dependencies for moka, multer, multibase, multihash, nanorand, and native-tls --- Cargo.lock | 87 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 9cb2e8b..d8a8130 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3466,6 +3466,67 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "moka" +version = "0.12.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077" +dependencies = [ + "async-lock", + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "equivalent", + "event-listener", + "futures-util", + "parking_lot", + "portable-atomic", + "rustc_version", + "smallvec", + "tagptr", + "uuid", +] + +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 0.2.12", + "httparse", + "log", + "memchr", + "mime", + "spin", + "version_check", +] + +[[package]] +name = "multibase" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404" +dependencies = [ + "base-x", + "data-encoding", + "data-encoding-macro", +] + +[[package]] +name = "multihash" +version = "0.19.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" +dependencies = [ + "core2", + "serde", + "unsigned-varint", +] + [[package]] name = "nanoid" version = "0.4.0" @@ -3475,6 +3536,32 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "native-tls" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework 2.11.1", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nkeys" version = "0.4.4" -- 2.43.0