back interdiff of round #1 and #0

feat(feed): implement feed service #8

open
opened by tsiry-sandratraina.com targeting main from feat/feed-generator
ERROR
crates/feed/Cargo.toml

Failed to calculate interdiff for this file.

ERROR
crates/feed/src/config.rs

Failed to calculate interdiff for this file.

ERROR
crates/feed/src/feed_handler.rs

Failed to calculate interdiff for this file.

ERROR
crates/feed/src/types.rs

Failed to calculate interdiff for this file.

ERROR
crates/rockskyd/Cargo.toml

Failed to calculate interdiff for this file.

REVERTED
crates/rockskyd/src/cmd/feed.rs
···
-
use anyhow::Error;
-
-
pub async fn serve() -> Result<(), Error> {
-
rocksky_feed::run().await;
-
Ok(())
-
}
···
ERROR
crates/rockskyd/src/cmd/mod.rs

Failed to calculate interdiff for this file.

ERROR
crates/rockskyd/src/main.rs

Failed to calculate interdiff for this file.

NEW
crates/feed/src/feed.rs
···
use crate::config::Config;
use crate::types::{DidDocument, FeedSkeletonParameters, Service};
use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery};
use std::fmt::Debug;
use std::net::SocketAddr;
use warp::Filter;
/// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods.
···
&mut self,
name: impl AsRef<str>,
address: impl Into<SocketAddr> + Debug + Clone + Send,
-
) -> impl std::future::Future<Output = ()> + Send {
self.start_with_config(name, Config::load_env_config(), address)
}
···
name: impl AsRef<str>,
config: Config,
address: impl Into<SocketAddr> + Debug + Clone + Send,
-
) -> impl std::future::Future<Output = ()> + Send {
let handler = self.handler();
let address = address.clone();
let feed_name = name.as_ref().to_string();
async move {
let config = config;
let did_config = config.clone();
let did_json = warp::path(".well-known")
···
let describe_feed_generator = warp::path("xrpc")
.and(warp::path("app.rocksky.feed.describeFeedGenerator"))
.and(warp::get())
-
.and_then(move || describe_feed_generator(feed_name.clone()));
let get_feed_handler = handler.clone();
let get_feed_skeleton = warp::path("xrpc")
.and(warp::path("app.rocksky.feed.getFeedSkeleton"))
.and(warp::get())
.and(warp::query::<FeedSkeletonParameters>())
-
.and_then(move |query: FeedSkeletonParameters| {
-
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
-
});
let api = did_json.or(describe_feed_generator).or(get_feed_skeleton);
···
tokio::join!(feed_server.run(address), firehose_listener)
.1
.expect("Couldn't await tasks");
}
}
}
···
use crate::config::Config;
use crate::types::{DidDocument, FeedSkeletonParameters, Service};
use crate::{feed_handler::FeedHandler, types::FeedSkeletonQuery};
+
use anyhow::Error;
+
use sqlx::postgres::PgPoolOptions;
+
use sqlx::{Pool, Postgres};
+
use std::env;
use std::fmt::Debug;
use std::net::SocketAddr;
+
use std::sync::Arc;
use warp::Filter;
/// A `Feed` stores a `FeedHandler`, handles feed server endpoints & connects to the Firehose using the `start` methods.
···
&mut self,
name: impl AsRef<str>,
address: impl Into<SocketAddr> + Debug + Clone + Send,
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
self.start_with_config(name, Config::load_env_config(), address)
}
···
name: impl AsRef<str>,
config: Config,
address: impl Into<SocketAddr> + Debug + Clone + Send,
+
) -> impl std::future::Future<Output = Result<(), Error>> + Send {
let handler = self.handler();
let address = address.clone();
let feed_name = name.as_ref().to_string();
async move {
let config = config;
+
let pool = PgPoolOptions::new()
+
.max_connections(5)
+
.connect(&env::var("XATA_POSTGRES_URL")?)
+
.await?;
+
let pool = Arc::new(pool);
+
let db_filter = warp::any().map(move || pool.clone());
let did_config = config.clone();
let did_json = warp::path(".well-known")
···
let describe_feed_generator = warp::path("xrpc")
.and(warp::path("app.rocksky.feed.describeFeedGenerator"))
.and(warp::get())
+
.and(db_filter.clone())
+
.and_then(move |_pool: Arc<Pool<Postgres>>| {
+
describe_feed_generator(feed_name.clone())
+
});
let get_feed_handler = handler.clone();
let get_feed_skeleton = warp::path("xrpc")
.and(warp::path("app.rocksky.feed.getFeedSkeleton"))
.and(warp::get())
.and(warp::query::<FeedSkeletonParameters>())
+
.and(db_filter.clone())
+
.and_then(
+
move |query: FeedSkeletonParameters, _pool: Arc<Pool<Postgres>>| {
+
get_feed_skeleton::<Handler>(query.into(), get_feed_handler.clone())
+
},
+
);
let api = did_json.or(describe_feed_generator).or(get_feed_skeleton);
···
tokio::join!(feed_server.run(address), firehose_listener)
.1
.expect("Couldn't await tasks");
+
+
Ok::<(), Error>(())
}
}
}
NEW
crates/feed/src/lib.rs
···
use std::{env, net::SocketAddr, sync::Arc};
-
use tokio::sync::Mutex;
use crate::{
···
}
}
-
pub async fn run() {
let mut feed = RecentlyPlayedFeed {
handler: RecentlyPlayedFeedHandler {
scrobbles: Arc::new(Mutex::new(Vec::new())),
···
let addr_str = format!("{}:{}", host, port);
let addr: SocketAddr = addr_str.parse().expect("Invalid address format");
-
feed.start("RecentlyPlayed", addr).await;
}
···
+
use anyhow::Error;
use std::{env, net::SocketAddr, sync::Arc};
use tokio::sync::Mutex;
use crate::{
···
}
}
+
pub async fn run() -> Result<(), Error> {
let mut feed = RecentlyPlayedFeed {
handler: RecentlyPlayedFeedHandler {
scrobbles: Arc::new(Mutex::new(Vec::new())),
···
let addr_str = format!("{}:{}", host, port);
let addr: SocketAddr = addr_str.parse().expect("Invalid address format");
+
feed.start("RecentlyPlayed", addr).await?;
+
Ok(())
}
NEW
Cargo.lock
···
"windows-sys 0.59.0",
]
[[package]]
name = "nanoid"
version = "0.4.0"
···
"rand 0.8.5",
]
[[package]]
name = "nkeys"
version = "0.4.4"
···
"windows-sys 0.59.0",
]
+
[[package]]
+
name = "moka"
+
version = "0.12.11"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "8261cd88c312e0004c1d51baad2980c66528dfdb2bee62003e643a4d8f86b077"
+
dependencies = [
+
"async-lock",
+
"crossbeam-channel",
+
"crossbeam-epoch",
+
"crossbeam-utils",
+
"equivalent",
+
"event-listener",
+
"futures-util",
+
"parking_lot",
+
"portable-atomic",
+
"rustc_version",
+
"smallvec",
+
"tagptr",
+
"uuid",
+
]
+
+
[[package]]
+
name = "multer"
+
version = "2.1.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2"
+
dependencies = [
+
"bytes",
+
"encoding_rs",
+
"futures-util",
+
"http 0.2.12",
+
"httparse",
+
"log",
+
"memchr",
+
"mime",
+
"spin",
+
"version_check",
+
]
+
+
[[package]]
+
name = "multibase"
+
version = "0.9.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404"
+
dependencies = [
+
"base-x",
+
"data-encoding",
+
"data-encoding-macro",
+
]
+
+
[[package]]
+
name = "multihash"
+
version = "0.19.3"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d"
+
dependencies = [
+
"core2",
+
"serde",
+
"unsigned-varint",
+
]
+
[[package]]
name = "nanoid"
version = "0.4.0"
···
"rand 0.8.5",
]
+
[[package]]
+
name = "nanorand"
+
version = "0.7.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
+
dependencies = [
+
"getrandom 0.2.16",
+
]
+
+
[[package]]
+
name = "native-tls"
+
version = "0.2.14"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "87de3442987e9dbec73158d5c715e7ad9072fda936bb03d19d7fa10e00520f0e"
+
dependencies = [
+
"libc",
+
"log",
+
"openssl",
+
"openssl-probe",
+
"openssl-sys",
+
"schannel",
+
"security-framework 2.11.1",
+
"security-framework-sys",
+
"tempfile",
+
]
+
[[package]]
name = "nkeys"
version = "0.4.4"