test #2

closed
opened by mia.pds.parakeet.at targeting main from push-toprokrqrnrk
Changed files
+539 -29
lexica
src
app_bsky
parakeet
parakeet-db
src
+1
lexica/src/app_bsky/mod.rs
···
pub mod graph;
pub mod labeler;
pub mod richtext;
+
pub mod unspecced;
#[derive(Clone, Default, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
+33
lexica/src/app_bsky/unspecced.rs
···
+
use crate::app_bsky::feed::{BlockedAuthor, PostView};
+
use serde::Serialize;
+
+
#[derive(Clone, Debug, Serialize)]
+
pub struct ThreadV2Item {
+
pub uri: String,
+
pub depth: i32,
+
pub value: ThreadV2ItemType,
+
}
+
+
#[derive(Clone, Debug, Serialize)]
+
#[serde(tag = "$type")]
+
pub enum ThreadV2ItemType {
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemPost")]
+
Post(ThreadItemPost),
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNoUnauthenticated")]
+
NoUnauthenticated {},
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemNotFound")]
+
NotFound {},
+
#[serde(rename = "app.bsky.unspecced.defs#threadItemBlocked")]
+
Blocked { author: BlockedAuthor },
+
}
+
+
#[derive(Clone, Debug, Serialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct ThreadItemPost {
+
pub post: PostView,
+
pub more_parents: bool,
+
pub more_replies: i32,
+
pub op_thread: bool,
+
pub hidden_by_threadgate: bool,
+
pub muted_by_viewer: bool,
+
}
+95 -1
parakeet/src/db.rs
···
use diesel::prelude::*;
-
use diesel::sql_types::{Array, Bool, Nullable, Text};
+
use diesel::sql_types::{Array, Bool, Integer, Nullable, Text};
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use parakeet_db::{schema, types};
+
use parakeet_db::models::TextArray;
pub async fn get_actor_status(
conn: &mut AsyncPgConnection,
···
.await
.optional()
}
+
+
#[derive(Debug, QueryableByName)]
+
#[diesel(check_for_backend(diesel::pg::Pg))]
+
#[allow(unused)]
+
pub struct ThreadItem {
+
#[diesel(sql_type = Text)]
+
pub at_uri: String,
+
#[diesel(sql_type = Nullable<Text>)]
+
pub parent_uri: Option<String>,
+
#[diesel(sql_type = Nullable<Text>)]
+
pub root_uri: Option<String>,
+
#[diesel(sql_type = Integer)]
+
pub depth: i32,
+
}
+
+
pub async fn get_thread_children(
+
conn: &mut AsyncPgConnection,
+
uri: &str,
+
depth: i32,
+
) -> QueryResult<Vec<ThreadItem>> {
+
diesel::sql_query(include_str!("sql/thread.sql"))
+
.bind::<Text, _>(uri)
+
.bind::<Integer, _>(depth)
+
.load(conn)
+
.await
+
}
+
+
pub async fn get_thread_children_branching(
+
conn: &mut AsyncPgConnection,
+
uri: &str,
+
depth: i32,
+
branching_factor: i32,
+
) -> QueryResult<Vec<ThreadItem>> {
+
diesel::sql_query(include_str!("sql/thread_branching.sql"))
+
.bind::<Text, _>(uri)
+
.bind::<Integer, _>(depth)
+
.bind::<Integer, _>(branching_factor)
+
.load(conn)
+
.await
+
}
+
+
#[derive(Debug, QueryableByName)]
+
#[diesel(check_for_backend(diesel::pg::Pg))]
+
pub struct HiddenThreadChildItem {
+
#[diesel(sql_type = Text)]
+
pub at_uri: String,
+
}
+
+
pub async fn get_thread_children_hidden(
+
conn: &mut AsyncPgConnection,
+
uri: &str,
+
root: &str,
+
) -> QueryResult<Vec<HiddenThreadChildItem>> {
+
diesel::sql_query(include_str!("sql/thread_v2_hidden_children.sql"))
+
.bind::<Text, _>(uri)
+
.bind::<Text, _>(root)
+
.load(conn)
+
.await
+
}
+
+
pub async fn get_thread_parents(
+
conn: &mut AsyncPgConnection,
+
uri: &str,
+
height: i32,
+
) -> QueryResult<Vec<ThreadItem>> {
+
diesel::sql_query(include_str!("sql/thread_parent.sql"))
+
.bind::<Text, _>(uri)
+
.bind::<Integer, _>(height)
+
.load(conn)
+
.await
+
}
+
+
pub async fn get_root_post(conn: &mut AsyncPgConnection, uri: &str) -> QueryResult<Option<String>> {
+
schema::posts::table
+
.select(schema::posts::root_uri)
+
.find(&uri)
+
.get_result(conn)
+
.await
+
.optional()
+
.map(|v| v.flatten())
+
}
+
+
pub async fn get_threadgate_hiddens(
+
conn: &mut AsyncPgConnection,
+
uri: &str,
+
) -> QueryResult<Option<TextArray>> {
+
schema::threadgates::table
+
.select(schema::threadgates::hidden_replies)
+
.find(&uri)
+
.get_result(conn)
+
.await
+
.optional()
+
}
+1 -3
parakeet/src/hydration/profile.rs
···
.followed
.map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject));
-
let blocking = data.list_block.or(data
-
.blocking
-
.map(|rkey| format!("at://{}/app.bsky.graph.block/{rkey}", data.did)));
+
let blocking = data.list_block.or(data.blocking);
ProfileViewerState {
muted: data.muting.unwrap_or_default(),
+1 -1
parakeet/src/sql/thread.sql
···
-
with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth
+
with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth
from posts
where parent_uri = $1 and violates_threadgate=FALSE
union all
+13
parakeet/src/sql/thread_branching.sql
···
+
with recursive thread as (select at_uri, parent_uri, root_uri, 1 as depth
+
from posts
+
where parent_uri = $1
+
and violates_threadgate = FALSE
+
union all
+
(select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
+
from posts p
+
join thread on p.parent_uri = thread.at_uri
+
where thread.depth <= $2
+
and violates_threadgate = FALSE
+
LIMIT $3))
+
select *
+
from thread;
+6
parakeet/src/sql/thread_v2_hidden_children.sql
···
+
select at_uri
+
from posts
+
where parent_uri = $1
+
and at_uri = any (select unnest(hidden_replies)
+
from threadgates
+
where post_uri = $2)
+2 -24
parakeet/src/xrpc/app_bsky/feed/posts.rs
···
pub threadgate: Option<ThreadgateView>,
}
-
#[derive(Debug, QueryableByName)]
-
#[diesel(check_for_backend(diesel::pg::Pg))]
-
struct ThreadItem {
-
#[diesel(sql_type = diesel::sql_types::Text)]
-
at_uri: String,
-
#[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
-
parent_uri: Option<String>,
-
// #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)]
-
// root_uri: Option<String>,
-
#[diesel(sql_type = diesel::sql_types::Integer)]
-
depth: i32,
-
}
-
pub async fn get_post_thread(
State(state): State<GlobalState>,
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
···
}
}
-
let replies = diesel::sql_query(include_str!("../../../sql/thread.sql"))
-
.bind::<diesel::sql_types::Text, _>(&uri)
-
.bind::<diesel::sql_types::Integer, _>(depth as i32)
-
.load::<ThreadItem>(&mut conn)
-
.await?;
-
-
let parents = diesel::sql_query(include_str!("../../../sql/thread_parent.sql"))
-
.bind::<diesel::sql_types::Text, _>(&uri)
-
.bind::<diesel::sql_types::Integer, _>(parent_height as i32)
-
.load::<ThreadItem>(&mut conn)
-
.await?;
+
let replies = crate::db::get_thread_children(&mut conn, &uri, depth as i32).await?;
+
let parents = crate::db::get_thread_parents(&mut conn, &uri, parent_height as i32).await?;
let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect();
let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect();
+3
parakeet/src/xrpc/app_bsky/mod.rs
···
mod feed;
mod graph;
mod labeler;
+
mod unspecced;
#[rustfmt::skip]
pub fn routes() -> Router<crate::GlobalState> {
···
// TODO: app.bsky.notification.putActivitySubscriptions
// TODO: app.bsky.notification.putPreferences
// TODO: app.bsky.notification.putPreferencesV2
+
.route("/app.bsky.unspecced.getPostThreadV2", get(unspecced::thread_v2::get_post_thread_v2))
+
.route("/app.bsky.unspecced.getPostThreadOtherV2", get(unspecced::thread_v2::get_post_thread_other_v2))
}
async fn not_implemented() -> axum::http::StatusCode {
+1
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
···
+
pub mod thread_v2;
+382
parakeet/src/xrpc/app_bsky/unspecced/thread_v2.rs
···
+
use crate::db::ThreadItem;
+
use crate::hydration::StatefulHydrator;
+
use crate::xrpc::error::{Error, XrpcResult};
+
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
+
use crate::xrpc::normalise_at_uri;
+
use crate::GlobalState;
+
use axum::extract::{Query, State};
+
use axum::Json;
+
use itertools::Itertools;
+
use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView};
+
use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType};
+
use serde::{Deserialize, Serialize};
+
use std::cmp::Ordering;
+
use std::collections::{HashMap, HashSet};
+
+
const THREAD_PARENTS: usize = 50;
+
const DEFAULT_BRANCHING: u32 = 10;
+
const DEFAULT_DEPTH: u32 = 6;
+
+
#[derive(Copy, Clone, Debug, Default, Deserialize)]
+
#[serde(rename_all = "lowercase")]
+
pub enum PostThreadSort {
+
Newest,
+
#[default]
+
Oldest,
+
Top,
+
}
+
+
#[derive(Debug, Deserialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct GetPostThreadV2Req {
+
pub anchor: String,
+
pub above: Option<bool>,
+
pub below: Option<u32>,
+
pub branching_factor: Option<u32>,
+
#[serde(default)]
+
pub sort: PostThreadSort,
+
}
+
+
#[derive(Debug, Serialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct GetPostThreadV2Res {
+
pub thread: Vec<ThreadV2Item>,
+
#[serde(skip_serializing_if = "Option::is_none")]
+
pub threadgate: Option<ThreadgateView>,
+
pub has_other_replies: bool,
+
}
+
+
pub async fn get_post_thread_v2(
+
State(state): State<GlobalState>,
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
+
maybe_auth: Option<AtpAuth>,
+
Query(query): Query<GetPostThreadV2Req>,
+
) -> XrpcResult<Json<GetPostThreadV2Res>> {
+
let mut conn = state.pool.get().await?;
+
let maybe_did = maybe_auth.clone().map(|v| v.0);
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
+
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
+
let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32;
+
let branching_factor = query
+
.branching_factor
+
.unwrap_or(DEFAULT_BRANCHING)
+
.clamp(0, 100) as i32;
+
+
let anchor = hyd
+
.hydrate_post(uri.clone())
+
.await
+
.ok_or(Error::not_found())?;
+
+
if let Some(v) = &anchor.author.viewer {
+
if v.blocked_by || v.blocking.is_some() {
+
let block = ThreadV2ItemType::Blocked {
+
author: BlockedAuthor {
+
did: anchor.author.did,
+
viewer: anchor.author.viewer,
+
},
+
};
+
+
return Ok(Json(GetPostThreadV2Res {
+
thread: vec![ThreadV2Item {
+
uri,
+
depth: 0,
+
value: block,
+
}],
+
threadgate: anchor.threadgate,
+
has_other_replies: false,
+
}));
+
}
+
}
+
+
// get the root post URI (if there is one) and return its author's DID.
+
let root_uri = crate::db::get_root_post(&mut conn, &uri)
+
.await?
+
.unwrap_or(uri.clone());
+
let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0];
+
+
let replies =
+
crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1)
+
.await?;
+
let reply_uris = replies
+
.iter()
+
.map(|item| item.at_uri.clone())
+
.collect::<Vec<_>>();
+
+
// bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents.
+
let parents = match query.above.unwrap_or(true) {
+
true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?,
+
false => vec![],
+
};
+
let parent_uris = parents
+
.iter()
+
.map(|item| item.at_uri.clone())
+
.collect::<Vec<_>>();
+
+
let (mut replies_hyd, mut parents_hyd) = tokio::join!(
+
hyd.hydrate_posts(reply_uris),
+
hyd.hydrate_posts(parent_uris),
+
);
+
+
let threadgate = anchor.threadgate.clone();
+
let hidden: HashSet<_, std::hash::RandomState> = match &threadgate {
+
Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?,
+
None => None,
+
}
+
.map(|hiddens| HashSet::from_iter(Vec::from(hiddens)))
+
.unwrap_or_default();
+
+
let root_has_more = parents.len() > THREAD_PARENTS;
+
let mut is_op_thread = true;
+
+
let mut thread = Vec::with_capacity(1 + replies.len() + parents.len());
+
+
thread.extend(
+
parents
+
.into_iter()
+
.tail(THREAD_PARENTS)
+
.enumerate()
+
.map(|(idx, item)| {
+
let value = parents_hyd
+
.remove(&item.at_uri)
+
.map(|post| {
+
if let Some(v) = &post.author.viewer {
+
if v.blocked_by || v.blocking.is_some() {
+
return ThreadV2ItemType::Blocked {
+
author: BlockedAuthor {
+
did: post.author.did,
+
viewer: post.author.viewer,
+
},
+
};
+
}
+
}
+
+
let op_thread = (is_op_thread
+
|| item.root_uri.is_none() && item.parent_uri.is_none())
+
&& post.author.did == root_did;
+
+
ThreadV2ItemType::Post(ThreadItemPost {
+
post,
+
more_parents: idx == 0 && root_has_more,
+
more_replies: 0,
+
op_thread,
+
hidden_by_threadgate: false,
+
muted_by_viewer: false,
+
})
+
})
+
.unwrap_or(ThreadV2ItemType::NotFound {});
+
+
ThreadV2Item {
+
uri: item.at_uri,
+
depth: -item.depth - 1,
+
value,
+
}
+
}),
+
);
+
+
is_op_thread = is_op_thread && anchor.author.did == root_did;
+
thread.push(ThreadV2Item {
+
uri: uri.clone(),
+
depth: 0,
+
value: ThreadV2ItemType::Post(ThreadItemPost {
+
post: anchor,
+
more_parents: false,
+
more_replies: 0,
+
op_thread: is_op_thread,
+
hidden_by_threadgate: false,
+
muted_by_viewer: false,
+
}),
+
});
+
+
let mut replies_grouped = replies
+
.into_iter()
+
.into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default());
+
+
// start with the anchor
+
let (children, has_other_replies) = build_thread_children(
+
&mut replies_grouped,
+
&mut replies_hyd,
+
&hidden,
+
&uri,
+
is_op_thread,
+
1,
+
&BuildThreadChildrenOpts {
+
root_did,
+
sort: query.sort,
+
maybe_did: &maybe_did,
+
max_depth: depth,
+
},
+
);
+
thread.extend(children);
+
+
Ok(Json(GetPostThreadV2Res {
+
thread,
+
threadgate,
+
has_other_replies,
+
}))
+
}
+
+
#[derive(Debug, Deserialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct GetPostThreadOtherV2Req {
+
pub anchor: String,
+
}
+
+
#[derive(Debug, Serialize)]
+
#[serde(rename_all = "camelCase")]
+
pub struct GetPostThreadOtherV2Res {
+
pub thread: Vec<ThreadV2Item>,
+
}
+
+
pub async fn get_post_thread_other_v2(
+
State(state): State<GlobalState>,
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
+
maybe_auth: Option<AtpAuth>,
+
Query(query): Query<GetPostThreadOtherV2Req>,
+
) -> XrpcResult<Json<GetPostThreadOtherV2Res>> {
+
let mut conn = state.pool.get().await?;
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
+
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
+
+
let root = crate::db::get_root_post(&mut conn, &uri)
+
.await?
+
.unwrap_or(uri.clone());
+
+
// this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE
+
let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?;
+
let reply_uris = replies
+
.into_iter()
+
.map(|item| item.at_uri)
+
.collect::<Vec<_>>();
+
let thread = hyd
+
.hydrate_posts(reply_uris)
+
.await
+
.into_iter()
+
.filter(|(_, post)| match &post.author.viewer {
+
Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false,
+
_ => true,
+
})
+
.map(|(uri, post)| {
+
let post = ThreadItemPost {
+
post,
+
more_parents: false,
+
more_replies: 0,
+
op_thread: false,
+
hidden_by_threadgate: true,
+
muted_by_viewer: false,
+
};
+
+
ThreadV2Item {
+
uri,
+
depth: 1,
+
value: ThreadV2ItemType::Post(post),
+
}
+
})
+
.collect();
+
+
Ok(Json(GetPostThreadOtherV2Res { thread }))
+
}
+
+
#[derive(Debug)]
+
struct BuildThreadChildrenOpts<'a> {
+
root_did: &'a str,
+
sort: PostThreadSort,
+
maybe_did: &'a Option<String>,
+
max_depth: i32,
+
}
+
+
fn build_thread_children(
+
grouped_replies: &mut HashMap<String, Vec<ThreadItem>>,
+
replies_hyd: &mut HashMap<String, PostView>,
+
hidden: &HashSet<String>,
+
parent: &str,
+
is_op_thread: bool,
+
depth: i32,
+
opts: &BuildThreadChildrenOpts,
+
) -> (Vec<ThreadV2Item>, bool) {
+
let mut has_other_replies = false;
+
+
let Some(replies) = grouped_replies.remove(parent) else {
+
return (Vec::default(), has_other_replies);
+
};
+
+
let replies = replies
+
.into_iter()
+
.filter_map(|item| replies_hyd.remove(&item.at_uri))
+
.sorted_by(sort_replies(&opts.sort));
+
+
let mut out = Vec::new();
+
+
for post in replies {
+
let reply_count = grouped_replies
+
.get(&post.uri)
+
.map(|v| v.len())
+
.unwrap_or_default();
+
let at_max = depth == opts.max_depth;
+
let more_replies = if at_max { reply_count } else { 0 };
+
let op_thread = is_op_thread && post.author.did == opts.root_did;
+
+
// shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies...
+
if let Some(v) = &post.author.viewer {
+
if v.blocked_by || v.blocking.is_some() {
+
continue;
+
}
+
}
+
+
// check if the post is hidden AND we're NOT the author (hidden posts still show for their author)
+
if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) {
+
// post is hidden - do not ~pass go~ push to the thread.
+
if depth == 1 {
+
has_other_replies = true;
+
}
+
continue;
+
}
+
+
let uri = post.uri.clone();
+
out.push(ThreadV2Item {
+
uri: post.uri.clone(),
+
depth,
+
value: ThreadV2ItemType::Post(ThreadItemPost {
+
post,
+
more_parents: false,
+
more_replies: more_replies as i32,
+
op_thread,
+
hidden_by_threadgate: false,
+
muted_by_viewer: false,
+
}),
+
});
+
+
if !at_max {
+
// we don't care about has_other_replies when recursing
+
let (children, _) = build_thread_children(
+
grouped_replies,
+
replies_hyd,
+
hidden,
+
&uri,
+
op_thread,
+
depth + 1,
+
opts,
+
);
+
+
out.extend(children);
+
}
+
}
+
+
(out, has_other_replies)
+
}
+
+
fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> {
+
move |a: &PostView, b: &PostView| match sort {
+
PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at),
+
PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at),
+
PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count),
+
}
+
}
+
+
fn did_is_cur(cur: &Option<String>, did: &String) -> bool {
+
match cur {
+
Some(cur) => did == cur,
+
None => false,
+
}
+
}
+1
parakeet-db/src/models.rs
···
pub sort_at: DateTime<Utc>,
}
+
pub use not_null_vec::TextArray;
mod not_null_vec {
use diesel::deserialize::FromSql;
use diesel::pg::Pg;