feat(feed): implement feed service #8

open
opened by tsiry-sandratraina.com targeting main from feat/feed-generator
Changed files
+261 -9
crates
+35
crates/feed/Cargo.toml
···
+
[package]
+
name = "rocksky-feed"
+
version = "0.1.0"
+
authors.workspace = true
+
edition.workspace = true
+
license.workspace = true
+
repository.workspace = true
+
+
[dependencies]
+
warp = "0.3.7"
+
owo-colors = "4.1.0"
+
anyhow = "1.0.100"
+
atrium-api = "0.25.5"
+
atrium-xrpc-client = "0.5.14"
+
chrono = { version = "= 0.4.39", features = ["serde"] }
+
duckdb = { version = "1.2.0", features = ["chrono"] }
+
jetstream-oxide = "=0.1.2"
+
reqwest = { version = "0.12.12", features = [
+
"rustls-tls",
+
"json",
+
"multipart",
+
], default-features = false }
+
serde = "1.0.227"
+
serde_json = "1.0.145"
+
tokio = { version = "1.43.0", features = ["full"] }
+
tracing = "0.1.41"
+
dotenv = "0.15.0"
+
sqlx = { version = "0.8.3", features = [
+
"runtime-tokio",
+
"tls-rustls",
+
"postgres",
+
"chrono",
+
"derive",
+
"macros",
+
] }
+35
crates/feed/src/config.rs
···
+
use std::env;
+
+
use dotenv::dotenv;
+
+
#[derive(Debug, Clone)]
+
/// Configuration values for a Feed service
+
pub struct Config {
+
/// Your account's decentralized identifier (DID)
+
/// A DID is a persistent, long-term identifier for every account. Usually look like did:plc:ewvi7nxzyoun6zhxrhs64oiz.
+
pub publisher_did: String,
+
/// The host name for your feed generator.
+
///
+
/// For example: if github were to host a feed generator service at their domain they would set this value to `github.com`
+
///
+
/// You can develop your feed locally without setting this to a real value. However, when publishing, this value must be a domain that:
+
/// - Points to your service.
+
/// - Is secured with SSL (HTTPS).
+
/// - Is accessible on the public internet.
+
pub feed_generator_hostname: String,
+
}
+
+
impl Config {
+
/// Loads the config from a local .env file containing these variables
+
/// PUBLISHER_DID
+
/// FEED_GENERATOR_HOSTNAME
+
pub fn load_env_config() -> Self {
+
dotenv().expect("Missing .env");
+
Config {
+
publisher_did: env::var("PUBLISHER_DID")
+
.expect(".env file is missing an entry for PUBLISHER_DID"),
+
feed_generator_hostname: env::var("FEED_GENERATOR_HOSTNAME")
+
.expect(".env file is missing an entry for FEED_GENERATOR_HOSTNAME"),
+
}
+
}
+
}
+10
crates/feed/src/feed_handler.rs
···
+
use crate::types::{FeedResult, Request, Scrobble, Uri};
+
+
/// A feed handler is responsible for
+
/// - Storing and managing firehose input.
+
/// - Serving responses to feed requests with `serve_feed`
+
pub trait FeedHandler {
+
fn insert_scrobble(&self, scrobble: Scrobble) -> impl std::future::Future<Output = ()> + Send;
+
fn delete_scrobble(&self, uri: Uri) -> impl std::future::Future<Output = ()> + Send;
+
fn serve_feed(&self, request: Request) -> impl std::future::Future<Output = FeedResult> + Send;
+
}
+54
crates/feed/src/types.rs
···
+
#[derive(Debug, Clone)]
+
pub struct Request {
+
pub cursor: Option<String>,
+
pub feed: String,
+
pub limit: Option<u8>,
+
}
+
+
#[derive(Debug, Clone)]
+
pub struct Cid(pub String);
+
+
#[derive(Debug, Clone)]
+
pub struct Did(pub String);
+
+
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
+
pub struct Uri(pub String);
+
+
#[derive(Debug, Clone)]
+
pub struct FeedResult {
+
pub cursor: Option<String>,
+
pub feed: Vec<Uri>,
+
}
+
+
pub struct FeedSkeletonQuery {}
+
+
#[derive(Deserialize)]
+
pub struct FeedSkeletonParameters {}
+
+
impl Into<FeedSkeletonQuery> for FeedSkeletonParameters {
+
fn into(self) -> FeedSkeletonQuery {
+
FeedSkeletonQuery {}
+
}
+
}
+
+
#[derive(Debug, Clone)]
+
pub struct Scrobble {}
+
+
use serde::{Deserialize, Serialize};
+
+
#[derive(Serialize)]
+
pub(crate) struct DidDocument {
+
#[serde(rename = "@context")]
+
pub(crate) context: Vec<String>,
+
pub(crate) id: String,
+
pub(crate) service: Vec<Service>,
+
}
+
+
#[derive(Serialize)]
+
pub(crate) struct Service {
+
pub(crate) id: String,
+
#[serde(rename = "type")]
+
pub(crate) type_: String,
+
#[serde(rename = "serviceEndpoint")]
+
pub(crate) service_endpoint: String,
+
}
+1
crates/rockskyd/Cargo.toml
···
rocksky-spotify = { path = "../spotify" }
rocksky-tracklist = { path = "../tracklist" }
rocksky-webscrobbler = { path = "../webscrobbler" }
+
rocksky-feed = { path = "../feed" }
tracing = "0.1.41"
tracing-subscriber = "0.3.20"
tracing-log = "0.2.0"
+1
crates/rockskyd/src/cmd/mod.rs
···
pub mod analytics;
pub mod dropbox;
+
pub mod feed;
pub mod googledrive;
pub mod jetstream;
pub mod playlist;
+9
crates/rockskyd/src/main.rs
···
.subcommand(Command::new("spotify").about("Start Spotify Listener Service"))
.subcommand(Command::new("tracklist").about("Start User Current Track Queue Service"))
.subcommand(Command::new("webscrobbler").about("Start Webscrobbler API"))
+
.subcommand(
+
Command::new("feed")
+
.about("Feed related commands")
+
.subcommand(Command::new("serve").about("Serve the Rocksky Feed API")),
+
)
}
#[tokio::main]
···
Some(("webscrobbler", _)) => {
cmd::webscrobbler::start_webscrobbler_service().await?;
}
+
Some(("feed", sub_m)) => match sub_m.subcommand() {
+
Some(("serve", _)) => cmd::feed::serve().await?,
+
_ => println!("Unknown feed command"),
+
},
_ => {
println!("No valid subcommand was used. Use --help to see available commands.");
}
+25 -6
crates/feed/src/feed.rs
···
use crate::config::Config;
use crate::types::{DidDocument, FeedSkeletonParameters, Service};
use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery};
+
use anyhow::Error;
+
use sqlx::postgres::PgPoolOptions;
+
use sqlx::{Pool, Postgres};
+
use std::env;
use std::fmt::Debug;
use std::net::SocketAddr;
+
use std::sync::Arc;
use warp::Filter;
/// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods.
···
&mut self,
name: impl AsRef<str>,
address: impl Into<SocketAddr> + Debug + Clone + Send,
-
) -> impl std::future::Future<Output = ()> + Send {
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
self.start_with_config(name, Config::load_env_config(), address)
}
···
name: impl AsRef<str>,
config: Config,
address: impl Into<SocketAddr> + Debug + Clone + Send,
-
) -> impl std::future::Future<Output = ()> + Send {
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
let handler = self.handler();
let address = address.clone();
let feed_name = name.as_ref().to_string();
async move {
let config = config;
+
let pool = PgPoolOptions::new()
+
.max_connections(5)
+
.connect(&env::var("XATA_POSTGRES_URL")?)
+
.await?;
+
let pool = Arc::new(pool);
+
let db_filter = warp::any().map(move || pool.clone());
let did_config = config.clone();
let did_json = warp::path(".well-known")
···
let describe_feed_generator = warp::path("xrpc")
.and(warp::path("app.rocksky.feed.describeFeedGenerator"))
.and(warp::get())
-
.and_then(move || describe_feed_generator(feed_name.clone()));
+
.and(db_filter.clone())
+
.and_then(move |_pool: Arc<Pool<Postgres>>| {
+
describe_feed_generator(feed_name.clone())
+
});
let get_feed_handler = handler.clone();
let get_feed_skeleton = warp::path("xrpc")
.and(warp::path("app.rocksky.feed.getFeedSkeleton"))
.and(warp::get())
.and(warp::query::<FeedSkeletonParameters>())
-
.and_then(move |query: FeedSkeletonParameters| {
-
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
-
});
+
.and(db_filter.clone())
+
.and_then(
+
move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| {
+
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
+
},
+
);
let api = did_json.or(describe_feed_generator).or(get_feed_skeleton);
···
tokio::join!(feed_server.run(address), firehose_listener)
.1
.expect("Couldn't await tasks");
+
+
Ok::<(), Error>(())
}
}
}
+4 -3
crates/feed/src/lib.rs
···
+
use anyhow::Error;
use std::{env, net::SocketAddr, sync::Arc};
-
use tokio::sync::Mutex;
use crate::{
···
}
}
-
pub async fn run() {
+
pub async fn run() -> Result<(), Error> {
let mut feed = RecentlyPlayedFeed {
handler: RecentlyPlayedFeedHandler {
scrobbles: Arc::new(Mutex::new(Vec::new())),
···
let addr_str = format!("{}:{}", host, port);
let addr: SocketAddr = addr_str.parse().expect("Invalid address format");
-
feed.start("RecentlyPlayed", addr).await;
+
feed.start("RecentlyPlayed", addr).await?;
+
Ok(())
}
+87
Cargo.lock
···
"windows-sys 0.59.0",
+
[[package]]
+
name = "moka"
+
version = "0.12.11"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077"
+
dependencies = [
+
"async-lock",
+
"crossbeam-channel",
+
"crossbeam-epoch",
+
"crossbeam-utils",
+
"equivalent",
+
"event-listener",
+
"futures-util",
+
"parking_lot",
+
"portable-atomic",
+
"rustc_version",
+
"smallvec",
+
"tagptr",
+
"uuid",
+
]
+
+
[[package]]
+
name = "multer"
+
version = "2.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
+
dependencies = [
+
"bytes",
+
"encoding_rs",
+
"futures-util",
+
"http 0.2.12",
+
"httparse",
+
"log",
+
"memchr",
+
"mime",
+
"spin",
+
"version_check",
+
]
+
+
[[package]]
+
name = "multibase"
+
version = "0.9.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404"
+
dependencies = [
+
"base-x",
+
"data-encoding",
+
"data-encoding-macro",
+
]
+
+
[[package]]
+
name = "multihash"
+
version = "0.19.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d"
+
dependencies = [
+
"core2",
+
"serde",
+
"unsigned-varint",
+
]
+
[[package]]
name = "nanoid"
version = "0.4.0"
···
"rand 0.8.5",
+
[[package]]
+
name = "nanorand"
+
version = "0.7.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
+
dependencies = [
+
"getrandom 0.2.16",
+
]
+
+
[[package]]
+
name = "native-tls"
+
version = "0.2.14"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
+
dependencies = [
+
"libc",
+
"log",
+
"openssl",
+
"openssl-probe",
+
"openssl-sys",
+
"schannel",
+
"security-framework 2.11.1",
+
"security-framework-sys",
+
"tempfile",
+
]
+
[[package]]
name = "nkeys"
version = "0.4.4"