From 2103b7d92bc73a2736aa8208eeb13cd70c3a4b22 Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 28 Sep 2025 17:15:45 +0100 Subject: [PATCH] postgates: trust provided timestamp only when backfilling --- consumer/src/backfill/repo.rs | 2 +- consumer/src/db/record.rs | 57 ++++++++++++++++++++++++++--------- consumer/src/indexer/mod.rs | 2 +- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/consumer/src/backfill/repo.rs b/consumer/src/backfill/repo.rs index 5260ad1..ab4897d 100644 --- a/consumer/src/backfill/repo.rs +++ b/consumer/src/backfill/repo.rs @@ -144,7 +144,7 @@ async fn record_index( db::maintain_self_labels(t, did, Some(cid), &at_uri, labels).await?; } if let Some(embed) = rec.embed.clone().and_then(|embed| embed.into_bsky()) { - db::post_embed_insert(t, &at_uri, embed, rec.created_at).await?; + db::post_embed_insert(t, &at_uri, embed, rec.created_at, true).await?; } deltas.incr(did, AggregateType::ProfilePost).await; diff --git a/consumer/src/db/record.rs b/consumer/src/db/record.rs index a7748d8..b2ac46a 100644 --- a/consumer/src/db/record.rs +++ b/consumer/src/db/record.rs @@ -5,6 +5,7 @@ use chrono::prelude::*; use deadpool_postgres::GenericClient; use ipld_core::cid::Cid; use lexica::community_lexicon::bookmarks::Bookmark; +use std::collections::HashSet; pub async fn record_upsert( conn: &mut C, @@ -317,6 +318,7 @@ pub async fn post_insert( repo: &str, cid: Cid, rec: AppBskyFeedPost, + is_backfill: bool, ) -> PgExecResult { let cid = cid.to_string(); let record = serde_json::to_value(&rec).unwrap(); @@ -350,7 +352,7 @@ pub async fn post_insert( .await?; if let Some(embed) = rec.embed.and_then(|embed| embed.into_bsky()) { - post_embed_insert(conn, at_uri, embed, rec.created_at).await?; + post_embed_insert(conn, at_uri, embed, rec.created_at, is_backfill).await?; } Ok(count) @@ -380,16 +382,17 @@ pub async fn post_embed_insert( post: &str, embed: AppBskyEmbed, created_at: DateTime, + is_backfill: bool, ) -> PgExecResult { match embed { AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await, AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await, AppBskyEmbed::External(embed) => post_embed_external_insert(conn, post, embed).await, AppBskyEmbed::Record(embed) => { - post_embed_record_insert(conn, post, embed, created_at).await + post_embed_record_insert(conn, post, embed, created_at, is_backfill).await } AppBskyEmbed::RecordWithMedia(embed) => { - post_embed_record_insert(conn, post, embed.record, created_at).await?; + post_embed_record_insert(conn, post, embed.record, created_at, is_backfill).await?; match *embed.media { AppBskyEmbed::Images(embed) => post_embed_image_insert(conn, post, embed).await, AppBskyEmbed::Video(embed) => post_embed_video_insert(conn, post, embed).await, @@ -476,27 +479,38 @@ async fn post_embed_external_insert( ).await } +const PG_DISABLE_RULE: &str = "app.bsky.feed.postgate#disableRule"; async fn post_embed_record_insert( conn: &mut C, post: &str, embed: AppBskyEmbedRecord, post_created_at: DateTime, + is_backfill: bool, ) -> PgExecResult { // strip "at://" then break into parts by '/' let parts = embed.record.uri[5..].split('/').collect::>(); let detached = if parts[1] == "app.bsky.feed.post" { - let postgate_effective: Option> = conn - .query_opt( - "SELECT created_at FROM postgates WHERE post_uri=$1", - &[&post], - ) - .await? - .map(|v| v.get(0)); - - postgate_effective - .map(|v| Utc::now().min(post_created_at) > v) - .unwrap_or_default() + let pg_data = postgate_get(conn, post).await?; + + if let Some((effective, detached, rules)) = pg_data { + let detached: HashSet = HashSet::from_iter(detached); + let rules: HashSet = HashSet::from_iter(rules); + let compare_date = match is_backfill { + true => post_created_at, + false => Utc::now(), + }; + + if detached.contains(post) { + true + } else if rules.contains(PG_DISABLE_RULE) && compare_date > effective { + true + } else { + false + } + } else { + false + } } else { false }; @@ -507,6 +521,21 @@ async fn post_embed_record_insert( ).await } +async fn postgate_get( + conn: &mut C, + post: &str, +) -> PgOptResult<(DateTime, Vec, Vec)> { + let res = conn + .query_opt( + "SELECT created_at, detached, rules FROM postgates WHERE post_uri=$1", + &[&post], + ) + .await? + .map(|v| (v.get(0), v.get(1), v.get(2))); + + Ok(res) +} + pub async fn postgate_upsert( conn: &mut C, at_uri: &str, diff --git a/consumer/src/indexer/mod.rs b/consumer/src/indexer/mod.rs index b0ee18b..4a442bb 100644 --- a/consumer/src/indexer/mod.rs +++ b/consumer/src/indexer/mod.rs @@ -625,7 +625,7 @@ pub async fn index_op( }); let labels = record.labels.clone(); - db::post_insert(conn, at_uri, repo, cid, record).await?; + db::post_insert(conn, at_uri, repo, cid, record, false).await?; if let Some(labels) = labels { db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?; } -- 2.43.0 From cc0eb694be4c944bfa6974537cd241b03e1bfc63 Mon Sep 17 00:00:00 2001 From: Mia Date: Sat, 4 Oct 2025 17:49:49 +0100 Subject: [PATCH] store mentions and tags from facets in DB --- consumer/src/db/copy.rs | 11 +++++-- consumer/src/db/record.rs | 12 +++++-- consumer/src/db/sql/post_insert.sql | 4 +-- consumer/src/utils.rs | 31 +++++++++++++++++++ .../2025-09-27-171241_post-tweaks/down.sql | 2 ++ .../2025-09-27-171241_post-tweaks/up.sql | 2 ++ parakeet-db/src/schema.rs | 1 + 7 files changed, 56 insertions(+), 7 deletions(-) create mode 100644 migrations/2025-09-27-171241_post-tweaks/down.sql create mode 100644 migrations/2025-09-27-171241_post-tweaks/up.sql diff --git a/consumer/src/db/copy.rs b/consumer/src/db/copy.rs index 6806570..1a70d04 100644 --- a/consumer/src/db/copy.rs +++ b/consumer/src/db/copy.rs @@ -1,6 +1,6 @@ use super::PgExecResult; use crate::indexer::records; -use crate::utils::strongref_to_parts; +use crate::utils::{extract_mentions_and_tags, merge_tags, strongref_to_parts}; use chrono::prelude::*; use deadpool_postgres::Transaction; use futures::pin_mut; @@ -119,7 +119,7 @@ pub async fn copy_reposts( .await } -const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, created_at) FROM STDIN (FORMAT binary)"; +const POST_STMT: &str = "COPY posts_tmp (at_uri, cid, did, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, root_cid, embed, embed_subtype, mentions, created_at) FROM STDIN (FORMAT binary)"; const POST_TYPES: &[Type] = &[ Type::TEXT, Type::TEXT, @@ -135,6 +135,7 @@ const POST_TYPES: &[Type] = &[ Type::TEXT, Type::TEXT, Type::TEXT, + Type::TEXT_ARRAY, Type::TIMESTAMP, ]; pub async fn copy_posts( @@ -159,12 +160,15 @@ pub async fn copy_posts( for (at_uri, cid, post) in data { let record = serde_json::to_value(&post).unwrap(); + let (mentions, tags) = post.facets.as_ref().map(|v| extract_mentions_and_tags(&v)).unzip(); let facets = post.facets.and_then(|v| serde_json::to_value(v).ok()); let embed = post.embed.as_ref().map(|v| v.as_str()); let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype()); let (parent_uri, parent_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.parent)); let (root_uri, root_cid) = strongref_to_parts(post.reply.as_ref().map(|v| &v.root)); + let tags = merge_tags(tags, post.tags); + let writer = writer.as_mut(); writer .write(&[ @@ -175,13 +179,14 @@ pub async fn copy_posts( &post.text, &facets, &post.langs.unwrap_or_default(), - &post.tags.unwrap_or_default(), + &tags, &parent_uri, &parent_cid, &root_uri, &root_cid, &embed, &embed_subtype, + &mentions, &post.created_at.naive_utc(), ]) .await?; diff --git a/consumer/src/db/record.rs b/consumer/src/db/record.rs index b2ac46a..372c4be 100644 --- a/consumer/src/db/record.rs +++ b/consumer/src/db/record.rs @@ -1,6 +1,6 @@ use super::{PgExecResult, PgOptResult, PgResult}; use crate::indexer::records::*; -use crate::utils::{blob_ref, strongref_to_parts}; +use crate::utils::{blob_ref, extract_mentions_and_tags, merge_tags, strongref_to_parts}; use chrono::prelude::*; use deadpool_postgres::GenericClient; use ipld_core::cid::Cid; @@ -322,12 +322,19 @@ pub async fn post_insert( ) -> PgExecResult { let cid = cid.to_string(); let record = serde_json::to_value(&rec).unwrap(); + let (mentions, tags) = rec + .facets + .as_ref() + .map(|v| extract_mentions_and_tags(&v)) + .unzip(); let facets = rec.facets.and_then(|v| serde_json::to_value(v).ok()); let (parent_uri, parent_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.parent)); let (root_uri, root_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.root)); let embed = rec.embed.as_ref().map(|v| v.as_str()); let embed_subtype = rec.embed.as_ref().and_then(|v| v.subtype()); + let tags = merge_tags(tags, rec.tags); + let count = conn .execute( include_str!("sql/post_insert.sql"), @@ -339,13 +346,14 @@ pub async fn post_insert( &rec.text, &facets, &rec.langs.unwrap_or_default(), - &rec.tags.unwrap_or_default(), + &tags, &parent_uri, &parent_cid, &root_uri, &root_cid, &embed, &embed_subtype, + &mentions, &rec.created_at, ], ) diff --git a/consumer/src/db/sql/post_insert.sql b/consumer/src/db/sql/post_insert.sql index c31aa3d..6e81762 100644 --- a/consumer/src/db/sql/post_insert.sql +++ b/consumer/src/db/sql/post_insert.sql @@ -1,4 +1,4 @@ INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, - root_cid, embed, embed_subtype, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) + root_cid, embed, embed_subtype, mentions, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) ON CONFLICT DO NOTHING \ No newline at end of file diff --git a/consumer/src/utils.rs b/consumer/src/utils.rs index 1e10a29..b5571d7 100644 --- a/consumer/src/utils.rs +++ b/consumer/src/utils.rs @@ -1,3 +1,4 @@ +use lexica::app_bsky::richtext::{Facet, FacetMain, FacetOuter}; use lexica::{Blob, StrongRef}; use serde::{Deserialize, Deserializer}; @@ -39,3 +40,33 @@ pub fn at_uri_is_by(uri: &str, did: &str) -> bool { did == split_aturi[2] } + +pub fn extract_mentions_and_tags(from: &[FacetMain]) -> (Vec, Vec) { + let (mentions, tags) = from + .into_iter() + .flat_map(|v| { + v.features.iter().map(|facet| match facet { + FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None), + FacetOuter::Bsky(Facet::Tag { tag }) => (None, Some(tag)), + _ => (None, None), + }) + }) + .unzip::<_, _, Vec<_>, Vec<_>>(); + + let mentions = mentions.into_iter().flatten().cloned().collect(); + let tags = tags.into_iter().flatten().cloned().collect(); + + (mentions, tags) +} + +pub fn merge_tags(t1: Option>, t2: Option>) -> Vec { + match (t1, t2) { + (Some(t1), None) => t1, + (None, Some(t2)) => t2, + (Some(mut t1), Some(t2)) => { + t1.extend(t2); + t1 + } + _ => Vec::default(), + } +} diff --git a/migrations/2025-09-27-171241_post-tweaks/down.sql b/migrations/2025-09-27-171241_post-tweaks/down.sql new file mode 100644 index 0000000..bfcf265 --- /dev/null +++ b/migrations/2025-09-27-171241_post-tweaks/down.sql @@ -0,0 +1,2 @@ +alter table posts + drop column mentions; \ No newline at end of file diff --git a/migrations/2025-09-27-171241_post-tweaks/up.sql b/migrations/2025-09-27-171241_post-tweaks/up.sql new file mode 100644 index 0000000..cdea3a9 --- /dev/null +++ b/migrations/2025-09-27-171241_post-tweaks/up.sql @@ -0,0 +1,2 @@ +alter table posts + add column mentions text[]; \ No newline at end of file diff --git a/parakeet-db/src/schema.rs b/parakeet-db/src/schema.rs index 7c9c722..2e88888 100644 --- a/parakeet-db/src/schema.rs +++ b/parakeet-db/src/schema.rs @@ -284,6 +284,7 @@ diesel::table! { embed_subtype -> Nullable, created_at -> Timestamptz, indexed_at -> Timestamp, + mentions -> Nullable>>, } } -- 2.43.0 From bc019eb86501a0b2417c4ca838ee92078c1a10f7 Mon Sep 17 00:00:00 2001 From: Mia Date: Sat, 4 Oct 2025 20:05:42 +0100 Subject: [PATCH] clippy --- consumer/src/db/copy.rs | 6 +++++- consumer/src/db/record.rs | 10 ++-------- consumer/src/utils.rs | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/consumer/src/db/copy.rs b/consumer/src/db/copy.rs index 1a70d04..7c2a9a3 100644 --- a/consumer/src/db/copy.rs +++ b/consumer/src/db/copy.rs @@ -160,7 +160,11 @@ pub async fn copy_posts( for (at_uri, cid, post) in data { let record = serde_json::to_value(&post).unwrap(); - let (mentions, tags) = post.facets.as_ref().map(|v| extract_mentions_and_tags(&v)).unzip(); + let (mentions, tags) = post + .facets + .as_ref() + .map(|v| extract_mentions_and_tags(v)) + .unzip(); let facets = post.facets.and_then(|v| serde_json::to_value(v).ok()); let embed = post.embed.as_ref().map(|v| v.as_str()); let embed_subtype = post.embed.as_ref().and_then(|v| v.subtype()); diff --git a/consumer/src/db/record.rs b/consumer/src/db/record.rs index 372c4be..5bc7e92 100644 --- a/consumer/src/db/record.rs +++ b/consumer/src/db/record.rs @@ -325,7 +325,7 @@ pub async fn post_insert( let (mentions, tags) = rec .facets .as_ref() - .map(|v| extract_mentions_and_tags(&v)) + .map(|v| extract_mentions_and_tags(v)) .unzip(); let facets = rec.facets.and_then(|v| serde_json::to_value(v).ok()); let (parent_uri, parent_cid) = strongref_to_parts(rec.reply.as_ref().map(|v| &v.parent)); @@ -509,13 +509,7 @@ async fn post_embed_record_insert( false => Utc::now(), }; - if detached.contains(post) { - true - } else if rules.contains(PG_DISABLE_RULE) && compare_date > effective { - true - } else { - false - } + detached.contains(post) || (rules.contains(PG_DISABLE_RULE) && compare_date > effective) } else { false } diff --git a/consumer/src/utils.rs b/consumer/src/utils.rs index b5571d7..4b95058 100644 --- a/consumer/src/utils.rs +++ b/consumer/src/utils.rs @@ -43,7 +43,7 @@ pub fn at_uri_is_by(uri: &str, did: &str) -> bool { pub fn extract_mentions_and_tags(from: &[FacetMain]) -> (Vec, Vec) { let (mentions, tags) = from - .into_iter() + .iter() .flat_map(|v| { v.features.iter().map(|facet| match facet { FacetOuter::Bsky(Facet::Mention { did }) => (Some(did), None), -- 2.43.0 From 20e03838eb122e9535163f065c806d9f4bb7cc86 Mon Sep 17 00:00:00 2001 From: Mia Date: Sat, 4 Oct 2025 20:12:25 +0100 Subject: [PATCH] store if a post violates a threadgate --- consumer/src/db/copy.rs | 26 ++++ consumer/src/db/record.rs | 111 ++++++++++++++++++ consumer/src/db/sql/post_insert.sql | 4 +- .../2025-09-27-171241_post-tweaks/down.sql | 3 +- .../2025-09-27-171241_post-tweaks/up.sql | 3 +- parakeet-db/src/schema.rs | 1 + 6 files changed, 144 insertions(+), 4 deletions(-) diff --git a/consumer/src/db/copy.rs b/consumer/src/db/copy.rs index 7c2a9a3..d51ac2c 100644 --- a/consumer/src/db/copy.rs +++ b/consumer/src/db/copy.rs @@ -198,6 +198,32 @@ pub async fn copy_posts( writer.finish().await?; + let threadgated: Vec<(String, String, DateTime)> = conn + .query( + "SELECT root_uri, p.at_uri, p.created_at FROM posts_tmp p INNER JOIN threadgates t ON root_uri = post_uri WHERE t.allow IS NOT NULL", + &[], + ) + .await? + .into_iter() + .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); + + for (root, post, created_at) in dbg!(threadgated) { + match super::post_enforce_threadgate(conn, &root, did, created_at, true).await { + Ok(true) => { + conn.execute( + "UPDATE posts_tmp SET violates_threadgate=TRUE WHERE at_uri=$1", + &[&post], + ) + .await?; + } + Ok(false) => continue, + Err(e) => { + tracing::error!("failed to check threadgate enforcement: {e}"); + continue; + } + } + } + conn.execute("INSERT INTO posts (SELECT * FROM posts_tmp)", &[]) .await } diff --git a/consumer/src/db/record.rs b/consumer/src/db/record.rs index 5bc7e92..1ab0a79 100644 --- a/consumer/src/db/record.rs +++ b/consumer/src/db/record.rs @@ -333,6 +333,14 @@ pub async fn post_insert( let embed = rec.embed.as_ref().map(|v| v.as_str()); let embed_subtype = rec.embed.as_ref().and_then(|v| v.subtype()); + // if there is a root, we need to check for the presence of a threadgate. + let violates_threadgate = match &root_uri { + Some(root) => { + post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await? + } + None => false, + }; + let tags = merge_tags(tags, rec.tags); let count = conn @@ -354,6 +362,7 @@ pub async fn post_insert( &embed, &embed_subtype, &mentions, + &violates_threadgate, &rec.created_at, ], ) @@ -371,6 +380,93 @@ pub async fn post_delete(conn: &mut C, at_uri: &str) -> PgExec .await } +pub async fn post_enforce_threadgate( + conn: &mut C, + root: &str, + post_author: &str, + post_created_at: DateTime, + is_backfill: bool, +) -> PgResult { + // check if the root and the current post are the same author + // strip "at://" then break into parts by '/' + let parts = root[5..].split('/').collect::>(); + let root_author = parts[0]; + if root_author == post_author { + return Ok(false); + } + + let tg_data = threadgate_get(conn, root).await?; + + let Some((created_at, allow, allow_lists)) = tg_data else { + return Ok(false); + }; + + // when backfilling, there's no point continuing if the record is dated before the threadgate + if is_backfill && post_created_at < created_at { + return Ok(false); + } + + if allow.is_empty() { + return Ok(true); + } + + let allow: HashSet = HashSet::from_iter(allow); + + if allow.contains("app.bsky.feed.threadgate#followerRule") + || allow.contains("app.bsky.feed.threadgate#followingRule") + { + let profile_state: Option<(bool, bool)> = conn + .query_opt( + "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", + &[&root_author, &post_author], + ) + .await? + .map(|v| (v.get(0), v.get(1))); + + if let Some((following, followed)) = profile_state { + if allow.contains("app.bsky.feed.threadgate#followerRule") && followed { + return Ok(false); + } + + if allow.contains("app.bsky.feed.threadgate#followingRule") && following { + return Ok(false); + } + } + } + + // check mentions + if allow.contains("app.bsky.feed.threadgate#mentionRule") { + let mentions: Vec = conn + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) + .await? + .map(|r| r.get(0)) + .unwrap_or_default(); + + if mentions.contains(&post_author.to_owned()) { + return Ok(false); + } + } + + if allow.contains("app.bsky.feed.threadgate#listRule") { + if allow_lists.is_empty() { + return Ok(true); + } + + let count: i64 = conn + .query_one( + "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", + &[&allow_lists, &post_author], + ) + .await? + .get(0); + if count == 0 { + return Ok(true); + } + } + + Ok(false) +} + pub async fn post_get_info_for_delete( conn: &mut C, at_uri: &str, @@ -731,6 +827,21 @@ pub async fn status_delete(conn: &mut C, did: &str) -> PgExecR .await } +async fn threadgate_get( + conn: &mut C, + post: &str, +) -> PgOptResult<(DateTime, Vec, Vec)> { + let res = conn + .query_opt( + "SELECT created_at, allow, allowed_lists FROM threadgates WHERE post_uri=$1 AND allow IS NOT NULL", + &[&post], + ) + .await? + .map(|v| (v.get(0), v.get(1), v.get(2))); + + Ok(res) +} + pub async fn threadgate_upsert( conn: &mut C, at_uri: &str, diff --git a/consumer/src/db/sql/post_insert.sql b/consumer/src/db/sql/post_insert.sql index 6e81762..86032ad 100644 --- a/consumer/src/db/sql/post_insert.sql +++ b/consumer/src/db/sql/post_insert.sql @@ -1,4 +1,4 @@ INSERT INTO posts (at_uri, did, cid, record, content, facets, languages, tags, parent_uri, parent_cid, root_uri, - root_cid, embed, embed_subtype, mentions, created_at) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16) + root_cid, embed, embed_subtype, mentions, violates_threadgate, created_at) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17) ON CONFLICT DO NOTHING \ No newline at end of file diff --git a/migrations/2025-09-27-171241_post-tweaks/down.sql b/migrations/2025-09-27-171241_post-tweaks/down.sql index bfcf265..1519864 100644 --- a/migrations/2025-09-27-171241_post-tweaks/down.sql +++ b/migrations/2025-09-27-171241_post-tweaks/down.sql @@ -1,2 +1,3 @@ alter table posts - drop column mentions; \ No newline at end of file + drop column mentions, + drop column violates_threadgate; \ No newline at end of file diff --git a/migrations/2025-09-27-171241_post-tweaks/up.sql b/migrations/2025-09-27-171241_post-tweaks/up.sql index cdea3a9..8965540 100644 --- a/migrations/2025-09-27-171241_post-tweaks/up.sql +++ b/migrations/2025-09-27-171241_post-tweaks/up.sql @@ -1,2 +1,3 @@ alter table posts - add column mentions text[]; \ No newline at end of file + add column mentions text[], + add column violates_threadgate bool not null default false; \ No newline at end of file diff --git a/parakeet-db/src/schema.rs b/parakeet-db/src/schema.rs index 2e88888..5280ffe 100644 --- a/parakeet-db/src/schema.rs +++ b/parakeet-db/src/schema.rs @@ -285,6 +285,7 @@ diesel::table! { created_at -> Timestamptz, indexed_at -> Timestamp, mentions -> Nullable>>, + violates_threadgate -> Bool, } } -- 2.43.0 From 4f4aab6b9b8acc2eb2bb868e414c3b2b36006d58 Mon Sep 17 00:00:00 2001 From: Mia Date: Sat, 4 Oct 2025 20:38:07 +0100 Subject: [PATCH] move postgate and threadgate enforcement into their own file --- consumer/src/db/gates.rs | 105 ++++++++++++++++++++++++++++++++++++++ consumer/src/db/mod.rs | 2 + consumer/src/db/record.rs | 104 +------------------------------------ 3 files changed, 109 insertions(+), 102 deletions(-) create mode 100644 consumer/src/db/gates.rs diff --git a/consumer/src/db/gates.rs b/consumer/src/db/gates.rs new file mode 100644 index 0000000..2c404e4 --- /dev/null +++ b/consumer/src/db/gates.rs @@ -0,0 +1,105 @@ +use super::{PgExecResult, PgResult}; +use chrono::prelude::*; +use chrono::{DateTime, Utc}; +use deadpool_postgres::GenericClient; +use std::collections::HashSet; + +pub async fn post_enforce_threadgate( + conn: &mut C, + root: &str, + post_author: &str, + post_created_at: DateTime, + is_backfill: bool, +) -> PgResult { + // check if the root and the current post are the same author + // strip "at://" then break into parts by '/' + let parts = root[5..].split('/').collect::>(); + let root_author = parts[0]; + if root_author == post_author { + return Ok(false); + } + + let tg_data = super::threadgate_get(conn, root).await?; + + let Some((created_at, allow, allow_lists)) = tg_data else { + return Ok(false); + }; + + // when backfilling, there's no point continuing if the record is dated before the threadgate + if is_backfill && post_created_at < created_at { + return Ok(false); + } + + if allow.is_empty() { + return Ok(true); + } + + let allow: HashSet = HashSet::from_iter(allow); + + if allow.contains("app.bsky.feed.threadgate#followerRule") + || allow.contains("app.bsky.feed.threadgate#followingRule") + { + let profile_state: Option<(bool, bool)> = conn + .query_opt( + "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", + &[&root_author, &post_author], + ) + .await? + .map(|v| (v.get(0), v.get(1))); + + if let Some((following, followed)) = profile_state { + if allow.contains("app.bsky.feed.threadgate#followerRule") && followed { + return Ok(false); + } + + if allow.contains("app.bsky.feed.threadgate#followingRule") && following { + return Ok(false); + } + } + } + + // check mentions + if allow.contains("app.bsky.feed.threadgate#mentionRule") { + let mentions: Vec = conn + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) + .await? + .map(|r| r.get(0)) + .unwrap_or_default(); + + if mentions.contains(&post_author.to_owned()) { + return Ok(false); + } + } + + if allow.contains("app.bsky.feed.threadgate#listRule") { + if allow_lists.is_empty() { + return Ok(true); + } + + let count: i64 = conn + .query_one( + "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", + &[&allow_lists, &post_author], + ) + .await? + .get(0); + if count == 0 { + return Ok(true); + } + } + + Ok(false) +} + +pub async fn postgate_maintain_detaches( + conn: &mut C, + post: &str, + detached: &[String], + disable_effective: Option, +) -> PgExecResult { + conn.execute( + "SELECT maintain_postgates($1, $2, $3)", + &[&post, &detached, &disable_effective], + ) + .await +} diff --git a/consumer/src/db/mod.rs b/consumer/src/db/mod.rs index b10ad7c..e45777d 100644 --- a/consumer/src/db/mod.rs +++ b/consumer/src/db/mod.rs @@ -7,10 +7,12 @@ type PgOptResult = PgResult>; mod actor; mod backfill; pub mod copy; +mod gates; mod labels; mod record; pub use actor::*; pub use backfill::*; +pub use gates::*; pub use labels::*; pub use record::*; diff --git a/consumer/src/db/record.rs b/consumer/src/db/record.rs index 1ab0a79..c3bc037 100644 --- a/consumer/src/db/record.rs +++ b/consumer/src/db/record.rs @@ -336,7 +336,7 @@ pub async fn post_insert( // if there is a root, we need to check for the presence of a threadgate. let violates_threadgate = match &root_uri { Some(root) => { - post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await? + super::post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await? } None => false, }; @@ -380,93 +380,6 @@ pub async fn post_delete(conn: &mut C, at_uri: &str) -> PgExec .await } -pub async fn post_enforce_threadgate( - conn: &mut C, - root: &str, - post_author: &str, - post_created_at: DateTime, - is_backfill: bool, -) -> PgResult { - // check if the root and the current post are the same author - // strip "at://" then break into parts by '/' - let parts = root[5..].split('/').collect::>(); - let root_author = parts[0]; - if root_author == post_author { - return Ok(false); - } - - let tg_data = threadgate_get(conn, root).await?; - - let Some((created_at, allow, allow_lists)) = tg_data else { - return Ok(false); - }; - - // when backfilling, there's no point continuing if the record is dated before the threadgate - if is_backfill && post_created_at < created_at { - return Ok(false); - } - - if allow.is_empty() { - return Ok(true); - } - - let allow: HashSet = HashSet::from_iter(allow); - - if allow.contains("app.bsky.feed.threadgate#followerRule") - || allow.contains("app.bsky.feed.threadgate#followingRule") - { - let profile_state: Option<(bool, bool)> = conn - .query_opt( - "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", - &[&root_author, &post_author], - ) - .await? - .map(|v| (v.get(0), v.get(1))); - - if let Some((following, followed)) = profile_state { - if allow.contains("app.bsky.feed.threadgate#followerRule") && followed { - return Ok(false); - } - - if allow.contains("app.bsky.feed.threadgate#followingRule") && following { - return Ok(false); - } - } - } - - // check mentions - if allow.contains("app.bsky.feed.threadgate#mentionRule") { - let mentions: Vec = conn - .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) - .await? - .map(|r| r.get(0)) - .unwrap_or_default(); - - if mentions.contains(&post_author.to_owned()) { - return Ok(false); - } - } - - if allow.contains("app.bsky.feed.threadgate#listRule") { - if allow_lists.is_empty() { - return Ok(true); - } - - let count: i64 = conn - .query_one( - "SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2", - &[&allow_lists, &post_author], - ) - .await? - .get(0); - if count == 0 { - return Ok(true); - } - } - - Ok(false) -} - pub async fn post_get_info_for_delete( conn: &mut C, at_uri: &str, @@ -665,19 +578,6 @@ pub async fn postgate_delete(conn: &mut C, at_uri: &str) -> Pg .await } -pub async fn postgate_maintain_detaches( - conn: &mut C, - post: &str, - detached: &[String], - disable_effective: Option, -) -> PgExecResult { - conn.execute( - "SELECT maintain_postgates($1, $2, $3)", - &[&post, &detached, &disable_effective], - ) - .await -} - pub async fn profile_upsert( conn: &mut C, repo: &str, @@ -827,7 +727,7 @@ pub async fn status_delete(conn: &mut C, did: &str) -> PgExecR .await } -async fn threadgate_get( +pub async fn threadgate_get( conn: &mut C, post: &str, ) -> PgOptResult<(DateTime, Vec, Vec)> { -- 2.43.0 From da2dbdd177e7001835246773012034aece8d007f Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 5 Oct 2025 12:27:34 +0100 Subject: [PATCH] default deny threadgates --- consumer/src/db/gates.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/src/db/gates.rs b/consumer/src/db/gates.rs index 2c404e4..6f49a7d 100644 --- a/consumer/src/db/gates.rs +++ b/consumer/src/db/gates.rs @@ -88,7 +88,7 @@ pub async fn post_enforce_threadgate( } } - Ok(false) + Ok(true) } pub async fn postgate_maintain_detaches( -- 2.43.0 From ee988c367d1063c0c2e45a776f68753467ffa0e2 Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 5 Oct 2025 12:29:30 +0100 Subject: [PATCH] fix list allows --- consumer/src/db/gates.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/consumer/src/db/gates.rs b/consumer/src/db/gates.rs index 6f49a7d..c27ede6 100644 --- a/consumer/src/db/gates.rs +++ b/consumer/src/db/gates.rs @@ -83,8 +83,8 @@ pub async fn post_enforce_threadgate( ) .await? .get(0); - if count == 0 { - return Ok(true); + if count != 0 { + return Ok(false); } } -- 2.43.0 From a3338668b241d874c05cc188edf9744cb46d8906 Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 5 Oct 2025 13:03:41 +0100 Subject: [PATCH] remove accidental dbg! --- consumer/src/db/copy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consumer/src/db/copy.rs b/consumer/src/db/copy.rs index d51ac2c..71cdd60 100644 --- a/consumer/src/db/copy.rs +++ b/consumer/src/db/copy.rs @@ -207,7 +207,7 @@ pub async fn copy_posts( .into_iter() .map(|v| (v.get(0), v.get(1), v.get(2))).collect(); - for (root, post, created_at) in dbg!(threadgated) { + for (root, post, created_at) in threadgated { match super::post_enforce_threadgate(conn, &root, did, created_at, true).await { Ok(true) => { conn.execute( -- 2.43.0 From c0f3cd763bb693131b89882b18ce9378dc7796c4 Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 5 Oct 2025 13:20:28 +0100 Subject: [PATCH] consts for threadgate rules --- consumer/src/db/gates.rs | 16 +++++++++------- consumer/src/indexer/records.rs | 13 +++++++++---- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/consumer/src/db/gates.rs b/consumer/src/db/gates.rs index c27ede6..2146b5d 100644 --- a/consumer/src/db/gates.rs +++ b/consumer/src/db/gates.rs @@ -1,4 +1,8 @@ use super::{PgExecResult, PgResult}; +use crate::indexer::records::{ + THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, THREADGATE_RULE_LIST, + THREADGATE_RULE_MENTION, +}; use chrono::prelude::*; use chrono::{DateTime, Utc}; use deadpool_postgres::GenericClient; @@ -36,9 +40,7 @@ pub async fn post_enforce_threadgate( let allow: HashSet = HashSet::from_iter(allow); - if allow.contains("app.bsky.feed.threadgate#followerRule") - || allow.contains("app.bsky.feed.threadgate#followingRule") - { + if allow.contains(THREADGATE_RULE_FOLLOWER) || allow.contains(THREADGATE_RULE_FOLLOWING) { let profile_state: Option<(bool, bool)> = conn .query_opt( "SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2", @@ -48,18 +50,18 @@ pub async fn post_enforce_threadgate( .map(|v| (v.get(0), v.get(1))); if let Some((following, followed)) = profile_state { - if allow.contains("app.bsky.feed.threadgate#followerRule") && followed { + if allow.contains(THREADGATE_RULE_FOLLOWER) && followed { return Ok(false); } - if allow.contains("app.bsky.feed.threadgate#followingRule") && following { + if allow.contains(THREADGATE_RULE_FOLLOWING) && following { return Ok(false); } } } // check mentions - if allow.contains("app.bsky.feed.threadgate#mentionRule") { + if allow.contains(THREADGATE_RULE_MENTION) { let mentions: Vec = conn .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) .await? @@ -71,7 +73,7 @@ pub async fn post_enforce_threadgate( } } - if allow.contains("app.bsky.feed.threadgate#listRule") { + if allow.contains(THREADGATE_RULE_LIST) { if allow_lists.is_empty() { return Ok(true); } diff --git a/consumer/src/indexer/records.rs b/consumer/src/indexer/records.rs index b5a58f6..c18d58e 100644 --- a/consumer/src/indexer/records.rs +++ b/consumer/src/indexer/records.rs @@ -272,6 +272,11 @@ pub struct AppBskyFeedThreadgate { pub hidden_replies: Vec, } +pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule"; +pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule"; +pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule"; +pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule"; + #[derive(Debug, Deserialize, Serialize)] #[serde(tag = "$type")] pub enum ThreadgateRule { @@ -288,10 +293,10 @@ pub enum ThreadgateRule { impl ThreadgateRule { pub fn as_str(&self) -> &'static str { match self { - ThreadgateRule::Mention => "app.bsky.feed.threadgate#mentionRule", - ThreadgateRule::Follower => "app.bsky.feed.threadgate#followerRule", - ThreadgateRule::Following => "app.bsky.feed.threadgate#followingRule", - ThreadgateRule::List { .. } => "app.bsky.feed.threadgate#listRule", + ThreadgateRule::Mention => THREADGATE_RULE_MENTION, + ThreadgateRule::Follower => THREADGATE_RULE_FOLLOWER, + ThreadgateRule::Following => THREADGATE_RULE_FOLLOWING, + ThreadgateRule::List { .. } => THREADGATE_RULE_LIST, } } } -- 2.43.0 From 87cfa87cc3549487de15243442f3ce74d309db36 Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 5 Oct 2025 13:21:58 +0100 Subject: [PATCH] correct threadgate backfilling needed to do a reorder of COPY to make sure everything was in before calculating gates --- consumer/src/backfill/mod.rs | 7 ++- consumer/src/backfill/repo.rs | 10 ++++ consumer/src/db/gates.rs | 105 +++++++++++++++++++++++++++++++++- 3 files changed, 119 insertions(+), 3 deletions(-) diff --git a/consumer/src/backfill/mod.rs b/consumer/src/backfill/mod.rs index 0f17a62..cd719e2 100644 --- a/consumer/src/backfill/mod.rs +++ b/consumer/src/backfill/mod.rs @@ -275,18 +275,23 @@ struct CopyStore { follows: Vec<(String, String, DateTime)>, list_items: Vec<(String, records::AppBskyGraphListItem)>, verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>, + threadgates: Vec<(String, Cid, records::AppBskyFeedThreadgate)>, // not COPY'd but needs to be kept until last. records: Vec<(String, Cid)>, } impl CopyStore { async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> { db::copy::copy_likes(t, did, self.likes).await?; - db::copy::copy_posts(t, did, self.posts).await?; db::copy::copy_reposts(t, did, self.reposts).await?; db::copy::copy_blocks(t, did, self.blocks).await?; db::copy::copy_follows(t, did, self.follows).await?; db::copy::copy_list_items(t, self.list_items).await?; db::copy::copy_verification(t, did, self.verifications).await?; + db::copy::copy_posts(t, did, self.posts).await?; + for (at_uri, cid, record) in self.threadgates { + db::threadgate_enforce_backfill(t, did, &record).await?; + db::threadgate_upsert(t, &at_uri, cid, record).await?; + } db::copy::copy_records(t, did, self.records).await?; Ok(()) diff --git a/consumer/src/backfill/repo.rs b/consumer/src/backfill/repo.rs index ab4897d..c9ca477 100644 --- a/consumer/src/backfill/repo.rs +++ b/consumer/src/backfill/repo.rs @@ -4,6 +4,7 @@ use super::{ }; use crate::indexer::records; use crate::indexer::types::{AggregateDeltaStore, RecordTypes}; +use crate::utils::at_uri_is_by; use crate::{db, indexer}; use deadpool_postgres::Transaction; use ipld_core::cid::Cid; @@ -166,6 +167,15 @@ async fn record_index( .reposts .push((rkey.to_string(), rec.subject, rec.via, rec.created_at)); } + RecordTypes::AppBskyFeedThreadgate(record) => { + if !at_uri_is_by(&record.post, did) { + tracing::warn!("tried to create a threadgate on a post we don't control!"); + return Ok(()); + } + + copies.push_record(&at_uri, cid); + copies.threadgates.push((at_uri, cid, record)); + } RecordTypes::AppBskyGraphBlock(rec) => { copies.push_record(&at_uri, cid); copies diff --git a/consumer/src/db/gates.rs b/consumer/src/db/gates.rs index 2146b5d..72e1484 100644 --- a/consumer/src/db/gates.rs +++ b/consumer/src/db/gates.rs @@ -1,7 +1,7 @@ use super::{PgExecResult, PgResult}; use crate::indexer::records::{ - THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, THREADGATE_RULE_LIST, - THREADGATE_RULE_MENTION, + AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, + THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION, }; use chrono::prelude::*; use chrono::{DateTime, Utc}; @@ -105,3 +105,104 @@ pub async fn postgate_maintain_detaches( ) .await } + +// variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB +pub async fn threadgate_enforce_backfill( + conn: &mut C, + root_author: &str, + threadgate: &AppBskyFeedThreadgate, +) -> PgExecResult { + // pull out allow - if it's None we can skip this gate. + let Some(allow) = threadgate.allow.as_ref() else { + return Ok(0); + }; + + let root = &threadgate.post; + + if allow.is_empty() { + // blind update everything + return conn.execute( + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3", + &[&root, &root_author, &threadgate.created_at], + ).await; + } + + // pull authors with our root_uri where the author is not the root author and are dated after created_at + // this is mutable because we'll remove ALLOWED dids + let mut dids: HashSet = conn + .query( + "SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3", + &[&root, &root_author, &threadgate.created_at], + ) + .await? + .into_iter() + .map(|row| row.get(0)) + .collect(); + + // this will be empty if there are no replies. + if dids.is_empty() { + return Ok(0); + } + + let allowed_lists = allow + .iter() + .filter_map(|rule| match rule { + ThreadgateRule::List { list } => Some(list), + _ => None, + }) + .collect::>(); + + let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str())); + + if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() { + let current_dids: Vec<_> = dids.iter().collect(); + + let res = conn.query( + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL", + &[&root_author, ¤t_dids] + ).await?; + + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); + } + + if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() { + let current_dids: Vec<_> = dids.iter().collect(); + + let res = conn.query( + "SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL", + &[&root_author, ¤t_dids] + ).await?; + + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); + } + + if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() { + let mentions: Vec = conn + .query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root]) + .await? + .map(|r| r.get(0)) + .unwrap_or_default(); + + dids = &dids - &HashSet::from_iter(mentions); + } + + if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() { + let current_dids: Vec<_> = dids.iter().collect(); + + let res = conn + .query( + "SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)", + &[&allowed_lists, ¤t_dids], + ) + .await?; + + dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0))); + } + + let dids = dids.into_iter().collect::>(); + + conn.execute( + "UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3", + &[&threadgate.post, &dids, &threadgate.created_at] + ).await +} -- 2.43.0 From 02b985ab4d751a12621d39df4695a49b3792e177 Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 5 Oct 2025 16:39:11 +0100 Subject: [PATCH] update models --- parakeet-db/src/models.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/parakeet-db/src/models.rs b/parakeet-db/src/models.rs index a0ea198..ae80369 100644 --- a/parakeet-db/src/models.rs +++ b/parakeet-db/src/models.rs @@ -148,6 +148,9 @@ pub struct Post { pub embed: Option, pub embed_subtype: Option, + pub mentions: Option>>, + pub violates_threadgate: bool, + pub created_at: DateTime, pub indexed_at: NaiveDateTime, } -- 2.43.0 From 80379191cf0bb32aa9b7be505ea96b578591fbc5 Mon Sep 17 00:00:00 2001 From: Mia Date: Sun, 5 Oct 2025 18:38:30 +0100 Subject: [PATCH] enforce threadgates --- parakeet/src/hydration/posts.rs | 2 ++ parakeet/src/sql/thread.sql | 4 ++-- parakeet/src/sql/thread_parent.sql | 6 ++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/parakeet/src/hydration/posts.rs b/parakeet/src/hydration/posts.rs index c072aca..2588069 100644 --- a/parakeet/src/hydration/posts.rs +++ b/parakeet/src/hydration/posts.rs @@ -201,8 +201,10 @@ impl StatefulHydrator<'_> { let viewer_data = self.get_post_viewer_states(&post_uris).await; let embeds = self.hydrate_embeds(post_uris).await; + // we shouldn't show the parent when the post violates a threadgate. let reply_refs = posts .values() + .filter(|(post, _)| !post.violates_threadgate) .flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()]) .flatten() .collect::>(); diff --git a/parakeet/src/sql/thread.sql b/parakeet/src/sql/thread.sql index d394a4a..41ea953 100644 --- a/parakeet/src/sql/thread.sql +++ b/parakeet/src/sql/thread.sql @@ -1,11 +1,11 @@ with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth from posts - where parent_uri = $1 + where parent_uri = $1 and violates_threadgate=FALSE union all select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1 from posts p join thread on p.parent_uri = thread.at_uri - where thread.depth <= $2) + where thread.depth <= $2 and p.violates_threadgate=FALSE) select * from thread order by depth desc; diff --git a/parakeet/src/sql/thread_parent.sql b/parakeet/src/sql/thread_parent.sql index 8c826a5..8722b94 100644 --- a/parakeet/src/sql/thread_parent.sql +++ b/parakeet/src/sql/thread_parent.sql @@ -1,11 +1,13 @@ with recursive parents as (select at_uri, cid, parent_uri, root_uri, 0 as depth from posts - where at_uri = (select parent_uri from posts where at_uri = $1) + where + at_uri = (select parent_uri from posts where at_uri = $1 and violates_threadgate = FALSE) union all select p.at_uri, p.cid, p.parent_uri, p.root_uri, parents.depth + 1 from posts p join parents on p.at_uri = parents.parent_uri - where parents.depth <= $2) + where parents.depth <= $2 + and p.violates_threadgate = FALSE) select * from parents order by depth desc; \ No newline at end of file -- 2.43.0 From f652576d46688b4fa1bd08615c37e7ae9e517a29 Mon Sep 17 00:00:00 2001 From: Mia Date: Mon, 6 Oct 2025 20:44:00 +0100 Subject: [PATCH] show reposts in author feeds --- .../2025-09-27-171241_post-tweaks/down.sql | 14 +++- .../2025-09-27-171241_post-tweaks/up.sql | 78 ++++++++++++++++++- parakeet-db/src/models.rs | 13 ++++ parakeet-db/src/schema.rs | 12 +++ parakeet/src/xrpc/app_bsky/feed/posts.rs | 58 +++++++++----- 5 files changed, 153 insertions(+), 22 deletions(-) diff --git a/migrations/2025-09-27-171241_post-tweaks/down.sql b/migrations/2025-09-27-171241_post-tweaks/down.sql index 1519864..18e2d08 100644 --- a/migrations/2025-09-27-171241_post-tweaks/down.sql +++ b/migrations/2025-09-27-171241_post-tweaks/down.sql @@ -1,3 +1,15 @@ alter table posts drop column mentions, - drop column violates_threadgate; \ No newline at end of file + drop column violates_threadgate; + +drop trigger t_author_feed_ins_post on posts; +drop trigger t_author_feed_del_post on posts; +drop trigger t_author_feed_ins_repost on reposts; +drop trigger t_author_feed_del_repost on reposts; + +drop function f_author_feed_ins_post; +drop function f_author_feed_del_post; +drop function f_author_feed_ins_repost; +drop function f_author_feed_del_repost; + +drop table author_feeds; \ No newline at end of file diff --git a/migrations/2025-09-27-171241_post-tweaks/up.sql b/migrations/2025-09-27-171241_post-tweaks/up.sql index 8965540..26f8ff7 100644 --- a/migrations/2025-09-27-171241_post-tweaks/up.sql +++ b/migrations/2025-09-27-171241_post-tweaks/up.sql @@ -1,3 +1,79 @@ alter table posts add column mentions text[], - add column violates_threadgate bool not null default false; \ No newline at end of file + add column violates_threadgate bool not null default false; + +create table author_feeds +( + uri text primary key, + cid text not null, + post text not null, + did text not null, + typ text not null, + sort_at timestamptz not null +); + +-- author_feeds post triggers +create function f_author_feed_ins_post() returns trigger + language plpgsql as +$$ +begin + insert into author_feeds (uri, cid, post, did, typ, sort_at) + VALUES (NEW.at_uri, NEW.cid, NEW.at_uri, NEW.did, 'post', NEW.created_at) + on conflict do nothing; + return NEW; +end; +$$; + +create trigger t_author_feed_ins_post + before insert + on posts + for each row +execute procedure f_author_feed_ins_post(); + +create function f_author_feed_del_post() returns trigger + language plpgsql as +$$ +begin + delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post'; + return OLD; +end; +$$; + +create trigger t_author_feed_del_post + before delete + on posts + for each row +execute procedure f_author_feed_del_post(); + +-- author_feeds repost triggers +create function f_author_feed_ins_repost() returns trigger + language plpgsql as +$$ +begin + insert into author_feeds (uri, cid, post, did, typ, sort_at) + VALUES ('at://' || NEW.did || 'app.bsky.feed.repost' || NEW.rkey, NEW.post_cid, NEW.post, NEW.did, 'repost', NEW.created_at) + on conflict do nothing; + return NEW; +end; +$$; + +create trigger t_author_feed_ins_repost + before insert + on reposts + for each row +execute procedure f_author_feed_ins_repost(); + +create function f_author_feed_del_repost() returns trigger + language plpgsql as +$$ +begin + delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost'; + return OLD; +end; +$$; + +create trigger t_author_feed_del_repost + before delete + on reposts + for each row +execute procedure f_author_feed_del_repost(); diff --git a/parakeet-db/src/models.rs b/parakeet-db/src/models.rs index ae80369..f3a6f53 100644 --- a/parakeet-db/src/models.rs +++ b/parakeet-db/src/models.rs @@ -417,3 +417,16 @@ pub struct NewBookmark<'a> { pub subject_type: &'a str, pub tags: Vec, } + +#[derive(Debug, Queryable, Selectable, Identifiable)] +#[diesel(table_name = crate::schema::author_feeds)] +#[diesel(primary_key(uri))] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct AuthorFeedItem { + pub uri: String, + pub cid: String, + pub post: String, + pub did: String, + pub typ: String, + pub sort_at: DateTime, +} diff --git a/parakeet-db/src/schema.rs b/parakeet-db/src/schema.rs index 5280ffe..ed782c4 100644 --- a/parakeet-db/src/schema.rs +++ b/parakeet-db/src/schema.rs @@ -12,6 +12,17 @@ diesel::table! { } } +diesel::table! { + author_feeds (uri) { + uri -> Text, + cid -> Text, + post -> Text, + did -> Text, + typ -> Text, + sort_at -> Timestamptz, + } +} + diesel::table! { backfill (repo, repo_ver) { repo -> Text, @@ -431,6 +442,7 @@ diesel::joinable!(verification -> actors (verifier)); diesel::allow_tables_to_appear_in_same_query!( actors, + author_feeds, backfill, backfill_jobs, blocks, diff --git a/parakeet/src/xrpc/app_bsky/feed/posts.rs b/parakeet/src/xrpc/app_bsky/feed/posts.rs index 1860ea5..c7bde48 100644 --- a/parakeet/src/xrpc/app_bsky/feed/posts.rs +++ b/parakeet/src/xrpc/app_bsky/feed/posts.rs @@ -19,7 +19,7 @@ use lexica::app_bsky::feed::{ BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, }; -use parakeet_db::schema; +use parakeet_db::{models, schema}; use reqwest::Url; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -217,59 +217,77 @@ pub async fn get_author_feed( let limit = query.limit.unwrap_or(50).clamp(1, 100); - let mut posts_query = schema::posts::table - .select((schema::posts::created_at, schema::posts::at_uri)) - .filter(schema::posts::did.eq(did)) + let mut posts_query = schema::author_feeds::table + .select(models::AuthorFeedItem::as_select()) + .left_join(schema::posts::table.on(schema::posts::at_uri.eq(schema::author_feeds::post))) + .filter(schema::author_feeds::did.eq(&did)) .into_boxed(); if let Some(cursor) = datetime_cursor(query.cursor.as_ref()) { - posts_query = posts_query.filter(schema::posts::created_at.lt(cursor)); + posts_query = posts_query.filter(schema::author_feeds::sort_at.lt(cursor)); } posts_query = match query.filter { - GetAuthorFeedFilter::PostsWithReplies => posts_query, + GetAuthorFeedFilter::PostsWithReplies => { + posts_query.filter(schema::author_feeds::typ.eq("post")) + } GetAuthorFeedFilter::PostsNoReplies => { posts_query.filter(schema::posts::parent_uri.is_null()) } - GetAuthorFeedFilter::PostsWithMedia => posts_query.filter(embed_type_filter(&[ - "app.bsky.embed.video", - "app.bsky.embed.images", - ])), + GetAuthorFeedFilter::PostsWithMedia => posts_query.filter( + embed_type_filter(&["app.bsky.embed.video", "app.bsky.embed.images"]) + .and(schema::author_feeds::typ.eq("post")), + ), GetAuthorFeedFilter::PostsAndAuthorThreads => posts_query.filter( (schema::posts::parent_uri - .like(format!("at://{}/%", &query.actor)) + .like(format!("at://{did}/%")) .or(schema::posts::parent_uri.is_null())) .and( schema::posts::root_uri - .like(format!("at://{}/%", &query.actor)) + .like(format!("at://{did}/%")) .or(schema::posts::root_uri.is_null()), ), ), - GetAuthorFeedFilter::PostsWithVideo => { - posts_query.filter(embed_type_filter(&["app.bsky.embed.video"])) - } + GetAuthorFeedFilter::PostsWithVideo => posts_query.filter( + embed_type_filter(&["app.bsky.embed.video"]).and(schema::author_feeds::typ.eq("post")), + ), }; let results = posts_query - .order(schema::posts::created_at.desc()) + .order(schema::author_feeds::sort_at.desc()) .limit(limit as i64) - .load::<(chrono::DateTime, String)>(&mut conn) + .load(&mut conn) .await?; let cursor = results .last() - .map(|(last, _)| last.timestamp_millis().to_string()); + .map(|item| item.sort_at.timestamp_millis().to_string()); let at_uris = results .iter() - .map(|(_, uri)| uri.clone()) + .map(|item| item.post.clone()) .collect::>(); + // get the actor for if we have reposted + let profile = hyd.hydrate_profile_basic(did).await.ok_or(Error::server_error(None))?; + let mut posts = hyd.hydrate_feed_posts(at_uris).await; let mut feed: Vec<_> = results .into_iter() - .filter_map(|(_, uri)| posts.remove(&uri)) + .filter_map(|item| { + posts.remove(&item.post).map(|mut fvp| { + if item.typ == "repost" { + fvp.reason = Some(FeedViewPostReason::Repost(FeedReasonRepost { + by: profile.clone(), + uri: Some(item.uri), + cid: Some(item.cid), + indexed_at: Default::default(), + })) + } + fvp + }) + }) .collect(); if let Some(post) = pin { -- 2.43.0 From 6bfd6ae7ad43d9fe89fd4088e1d60b1f98d1cf60 Mon Sep 17 00:00:00 2001 From: Mia Date: Tue, 7 Oct 2025 19:44:38 +0100 Subject: [PATCH] make author feeds work like bluesky --- parakeet/src/hydration/posts.rs | 143 ++++++++++++++++------- parakeet/src/xrpc/app_bsky/feed/likes.rs | 2 +- parakeet/src/xrpc/app_bsky/feed/posts.rs | 21 ++-- 3 files changed, 113 insertions(+), 53 deletions(-) diff --git a/parakeet/src/hydration/posts.rs b/parakeet/src/hydration/posts.rs index 2588069..9629265 100644 --- a/parakeet/src/hydration/posts.rs +++ b/parakeet/src/hydration/posts.rs @@ -187,7 +187,11 @@ impl StatefulHydrator<'_> { .collect() } - pub async fn hydrate_feed_posts(&self, posts: Vec) -> HashMap { + pub async fn hydrate_feed_posts( + &self, + posts: Vec, + author_threads_only: bool, + ) -> HashMap { let stats = self.loaders.post_stats.load_many(posts.clone()).await; let posts = self.loaders.posts.load_many(posts).await; @@ -199,7 +203,7 @@ impl StatefulHydrator<'_> { let post_labels = self.get_label_many(&post_uris).await; let viewer_data = self.get_post_viewer_states(&post_uris).await; - let embeds = self.hydrate_embeds(post_uris).await; + let embeds = self.hydrate_embeds(post_uris.clone()).await; // we shouldn't show the parent when the post violates a threadgate. let reply_refs = posts @@ -211,53 +215,35 @@ impl StatefulHydrator<'_> { let reply_posts = self.hydrate_posts(reply_refs).await; - posts + // hydrate all the posts. + let mut posts = posts .into_iter() - .filter_map(|(post_uri, (post, _))| { - let author = authors.get(&post.did)?; - - let root = post.root_uri.as_ref().and_then(|uri| reply_posts.get(uri)); - let parent = post - .parent_uri - .as_ref() - .and_then(|uri| reply_posts.get(uri)); - - let reply = if post.parent_uri.is_some() && post.root_uri.is_some() { - Some(ReplyRef { - root: root.cloned().map(postview_to_replyref).unwrap_or( - ReplyRefPost::NotFound { - uri: post.root_uri.as_ref().unwrap().clone(), - not_found: true, - }, - ), - parent: parent.cloned().map(postview_to_replyref).unwrap_or( - ReplyRefPost::NotFound { - uri: post.parent_uri.as_ref().unwrap().clone(), - not_found: true, - }, - ), - grandparent_author: None, - }) - } else { - None - }; + .filter_map(|(post_uri, (raw, _))| { + let root = raw.root_uri.clone(); + let parent = raw.parent_uri.clone(); + let author = authors.get(&raw.did)?; let embed = embeds.get(&post_uri).cloned(); let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); let stats = stats.get(&post_uri).cloned(); let viewer = viewer_data.get(&post_uri).cloned(); let post = - build_postview(post, author.to_owned(), labels, embed, None, viewer, stats); + build_postview(raw, author.to_owned(), labels, embed, None, viewer, stats); - Some(( - post_uri, - FeedViewPost { - post, - reply, - reason: None, - feed_context: None, - }, - )) + Some((post_uri, (post, root, parent))) + }) + .collect::>(); + + post_uris + .into_iter() + .filter_map(|post_uri| { + let item = if author_threads_only { + compile_feed_authors_threads_only(&post_uri, &mut posts)? + } else { + compile_feed(&post_uri, &mut posts, &reply_posts)? + }; + + Some((post_uri, item)) }) .collect() } @@ -301,3 +287,78 @@ fn postview_to_replyref(post: PostView) -> ReplyRefPost { _ => ReplyRefPost::Post(post), } } + +type FeedViewPartData = (PostView, Option, Option); + +// this is the 'normal' one that runs in most places +fn compile_feed( + uri: &String, + posts: &mut HashMap, + reply_posts: &HashMap, +) -> Option { + let (post, root_uri, parent_uri) = posts.remove(uri)?; + + let root = root_uri.as_ref().and_then(|uri| reply_posts.get(uri)); + let parent = parent_uri.as_ref().and_then(|uri| reply_posts.get(uri)); + + let reply = if parent_uri.is_some() && root_uri.is_some() { + Some(ReplyRef { + root: root + .cloned() + .map(postview_to_replyref) + .unwrap_or(ReplyRefPost::NotFound { + uri: root_uri.as_ref().unwrap().clone(), + not_found: true, + }), + parent: parent + .cloned() + .map(postview_to_replyref) + .unwrap_or(ReplyRefPost::NotFound { + uri: parent_uri.as_ref().unwrap().clone(), + not_found: true, + }), + grandparent_author: None, + }) + } else { + None + }; + + Some(FeedViewPost { + post, + reply, + reason: None, + feed_context: None, + }) +} + +// and this one runs in getAuthorFeed when filter=PostsAndAuthorThreads +fn compile_feed_authors_threads_only( + uri: &String, + posts: &mut HashMap, +) -> Option { + let (post, root_uri, parent_uri) = posts.get(uri)?.clone(); + + let root = root_uri.as_ref().and_then(|root| posts.get(root)); + let parent = parent_uri.as_ref().and_then(|parent| posts.get(parent)); + + let reply = if parent_uri.is_some() && root_uri.is_some() { + Some(ReplyRef { + root: root + .cloned() + .map(|(post, _, _)| postview_to_replyref(post))?, + parent: parent + .cloned() + .map(|(post, _, _)| postview_to_replyref(post))?, + grandparent_author: None, + }) + } else { + None + }; + + Some(FeedViewPost { + post, + reply, + reason: None, + feed_context: None, + }) +} diff --git a/parakeet/src/xrpc/app_bsky/feed/likes.rs b/parakeet/src/xrpc/app_bsky/feed/likes.rs index 147bba0..37a352f 100644 --- a/parakeet/src/xrpc/app_bsky/feed/likes.rs +++ b/parakeet/src/xrpc/app_bsky/feed/likes.rs @@ -62,7 +62,7 @@ pub async fn get_actor_likes( .map(|(_, uri)| uri.clone()) .collect::>(); - let mut posts = hyd.hydrate_feed_posts(at_uris).await; + let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; let feed: Vec<_> = results .into_iter() diff --git a/parakeet/src/xrpc/app_bsky/feed/posts.rs b/parakeet/src/xrpc/app_bsky/feed/posts.rs index c7bde48..5c9d459 100644 --- a/parakeet/src/xrpc/app_bsky/feed/posts.rs +++ b/parakeet/src/xrpc/app_bsky/feed/posts.rs @@ -123,7 +123,7 @@ pub async fn get_feed( }) .collect::>(); - let mut posts = hyd.hydrate_feed_posts(at_uris).await; + let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await; let feed = skeleton @@ -152,9 +152,10 @@ pub async fn get_feed( })) } -#[derive(Debug, Deserialize)] +#[derive(Debug, Default, Eq, PartialEq, Deserialize)] #[serde(rename_all = "snake_case")] pub enum GetAuthorFeedFilter { + #[default] PostsWithReplies, PostsNoReplies, PostsWithMedia, @@ -162,12 +163,6 @@ pub enum GetAuthorFeedFilter { PostsWithVideo, } -impl Default for GetAuthorFeedFilter { - fn default() -> Self { - Self::PostsWithReplies - } -} - #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct GetAuthorFeedQuery { @@ -227,6 +222,7 @@ pub async fn get_author_feed( posts_query = posts_query.filter(schema::author_feeds::sort_at.lt(cursor)); } + let author_threads_only = query.filter == GetAuthorFeedFilter::PostsAndAuthorThreads; posts_query = match query.filter { GetAuthorFeedFilter::PostsWithReplies => { posts_query.filter(schema::author_feeds::typ.eq("post")) @@ -269,9 +265,12 @@ pub async fn get_author_feed( .collect::>(); // get the actor for if we have reposted - let profile = hyd.hydrate_profile_basic(did).await.ok_or(Error::server_error(None))?; + let profile = hyd + .hydrate_profile_basic(did) + .await + .ok_or(Error::server_error(None))?; - let mut posts = hyd.hydrate_feed_posts(at_uris).await; + let mut posts = hyd.hydrate_feed_posts(at_uris, author_threads_only).await; let mut feed: Vec<_> = results .into_iter() @@ -348,7 +347,7 @@ pub async fn get_list_feed( .map(|(_, uri)| uri.clone()) .collect::>(); - let mut posts = hyd.hydrate_feed_posts(at_uris).await; + let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; let feed = results .into_iter() -- 2.43.0 From 2a640043925e8b69ddf3ee49cebcefb04bf5a7ff Mon Sep 17 00:00:00 2001 From: Mia Date: Sat, 11 Oct 2025 17:11:51 +0100 Subject: [PATCH] the big feed refactor --- parakeet/src/hydration/posts.rs | 296 ++++++++++++----------- parakeet/src/xrpc/app_bsky/feed/likes.rs | 15 +- parakeet/src/xrpc/app_bsky/feed/posts.rs | 142 +++++------ 3 files changed, 221 insertions(+), 232 deletions(-) diff --git a/parakeet/src/hydration/posts.rs b/parakeet/src/hydration/posts.rs index 9629265..66c952e 100644 --- a/parakeet/src/hydration/posts.rs +++ b/parakeet/src/hydration/posts.rs @@ -3,7 +3,8 @@ use crate::hydration::{map_labels, StatefulHydrator}; use lexica::app_bsky::actor::ProfileViewBasic; use lexica::app_bsky::embed::Embed; use lexica::app_bsky::feed::{ - BlockedAuthor, FeedViewPost, PostView, PostViewerState, ReplyRef, ReplyRefPost, ThreadgateView, + BlockedAuthor, FeedReasonRepost, FeedViewPost, FeedViewPostReason, PostView, PostViewerState, + ReplyRef, ReplyRefPost, ThreadgateView, }; use lexica::app_bsky::graph::ListViewBasic; use lexica::app_bsky::RecordStats; @@ -32,14 +33,18 @@ fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { } } +type HydratePostsRet = ( + models::Post, + ProfileViewBasic, + Vec, + Option, + Option, + Option, + Option, +); + fn build_postview( - post: models::Post, - author: ProfileViewBasic, - labels: Vec, - embed: Option, - threadgate: Option, - viewer: Option, - stats: Option, + (post, author, labels, embed, threadgate, viewer, stats): HydratePostsRet, ) -> PostView { let stats = stats .map(|stats| RecordStats { @@ -135,12 +140,12 @@ impl StatefulHydrator<'_> { let threadgate = self.hydrate_threadgate(threadgate).await; let labels = self.get_label(&post.at_uri).await; - Some(build_postview( + Some(build_postview(( post, author, labels, embed, threadgate, viewer, stats, - )) + ))) } - pub async fn hydrate_posts(&self, posts: Vec) -> HashMap { + async fn hydrate_posts_inner(&self, posts: Vec) -> HashMap { let stats = self.loaders.post_stats.load_many(posts.clone()).await; let posts = self.loaders.posts.load_many(posts).await; @@ -150,8 +155,8 @@ impl StatefulHydrator<'_> { .unzip::<_, _, Vec<_>, Vec<_>>(); let authors = self.hydrate_profiles_basic(authors).await; - let post_labels = self.get_label_many(&post_uris).await; - let viewer_data = self.get_post_viewer_states(&post_uris).await; + let mut post_labels = self.get_label_many(&post_uris).await; + let mut viewer_data = self.get_post_viewer_states(&post_uris).await; let threadgates = posts .values() @@ -159,91 +164,133 @@ impl StatefulHydrator<'_> { .collect(); let threadgates = self.hydrate_threadgates(threadgates).await; - let embeds = self.hydrate_embeds(post_uris).await; + let mut embeds = self.hydrate_embeds(post_uris).await; posts .into_iter() .filter_map(|(uri, (post, threadgate))| { - let author = authors.get(&post.did)?; - let embed = embeds.get(&uri).cloned(); + let author = authors.get(&post.did)?.clone(); + let embed = embeds.remove(&uri); let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned()); - let labels = post_labels.get(&uri).cloned().unwrap_or_default(); + let labels = post_labels.remove(&uri).unwrap_or_default(); let stats = stats.get(&uri).cloned(); - let viewer = viewer_data.get(&uri).cloned(); + let viewer = viewer_data.remove(&uri); Some(( uri, - build_postview( - post, - author.to_owned(), - labels, - embed, - threadgate, - viewer, - stats, - ), + (post, author, labels, embed, threadgate, viewer, stats), )) }) .collect() } + pub async fn hydrate_posts(&self, posts: Vec) -> HashMap { + self.hydrate_posts_inner(posts) + .await + .into_iter() + .map(|(uri, data)| (uri, build_postview(data))) + .collect() + } + pub async fn hydrate_feed_posts( &self, - posts: Vec, + posts: Vec, author_threads_only: bool, - ) -> HashMap { - let stats = self.loaders.post_stats.load_many(posts.clone()).await; - let posts = self.loaders.posts.load_many(posts).await; - - let (authors, post_uris) = posts - .values() - .map(|(post, _)| (post.did.clone(), post.at_uri.clone())) - .unzip::<_, _, Vec<_>, Vec<_>>(); - let authors = self.hydrate_profiles_basic(authors).await; - - let post_labels = self.get_label_many(&post_uris).await; - let viewer_data = self.get_post_viewer_states(&post_uris).await; - let embeds = self.hydrate_embeds(post_uris.clone()).await; + ) -> Vec { + let post_uris = posts + .iter() + .map(|item| item.post_uri().to_string()) + .collect::>(); + let mut posts_hyd = self.hydrate_posts_inner(post_uris).await; // we shouldn't show the parent when the post violates a threadgate. - let reply_refs = posts + let reply_refs = posts_hyd .values() - .filter(|(post, _)| !post.violates_threadgate) - .flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()]) + .filter(|(post, ..)| !post.violates_threadgate) + .flat_map(|(post, ..)| [post.parent_uri.clone(), post.root_uri.clone()]) .flatten() .collect::>(); - let reply_posts = self.hydrate_posts(reply_refs).await; - // hydrate all the posts. - let mut posts = posts - .into_iter() - .filter_map(|(post_uri, (raw, _))| { - let root = raw.root_uri.clone(); - let parent = raw.parent_uri.clone(); - - let author = authors.get(&raw.did)?; - let embed = embeds.get(&post_uri).cloned(); - let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); - let stats = stats.get(&post_uri).cloned(); - let viewer = viewer_data.get(&post_uri).cloned(); - let post = - build_postview(raw, author.to_owned(), labels, embed, None, viewer, stats); - - Some((post_uri, (post, root, parent))) - }) - .collect::>(); + let repost_profiles = posts + .iter() + .filter_map(|item| item.repost_by()) + .collect::>(); + let profiles_hydrated = self.hydrate_profiles_basic(repost_profiles).await; - post_uris + posts .into_iter() - .filter_map(|post_uri| { - let item = if author_threads_only { - compile_feed_authors_threads_only(&post_uri, &mut posts)? + .filter_map(|item| { + let post = posts_hyd.remove(item.post_uri())?; + let context = item.context(); + + let reply = if let RawFeedItem::Post { .. } = item { + let root_uri = post.0.root_uri.as_ref(); + let parent_uri = post.0.parent_uri.as_ref(); + + let (root, parent) = if author_threads_only { + if root_uri.is_some() && parent_uri.is_some() { + let root = root_uri.and_then(|uri| posts_hyd.get(uri))?; + let parent = parent_uri.and_then(|uri| posts_hyd.get(uri))?; + + let root = build_postview(root.clone()); + let parent = build_postview(parent.clone()); + + (Some(root), Some(parent)) + } else { + (None, None) + } + } else { + let root = root_uri.and_then(|uri| reply_posts.get(uri)).cloned(); + let parent = parent_uri.and_then(|uri| reply_posts.get(uri)).cloned(); + + (root, parent) + }; + + if root_uri.is_some() || parent_uri.is_some() { + Some(ReplyRef { + root: root.map(postview_to_replyref).unwrap_or( + ReplyRefPost::NotFound { + uri: root_uri.unwrap().to_owned(), + not_found: true, + }, + ), + parent: parent.map(postview_to_replyref).unwrap_or( + ReplyRefPost::NotFound { + uri: parent_uri.unwrap().to_owned(), + not_found: true, + }, + ), + grandparent_author: None, + }) + } else { + None + } } else { - compile_feed(&post_uri, &mut posts, &reply_posts)? + None }; - Some((post_uri, item)) + let reason = match item { + RawFeedItem::Repost { uri, by, at, .. } => { + Some(FeedViewPostReason::Repost(FeedReasonRepost { + by: profiles_hydrated.get(&by).cloned()?, + uri: Some(uri), + cid: None, + indexed_at: at, + })) + } + RawFeedItem::Pin { .. } => Some(FeedViewPostReason::Pin), + _ => None, + }; + + let post = build_postview(post); + + Some(FeedViewPost { + post, + reply, + reason, + feed_context: context, + }) }) .collect() } @@ -288,77 +335,46 @@ fn postview_to_replyref(post: PostView) -> ReplyRefPost { } } -type FeedViewPartData = (PostView, Option, Option); - -// this is the 'normal' one that runs in most places -fn compile_feed( - uri: &String, - posts: &mut HashMap, - reply_posts: &HashMap, -) -> Option { - let (post, root_uri, parent_uri) = posts.remove(uri)?; - - let root = root_uri.as_ref().and_then(|uri| reply_posts.get(uri)); - let parent = parent_uri.as_ref().and_then(|uri| reply_posts.get(uri)); - - let reply = if parent_uri.is_some() && root_uri.is_some() { - Some(ReplyRef { - root: root - .cloned() - .map(postview_to_replyref) - .unwrap_or(ReplyRefPost::NotFound { - uri: root_uri.as_ref().unwrap().clone(), - not_found: true, - }), - parent: parent - .cloned() - .map(postview_to_replyref) - .unwrap_or(ReplyRefPost::NotFound { - uri: parent_uri.as_ref().unwrap().clone(), - not_found: true, - }), - grandparent_author: None, - }) - } else { - None - }; - - Some(FeedViewPost { - post, - reply, - reason: None, - feed_context: None, - }) +#[derive(Debug)] +pub enum RawFeedItem { + Pin { + uri: String, + context: Option, + }, + Post { + uri: String, + context: Option, + }, + Repost { + uri: String, + post: String, + by: String, + at: chrono::DateTime, + context: Option, + }, } -// and this one runs in getAuthorFeed when filter=PostsAndAuthorThreads -fn compile_feed_authors_threads_only( - uri: &String, - posts: &mut HashMap, -) -> Option { - let (post, root_uri, parent_uri) = posts.get(uri)?.clone(); - - let root = root_uri.as_ref().and_then(|root| posts.get(root)); - let parent = parent_uri.as_ref().and_then(|parent| posts.get(parent)); - - let reply = if parent_uri.is_some() && root_uri.is_some() { - Some(ReplyRef { - root: root - .cloned() - .map(|(post, _, _)| postview_to_replyref(post))?, - parent: parent - .cloned() - .map(|(post, _, _)| postview_to_replyref(post))?, - grandparent_author: None, - }) - } else { - None - }; - - Some(FeedViewPost { - post, - reply, - reason: None, - feed_context: None, - }) +impl RawFeedItem { + fn post_uri(&self) -> &str { + match self { + RawFeedItem::Pin { uri, .. } => uri, + RawFeedItem::Post { uri, .. } => uri, + RawFeedItem::Repost { post, .. } => post, + } + } + + fn repost_by(&self) -> Option { + match self { + RawFeedItem::Repost { by, .. } => Some(by.clone()), + _ => None, + } + } + + fn context(&self) -> Option { + match self { + RawFeedItem::Pin { context, .. } => context.clone(), + RawFeedItem::Post { context, .. } => context.clone(), + RawFeedItem::Repost { context, .. } => context.clone(), + } + } } diff --git a/parakeet/src/xrpc/app_bsky/feed/likes.rs b/parakeet/src/xrpc/app_bsky/feed/likes.rs index 37a352f..d56c9aa 100644 --- a/parakeet/src/xrpc/app_bsky/feed/likes.rs +++ b/parakeet/src/xrpc/app_bsky/feed/likes.rs @@ -1,3 +1,4 @@ +use crate::hydration::posts::RawFeedItem; use crate::hydration::StatefulHydrator; use crate::xrpc::error::{Error, XrpcResult}; use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; @@ -57,17 +58,15 @@ pub async fn get_actor_likes( .last() .map(|(last, _)| last.timestamp_millis().to_string()); - let at_uris = results + let raw_feed = results .iter() - .map(|(_, uri)| uri.clone()) + .map(|(_, uri)| RawFeedItem::Post { + uri: uri.clone(), + context: None, + }) .collect::>(); - let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; - - let feed: Vec<_> = results - .into_iter() - .filter_map(|(_, uri)| posts.remove(&uri)) - .collect(); + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; Ok(Json(FeedRes { cursor, feed })) } diff --git a/parakeet/src/xrpc/app_bsky/feed/posts.rs b/parakeet/src/xrpc/app_bsky/feed/posts.rs index 5c9d459..f27bf44 100644 --- a/parakeet/src/xrpc/app_bsky/feed/posts.rs +++ b/parakeet/src/xrpc/app_bsky/feed/posts.rs @@ -1,3 +1,4 @@ +use crate::hydration::posts::RawFeedItem; use crate::hydration::StatefulHydrator; use crate::xrpc::app_bsky::graph::lists::ListWithCursorQuery; use crate::xrpc::error::{Error, XrpcResult}; @@ -16,8 +17,8 @@ use diesel::prelude::*; use diesel_async::{AsyncPgConnection, RunQueryDsl}; use lexica::app_bsky::actor::ProfileView; use lexica::app_bsky::feed::{ - BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, - PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, + BlockedAuthor, FeedSkeletonResponse, FeedViewPost, PostView, SkeletonReason, ThreadViewPost, + ThreadViewPostType, ThreadgateView, }; use parakeet_db::{models, schema}; use reqwest::Url; @@ -113,7 +114,6 @@ pub async fn get_feed( let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); - let at_uris = skeleton.feed.iter().map(|v| v.post.clone()).collect(); let repost_skeleton = skeleton .feed .iter() @@ -122,30 +122,36 @@ pub async fn get_feed( _ => None, }) .collect::>(); + let mut repost_data = get_skeleton_repost_data(&mut conn, repost_skeleton).await; - let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; - let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await; - - let feed = skeleton + let raw_feed = skeleton .feed .into_iter() - .filter_map(|item| { - let mut post = posts.remove(&item.post)?; - let reason = match item.reason { - Some(SkeletonReason::Repost { repost }) => { - repost_data.remove(&repost).map(FeedViewPostReason::Repost) - } - Some(SkeletonReason::Pin {}) => Some(FeedViewPostReason::Pin), - _ => None, - }; - - post.reason = reason; - post.feed_context = item.feed_context; - - Some(post) + .filter_map(|v| match v.reason { + Some(SkeletonReason::Repost { repost }) => { + repost_data + .remove_entry(&repost) + .map(|(uri, (by, at))| RawFeedItem::Repost { + uri, + post: v.post, + by, + at: at.and_utc(), + context: v.feed_context, + }) + } + Some(SkeletonReason::Pin {}) => Some(RawFeedItem::Pin { + uri: v.post, + context: v.feed_context, + }), + None => Some(RawFeedItem::Post { + uri: v.post, + context: v.feed_context, + }), }) .collect(); + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; + Ok(Json(FeedRes { cursor: skeleton.cursor, feed, @@ -204,10 +210,7 @@ pub async fn get_author_feed( let pin = match query.include_pins && query.cursor.is_none() { false => None, - true => match crate::db::get_pinned_post_uri(&mut conn, &did).await? { - Some(post) => hyd.hydrate_post(post).await, - None => None, - }, + true => crate::db::get_pinned_post_uri(&mut conn, &did).await?, }; let limit = query.limit.unwrap_or(50).clamp(1, 100); @@ -259,48 +262,36 @@ pub async fn get_author_feed( .last() .map(|item| item.sort_at.timestamp_millis().to_string()); - let at_uris = results - .iter() - .map(|item| item.post.clone()) - .collect::>(); - - // get the actor for if we have reposted - let profile = hyd - .hydrate_profile_basic(did) - .await - .ok_or(Error::server_error(None))?; - - let mut posts = hyd.hydrate_feed_posts(at_uris, author_threads_only).await; - - let mut feed: Vec<_> = results + let mut raw_feed = results .into_iter() - .filter_map(|item| { - posts.remove(&item.post).map(|mut fvp| { - if item.typ == "repost" { - fvp.reason = Some(FeedViewPostReason::Repost(FeedReasonRepost { - by: profile.clone(), - uri: Some(item.uri), - cid: Some(item.cid), - indexed_at: Default::default(), - })) - } - fvp - }) + .filter_map(|item| match &*item.typ { + "post" => Some(RawFeedItem::Post { + uri: item.post, + context: None, + }), + "repost" => Some(RawFeedItem::Repost { + uri: item.uri, + post: item.post, + by: item.did, + at: item.sort_at, + context: None, + }), + _ => None, }) - .collect(); + .collect::>(); if let Some(post) = pin { - feed.insert( + raw_feed.insert( 0, - FeedViewPost { - post, - reply: None, - reason: Some(FeedViewPostReason::Pin), - feed_context: None, + RawFeedItem::Pin { + uri: post, + context: None, }, ); } + let feed = hyd.hydrate_feed_posts(raw_feed, author_threads_only).await; + Ok(Json(FeedRes { cursor, feed })) } @@ -342,17 +333,15 @@ pub async fn get_list_feed( .last() .map(|(last, _)| last.timestamp_millis().to_string()); - let at_uris = results + let raw_feed = results .iter() - .map(|(_, uri)| uri.clone()) + .map(|(_, uri)| RawFeedItem::Post { + uri: uri.clone(), + context: None, + }) .collect::>(); - let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; - - let feed = results - .into_iter() - .filter_map(|(_, uri)| posts.remove(&uri)) - .collect(); + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; Ok(Json(FeedRes { cursor, feed })) } @@ -686,11 +675,10 @@ async fn get_feed_skeleton( } } -async fn get_skeleton_repost_data<'a>( +async fn get_skeleton_repost_data( conn: &mut AsyncPgConnection, - hyd: &StatefulHydrator<'a>, reposts: Vec, -) -> HashMap { +) -> HashMap { let Ok(repost_data) = schema::records::table .select(( schema::records::at_uri, @@ -704,23 +692,9 @@ async fn get_skeleton_repost_data<'a>( return HashMap::new(); }; - let profiles = repost_data.iter().map(|(_, did, _)| did.clone()).collect(); - let profiles = hyd.hydrate_profiles_basic(profiles).await; - repost_data .into_iter() - .filter_map(|(uri, did, indexed_at)| { - let by = profiles.get(&did).cloned()?; - - let repost = FeedReasonRepost { - by, - uri: Some(uri.clone()), - cid: None, // okay, we do have this, but the app doesn't seem to be bothered about not setting it. - indexed_at: indexed_at.and_utc(), - }; - - Some((uri, repost)) - }) + .map(|(uri, did, at)| (uri, (did, at))) .collect() } -- 2.43.0