Post Tweaks #1

closed
opened by mia.pds.parakeet.at, targeting `main` from the `post-tweaks` branch

"tweaks" might be the understatement of the year. This branch contains:

  • a timing fix for postgates
  • working threadgates
  • storing #tags (and mentions) in the posts table
  • reposts in getAuthorFeed
  • huge feed hydration refactors to make getAuthorFeed work properly
Changed files
+445 -340
consumer
src
migrations
2025-09-27-171241_post-tweaks
parakeet
src
hydration
sql
xrpc
app_bsky
feed
parakeet-db
src
+1 -1
consumer/src/indexer/mod.rs
···
});
let labels = record.labels.clone();
-
db::post_insert(conn, at_uri, repo, cid, record).await?;
+
db::post_insert(conn, at_uri, repo, cid, record, false).await?;
if let Some(labels) = labels {
db::maintain_self_labels(conn, repo, Some(cid), at_uri, labels).await?;
}
+2
consumer/src/db/mod.rs
···
mod actor;
mod backfill;
pub mod copy;
+
mod gates;
mod labels;
mod record;
pub use actor::*;
pub use backfill::*;
+
pub use gates::*;
pub use labels::*;
pub use record::*;
+2 -102
consumer/src/db/record.rs
···
// if there is a root, we need to check for the presence of a threadgate.
let violates_threadgate = match &root_uri {
Some(root) => {
-
post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await?
+
super::post_enforce_threadgate(conn, root, repo, rec.created_at, is_backfill).await?
}
None => false,
};
···
.await
}
-
pub async fn post_enforce_threadgate<C: GenericClient>(
-
conn: &mut C,
-
root: &str,
-
post_author: &str,
-
post_created_at: DateTime<Utc>,
-
is_backfill: bool,
-
) -> PgResult<bool> {
-
// check if the root and the current post are the same author
-
// strip "at://" then break into parts by '/'
-
let parts = root[5..].split('/').collect::<Vec<_>>();
-
let root_author = parts[0];
-
if root_author == post_author {
-
return Ok(false);
-
}
-
-
let tg_data = threadgate_get(conn, root).await?;
-
-
let Some((created_at, allow, allow_lists)) = tg_data else {
-
return Ok(false);
-
};
-
-
// when backfilling, there's no point continuing if the record is dated before the threadgate
-
if is_backfill && post_created_at < created_at {
-
return Ok(false);
-
}
-
-
if allow.is_empty() {
-
return Ok(true);
-
}
-
-
let allow: HashSet<String> = HashSet::from_iter(allow);
-
-
if allow.contains("app.bsky.feed.threadgate#followerRule")
-
|| allow.contains("app.bsky.feed.threadgate#followingRule")
-
{
-
let profile_state: Option<(bool, bool)> = conn
-
.query_opt(
-
"SELECT following IS NOT NULL, followed IS NOT NULL FROM profile_states WHERE did=$1 AND subject=$2",
-
&[&root_author, &post_author],
-
)
-
.await?
-
.map(|v| (v.get(0), v.get(1)));
-
-
if let Some((following, followed)) = profile_state {
-
if allow.contains("app.bsky.feed.threadgate#followerRule") && followed {
-
return Ok(false);
-
}
-
-
if allow.contains("app.bsky.feed.threadgate#followingRule") && following {
-
return Ok(false);
-
}
-
}
-
}
-
-
// check mentions
-
if allow.contains("app.bsky.feed.threadgate#mentionRule") {
-
let mentions: Vec<String> = conn
-
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
-
.await?
-
.map(|r| r.get(0))
-
.unwrap_or_default();
-
-
if mentions.contains(&post_author.to_owned()) {
-
return Ok(false);
-
}
-
}
-
-
if allow.contains("app.bsky.feed.threadgate#listRule") {
-
if allow_lists.is_empty() {
-
return Ok(true);
-
}
-
-
let count: i64 = conn
-
.query_one(
-
"SELECT count(*) FROM list_items WHERE list_uri=ANY($1) AND subject=$2",
-
&[&allow_lists, &post_author],
-
)
-
.await?
-
.get(0);
-
if count == 0 {
-
return Ok(true);
-
}
-
}
-
-
Ok(false)
-
}
-
pub async fn post_get_info_for_delete<C: GenericClient>(
conn: &mut C,
at_uri: &str,
···
.await
}
-
pub async fn postgate_maintain_detaches<C: GenericClient>(
-
conn: &mut C,
-
post: &str,
-
detached: &[String],
-
disable_effective: Option<NaiveDateTime>,
-
) -> PgExecResult {
-
conn.execute(
-
"SELECT maintain_postgates($1, $2, $3)",
-
&[&post, &detached, &disable_effective],
-
)
-
.await
-
}
-
pub async fn profile_upsert<C: GenericClient>(
conn: &mut C,
repo: &str,
···
.await
}
-
async fn threadgate_get<C: GenericClient>(
+
pub async fn threadgate_get<C: GenericClient>(
conn: &mut C,
post: &str,
) -> PgOptResult<(DateTime<Utc>, Vec<String>, Vec<String>)> {
+9 -4
consumer/src/indexer/records.rs
···
pub hidden_replies: Vec<String>,
}
+
pub const THREADGATE_RULE_MENTION: &str = "app.bsky.feed.threadgate#mentionRule";
+
pub const THREADGATE_RULE_FOLLOWER: &str = "app.bsky.feed.threadgate#followerRule";
+
pub const THREADGATE_RULE_FOLLOWING: &str = "app.bsky.feed.threadgate#followingRule";
+
pub const THREADGATE_RULE_LIST: &str = "app.bsky.feed.threadgate#listRule";
+
#[derive(Debug, Deserialize, Serialize)]
#[serde(tag = "$type")]
pub enum ThreadgateRule {
···
impl ThreadgateRule {
pub fn as_str(&self) -> &'static str {
match self {
-
ThreadgateRule::Mention => "app.bsky.feed.threadgate#mentionRule",
-
ThreadgateRule::Follower => "app.bsky.feed.threadgate#followerRule",
-
ThreadgateRule::Following => "app.bsky.feed.threadgate#followingRule",
-
ThreadgateRule::List { .. } => "app.bsky.feed.threadgate#listRule",
+
ThreadgateRule::Mention => THREADGATE_RULE_MENTION,
+
ThreadgateRule::Follower => THREADGATE_RULE_FOLLOWER,
+
ThreadgateRule::Following => THREADGATE_RULE_FOLLOWING,
+
ThreadgateRule::List { .. } => THREADGATE_RULE_LIST,
}
}
}
+6 -1
consumer/src/backfill/mod.rs
···
follows: Vec<(String, String, DateTime<Utc>)>,
list_items: Vec<(String, records::AppBskyGraphListItem)>,
verifications: Vec<(String, Cid, records::AppBskyGraphVerification)>,
+
threadgates: Vec<(String, Cid, records::AppBskyFeedThreadgate)>, // not COPY'd but needs to be kept until last.
records: Vec<(String, Cid)>,
}
impl CopyStore {
async fn submit(self, t: &mut Transaction<'_>, did: &str) -> Result<(), tokio_postgres::Error> {
db::copy::copy_likes(t, did, self.likes).await?;
-
db::copy::copy_posts(t, did, self.posts).await?;
db::copy::copy_reposts(t, did, self.reposts).await?;
db::copy::copy_blocks(t, did, self.blocks).await?;
db::copy::copy_follows(t, did, self.follows).await?;
db::copy::copy_list_items(t, self.list_items).await?;
db::copy::copy_verification(t, did, self.verifications).await?;
+
db::copy::copy_posts(t, did, self.posts).await?;
+
for (at_uri, cid, record) in self.threadgates {
+
db::threadgate_enforce_backfill(t, did, &record).await?;
+
db::threadgate_upsert(t, &at_uri, cid, record).await?;
+
}
db::copy::copy_records(t, did, self.records).await?;
Ok(())
+103 -2
consumer/src/db/gates.rs
···
use super::{PgExecResult, PgResult};
use crate::indexer::records::{
-
THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING, THREADGATE_RULE_LIST,
-
THREADGATE_RULE_MENTION,
+
AppBskyFeedThreadgate, ThreadgateRule, THREADGATE_RULE_FOLLOWER, THREADGATE_RULE_FOLLOWING,
+
THREADGATE_RULE_LIST, THREADGATE_RULE_MENTION,
};
use chrono::prelude::*;
use chrono::{DateTime, Utc};
···
)
.await
}
+
+
// variant of post_enforce_threadgate that runs when backfilling to clean up any posts already in DB
+
pub async fn threadgate_enforce_backfill<C: GenericClient>(
+
conn: &mut C,
+
root_author: &str,
+
threadgate: &AppBskyFeedThreadgate,
+
) -> PgExecResult {
+
// pull out allow - if it's None we can skip this gate.
+
let Some(allow) = threadgate.allow.as_ref() else {
+
return Ok(0);
+
};
+
+
let root = &threadgate.post;
+
+
if allow.is_empty() {
+
// blind update everything
+
return conn.execute(
+
"UPDATE posts SET violates_threadgate=TRUE WHERE root_uri=$1 AND did != $2 AND created_at >= $3",
+
&[&root, &root_author, &threadgate.created_at],
+
).await;
+
}
+
+
// pull authors with our root_uri where the author is not the root author and are dated after created_at
+
// this is mutable because we'll remove ALLOWED dids
+
let mut dids: HashSet<String> = conn
+
.query(
+
"SELECT DISTINCT did FROM posts WHERE root_uri=$1 AND did != $2 AND created_at >= $3",
+
&[&root, &root_author, &threadgate.created_at],
+
)
+
.await?
+
.into_iter()
+
.map(|row| row.get(0))
+
.collect();
+
+
// this will be empty if there are no replies.
+
if dids.is_empty() {
+
return Ok(0);
+
}
+
+
let allowed_lists = allow
+
.iter()
+
.filter_map(|rule| match rule {
+
ThreadgateRule::List { list } => Some(list),
+
_ => None,
+
})
+
.collect::<Vec<_>>();
+
+
let allow: HashSet<_> = HashSet::from_iter(allow.into_iter().map(|v| v.as_str()));
+
+
if allow.contains(THREADGATE_RULE_FOLLOWER) && !dids.is_empty() {
+
let current_dids: Vec<_> = dids.iter().collect();
+
+
let res = conn.query(
+
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND followed IS NOT NULL",
+
&[&root_author, &current_dids]
+
).await?;
+
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
+
}
+
+
if allow.contains(THREADGATE_RULE_FOLLOWING) && !dids.is_empty() {
+
let current_dids: Vec<_> = dids.iter().collect();
+
+
let res = conn.query(
+
"SELECT subject FROM profile_states WHERE did=$1 AND subject=ANY($2) AND following IS NOT NULL",
+
&[&root_author, &current_dids]
+
).await?;
+
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
+
}
+
+
if allow.contains(THREADGATE_RULE_MENTION) && !dids.is_empty() {
+
let mentions: Vec<String> = conn
+
.query_opt("SELECT mentions FROM posts WHERE at_uri=$1", &[&root])
+
.await?
+
.map(|r| r.get(0))
+
.unwrap_or_default();
+
+
dids = &dids - &HashSet::from_iter(mentions);
+
}
+
+
if allow.contains(THREADGATE_RULE_LIST) && !dids.is_empty() {
+
let current_dids: Vec<_> = dids.iter().collect();
+
+
let res = conn
+
.query(
+
"SELECT subject FROM list_items WHERE list_uri = ANY($1) AND subject = ANY($2)",
+
&[&allowed_lists, &current_dids],
+
)
+
.await?;
+
+
dids = &dids - &HashSet::from_iter(res.into_iter().map(|r| r.get(0)));
+
}
+
+
let dids = dids.into_iter().collect::<Vec<_>>();
+
+
conn.execute(
+
"UPDATE posts SET violates_threadgate=TRUE WHERE root_uri = $1 AND did = ANY($2) AND created_at >= $3",
+
&[&threadgate.post, &dids, &threadgate.created_at]
+
).await
+
}
+2 -2
parakeet/src/sql/thread.sql
···
with recursive thread as (select at_uri, parent_uri, root_uri, 0 as depth
from posts
-
where parent_uri = $1
+
where parent_uri = $1 and violates_threadgate=FALSE
union all
select p.at_uri, p.parent_uri, p.root_uri, thread.depth + 1
from posts p
join thread on p.parent_uri = thread.at_uri
-
where thread.depth <= $2)
+
where thread.depth <= $2 and p.violates_threadgate=FALSE)
select *
from thread
order by depth desc;
+4 -2
parakeet/src/sql/thread_parent.sql
···
with recursive parents as (select at_uri, cid, parent_uri, root_uri, 0 as depth
from posts
-
where at_uri = (select parent_uri from posts where at_uri = $1)
+
where
+
at_uri = (select parent_uri from posts where at_uri = $1 and violates_threadgate = FALSE)
union all
select p.at_uri, p.cid, p.parent_uri, p.root_uri, parents.depth + 1
from posts p
join parents on p.at_uri = parents.parent_uri
-
where parents.depth <= $2)
+
where parents.depth <= $2
+
and p.violates_threadgate = FALSE)
select *
from parents
order by depth desc;
+13 -1
migrations/2025-09-27-171241_post-tweaks/down.sql
···
alter table posts
drop column mentions,
-
drop column violates_threadgate;
+
drop column violates_threadgate;
+
+
drop trigger t_author_feed_ins_post on posts;
+
drop trigger t_author_feed_del_post on posts;
+
drop trigger t_author_feed_ins_repost on reposts;
+
drop trigger t_author_feed_del_repost on reposts;
+
+
drop function f_author_feed_ins_post;
+
drop function f_author_feed_del_post;
+
drop function f_author_feed_ins_repost;
+
drop function f_author_feed_del_repost;
+
+
drop table author_feeds;
+77 -1
migrations/2025-09-27-171241_post-tweaks/up.sql
···
alter table posts
add column mentions text[],
-
add column violates_threadgate bool not null default false;
+
add column violates_threadgate bool not null default false;
+
+
create table author_feeds
+
(
+
uri text primary key,
+
cid text not null,
+
post text not null,
+
did text not null,
+
typ text not null,
+
sort_at timestamptz not null
+
);
+
+
-- author_feeds post triggers
+
create function f_author_feed_ins_post() returns trigger
+
language plpgsql as
+
$$
+
begin
+
insert into author_feeds (uri, cid, post, did, typ, sort_at)
+
VALUES (NEW.at_uri, NEW.cid, NEW.at_uri, NEW.did, 'post', NEW.created_at)
+
on conflict do nothing;
+
return NEW;
+
end;
+
$$;
+
+
create trigger t_author_feed_ins_post
+
before insert
+
on posts
+
for each row
+
execute procedure f_author_feed_ins_post();
+
+
create function f_author_feed_del_post() returns trigger
+
language plpgsql as
+
$$
+
begin
+
delete from author_feeds where did = OLD.did and item = OLD.at_uri and typ = 'post';
+
return OLD;
+
end;
+
$$;
+
+
create trigger t_author_feed_del_post
+
before delete
+
on posts
+
for each row
+
execute procedure f_author_feed_del_post();
+
+
-- author_feeds repost triggers
+
create function f_author_feed_ins_repost() returns trigger
+
language plpgsql as
+
$$
+
begin
+
insert into author_feeds (uri, cid, post, did, typ, sort_at)
+
VALUES ('at://' || NEW.did || 'app.bsky.feed.repost' || NEW.rkey, NEW.post_cid, NEW.post, NEW.did, 'repost', NEW.created_at)
+
on conflict do nothing;
+
return NEW;
+
end;
+
$$;
+
+
create trigger t_author_feed_ins_repost
+
before insert
+
on reposts
+
for each row
+
execute procedure f_author_feed_ins_repost();
+
+
create function f_author_feed_del_repost() returns trigger
+
language plpgsql as
+
$$
+
begin
+
delete from author_feeds where did = OLD.did and item = OLD.post and typ = 'repost';
+
return OLD;
+
end;
+
$$;
+
+
create trigger t_author_feed_del_repost
+
before delete
+
on reposts
+
for each row
+
execute procedure f_author_feed_del_repost();
+12
parakeet-db/src/schema.rs
···
}
}
+
diesel::table! {
+
author_feeds (uri) {
+
uri -> Text,
+
cid -> Text,
+
post -> Text,
+
did -> Text,
+
typ -> Text,
+
sort_at -> Timestamptz,
+
}
+
}
+
diesel::table! {
backfill (repo, repo_ver) {
repo -> Text,
···
diesel::allow_tables_to_appear_in_same_query!(
actors,
+
author_feeds,
backfill,
backfill_jobs,
blocks,
+156 -140
parakeet/src/hydration/posts.rs
···
use lexica::app_bsky::actor::ProfileViewBasic;
use lexica::app_bsky::embed::Embed;
use lexica::app_bsky::feed::{
-
BlockedAuthor, FeedViewPost, PostView, PostViewerState, ReplyRef, ReplyRefPost, ThreadgateView,
+
BlockedAuthor, FeedReasonRepost, FeedViewPost, FeedViewPostReason, PostView, PostViewerState,
+
ReplyRef, ReplyRefPost, ThreadgateView,
};
use lexica::app_bsky::graph::ListViewBasic;
use lexica::app_bsky::RecordStats;
···
}
}
+
type HydratePostsRet = (
+
models::Post,
+
ProfileViewBasic,
+
Vec<models::Label>,
+
Option<Embed>,
+
Option<ThreadgateView>,
+
Option<PostViewerState>,
+
Option<PostStats>,
+
);
+
fn build_postview(
-
post: models::Post,
-
author: ProfileViewBasic,
-
labels: Vec<models::Label>,
-
embed: Option<Embed>,
-
threadgate: Option<ThreadgateView>,
-
viewer: Option<PostViewerState>,
-
stats: Option<PostStats>,
+
(post, author, labels, embed, threadgate, viewer, stats): HydratePostsRet,
) -> PostView {
let stats = stats
.map(|stats| RecordStats {
···
let threadgate = self.hydrate_threadgate(threadgate).await;
let labels = self.get_label(&post.at_uri).await;
-
Some(build_postview(
+
Some(build_postview((
post, author, labels, embed, threadgate, viewer, stats,
-
))
+
)))
}
-
pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> {
+
async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> {
let stats = self.loaders.post_stats.load_many(posts.clone()).await;
let posts = self.loaders.posts.load_many(posts).await;
···
.unzip::<_, _, Vec<_>, Vec<_>>();
let authors = self.hydrate_profiles_basic(authors).await;
-
let post_labels = self.get_label_many(&post_uris).await;
-
let viewer_data = self.get_post_viewer_states(&post_uris).await;
+
let mut post_labels = self.get_label_many(&post_uris).await;
+
let mut viewer_data = self.get_post_viewer_states(&post_uris).await;
let threadgates = posts
.values()
···
.collect();
let threadgates = self.hydrate_threadgates(threadgates).await;
-
let embeds = self.hydrate_embeds(post_uris).await;
+
let mut embeds = self.hydrate_embeds(post_uris).await;
posts
.into_iter()
.filter_map(|(uri, (post, threadgate))| {
-
let author = authors.get(&post.did)?;
-
let embed = embeds.get(&uri).cloned();
+
let author = authors.get(&post.did)?.clone();
+
let embed = embeds.remove(&uri);
let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned());
-
let labels = post_labels.get(&uri).cloned().unwrap_or_default();
+
let labels = post_labels.remove(&uri).unwrap_or_default();
let stats = stats.get(&uri).cloned();
-
let viewer = viewer_data.get(&uri).cloned();
+
let viewer = viewer_data.remove(&uri);
Some((
uri,
-
build_postview(
-
post,
-
author.to_owned(),
-
labels,
-
embed,
-
threadgate,
-
viewer,
-
stats,
-
),
+
(post, author, labels, embed, threadgate, viewer, stats),
))
})
.collect()
}
+
pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> {
+
self.hydrate_posts_inner(posts)
+
.await
+
.into_iter()
+
.map(|(uri, data)| (uri, build_postview(data)))
+
.collect()
+
}
+
pub async fn hydrate_feed_posts(
&self,
-
posts: Vec<String>,
+
posts: Vec<RawFeedItem>,
author_threads_only: bool,
-
) -> HashMap<String, FeedViewPost> {
-
let stats = self.loaders.post_stats.load_many(posts.clone()).await;
-
let posts = self.loaders.posts.load_many(posts).await;
-
-
let (authors, post_uris) = posts
-
.values()
-
.map(|(post, _)| (post.did.clone(), post.at_uri.clone()))
-
.unzip::<_, _, Vec<_>, Vec<_>>();
-
let authors = self.hydrate_profiles_basic(authors).await;
-
-
let post_labels = self.get_label_many(&post_uris).await;
-
let viewer_data = self.get_post_viewer_states(&post_uris).await;
-
let embeds = self.hydrate_embeds(post_uris.clone()).await;
+
) -> Vec<FeedViewPost> {
+
let post_uris = posts
+
.iter()
+
.map(|item| item.post_uri().to_string())
+
.collect::<Vec<_>>();
+
let mut posts_hyd = self.hydrate_posts_inner(post_uris).await;
// we shouldn't show the parent when the post violates a threadgate.
-
let reply_refs = posts
+
let reply_refs = posts_hyd
.values()
-
.filter(|(post, _)| !post.violates_threadgate)
-
.flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()])
+
.filter(|(post, ..)| !post.violates_threadgate)
+
.flat_map(|(post, ..)| [post.parent_uri.clone(), post.root_uri.clone()])
.flatten()
.collect::<Vec<_>>();
-
let reply_posts = self.hydrate_posts(reply_refs).await;
-
// hydrate all the posts.
-
let mut posts = posts
-
.into_iter()
-
.filter_map(|(post_uri, (raw, _))| {
-
let root = raw.root_uri.clone();
-
let parent = raw.parent_uri.clone();
-
-
let author = authors.get(&raw.did)?;
-
let embed = embeds.get(&post_uri).cloned();
-
let labels = post_labels.get(&post_uri).cloned().unwrap_or_default();
-
let stats = stats.get(&post_uri).cloned();
-
let viewer = viewer_data.get(&post_uri).cloned();
-
let post =
-
build_postview(raw, author.to_owned(), labels, embed, None, viewer, stats);
-
-
Some((post_uri, (post, root, parent)))
-
})
-
.collect::<HashMap<_, _>>();
+
let repost_profiles = posts
+
.iter()
+
.filter_map(|item| item.repost_by())
+
.collect::<Vec<_>>();
+
let profiles_hydrated = self.hydrate_profiles_basic(repost_profiles).await;
-
post_uris
+
posts
.into_iter()
-
.filter_map(|post_uri| {
-
let item = if author_threads_only {
-
compile_feed_authors_threads_only(&post_uri, &mut posts)?
+
.filter_map(|item| {
+
let post = posts_hyd.remove(item.post_uri())?;
+
let context = item.context();
+
+
let reply = if let RawFeedItem::Post { .. } = item {
+
let root_uri = post.0.root_uri.as_ref();
+
let parent_uri = post.0.parent_uri.as_ref();
+
+
let (root, parent) = if author_threads_only {
+
if root_uri.is_some() && parent_uri.is_some() {
+
let root = root_uri.and_then(|uri| posts_hyd.get(uri))?;
+
let parent = parent_uri.and_then(|uri| posts_hyd.get(uri))?;
+
+
let root = build_postview(root.clone());
+
let parent = build_postview(parent.clone());
+
+
(Some(root), Some(parent))
+
} else {
+
(None, None)
+
}
+
} else {
+
let root = root_uri.and_then(|uri| reply_posts.get(uri)).cloned();
+
let parent = parent_uri.and_then(|uri| reply_posts.get(uri)).cloned();
+
+
(root, parent)
+
};
+
+
if root_uri.is_some() || parent_uri.is_some() {
+
Some(ReplyRef {
+
root: root.map(postview_to_replyref).unwrap_or(
+
ReplyRefPost::NotFound {
+
uri: root_uri.unwrap().to_owned(),
+
not_found: true,
+
},
+
),
+
parent: parent.map(postview_to_replyref).unwrap_or(
+
ReplyRefPost::NotFound {
+
uri: parent_uri.unwrap().to_owned(),
+
not_found: true,
+
},
+
),
+
grandparent_author: None,
+
})
+
} else {
+
None
+
}
} else {
-
compile_feed(&post_uri, &mut posts, &reply_posts)?
+
None
};
-
Some((post_uri, item))
+
let reason = match item {
+
RawFeedItem::Repost { uri, by, at, .. } => {
+
Some(FeedViewPostReason::Repost(FeedReasonRepost {
+
by: profiles_hydrated.get(&by).cloned()?,
+
uri: Some(uri),
+
cid: None,
+
indexed_at: at,
+
}))
+
}
+
RawFeedItem::Pin { .. } => Some(FeedViewPostReason::Pin),
+
_ => None,
+
};
+
+
let post = build_postview(post);
+
+
Some(FeedViewPost {
+
post,
+
reply,
+
reason,
+
feed_context: context,
+
})
})
.collect()
}
···
}
}
-
type FeedViewPartData = (PostView, Option<String>, Option<String>);
-
-
// this is the 'normal' one that runs in most places
-
fn compile_feed(
-
uri: &String,
-
posts: &mut HashMap<String, FeedViewPartData>,
-
reply_posts: &HashMap<String, PostView>,
-
) -> Option<FeedViewPost> {
-
let (post, root_uri, parent_uri) = posts.remove(uri)?;
-
-
let root = root_uri.as_ref().and_then(|uri| reply_posts.get(uri));
-
let parent = parent_uri.as_ref().and_then(|uri| reply_posts.get(uri));
-
-
let reply = if parent_uri.is_some() && root_uri.is_some() {
-
Some(ReplyRef {
-
root: root
-
.cloned()
-
.map(postview_to_replyref)
-
.unwrap_or(ReplyRefPost::NotFound {
-
uri: root_uri.as_ref().unwrap().clone(),
-
not_found: true,
-
}),
-
parent: parent
-
.cloned()
-
.map(postview_to_replyref)
-
.unwrap_or(ReplyRefPost::NotFound {
-
uri: parent_uri.as_ref().unwrap().clone(),
-
not_found: true,
-
}),
-
grandparent_author: None,
-
})
-
} else {
-
None
-
};
-
-
Some(FeedViewPost {
-
post,
-
reply,
-
reason: None,
-
feed_context: None,
-
})
+
#[derive(Debug)]
+
pub enum RawFeedItem {
+
Pin {
+
uri: String,
+
context: Option<String>,
+
},
+
Post {
+
uri: String,
+
context: Option<String>,
+
},
+
Repost {
+
uri: String,
+
post: String,
+
by: String,
+
at: chrono::DateTime<chrono::Utc>,
+
context: Option<String>,
+
},
}
-
// and this one runs in getAuthorFeed when filter=PostsAndAuthorThreads
-
fn compile_feed_authors_threads_only(
-
uri: &String,
-
posts: &mut HashMap<String, FeedViewPartData>,
-
) -> Option<FeedViewPost> {
-
let (post, root_uri, parent_uri) = posts.get(uri)?.clone();
-
-
let root = root_uri.as_ref().and_then(|root| posts.get(root));
-
let parent = parent_uri.as_ref().and_then(|parent| posts.get(parent));
-
-
let reply = if parent_uri.is_some() && root_uri.is_some() {
-
Some(ReplyRef {
-
root: root
-
.cloned()
-
.map(|(post, _, _)| postview_to_replyref(post))?,
-
parent: parent
-
.cloned()
-
.map(|(post, _, _)| postview_to_replyref(post))?,
-
grandparent_author: None,
-
})
-
} else {
-
None
-
};
-
-
Some(FeedViewPost {
-
post,
-
reply,
-
reason: None,
-
feed_context: None,
-
})
+
impl RawFeedItem {
+
fn post_uri(&self) -> &str {
+
match self {
+
RawFeedItem::Pin { uri, .. } => uri,
+
RawFeedItem::Post { uri, .. } => uri,
+
RawFeedItem::Repost { post, .. } => post,
+
}
+
}
+
+
fn repost_by(&self) -> Option<String> {
+
match self {
+
RawFeedItem::Repost { by, .. } => Some(by.clone()),
+
_ => None,
+
}
+
}
+
+
fn context(&self) -> Option<String> {
+
match self {
+
RawFeedItem::Pin { context, .. } => context.clone(),
+
RawFeedItem::Post { context, .. } => context.clone(),
+
RawFeedItem::Repost { context, .. } => context.clone(),
+
}
+
}
}
+58 -84
parakeet/src/xrpc/app_bsky/feed/posts.rs
···
+
use crate::hydration::posts::RawFeedItem;
use crate::hydration::StatefulHydrator;
use crate::xrpc::app_bsky::graph::lists::ListWithCursorQuery;
use crate::xrpc::error::{Error, XrpcResult};
···
use diesel_async::{AsyncPgConnection, RunQueryDsl};
use lexica::app_bsky::actor::ProfileView;
use lexica::app_bsky::feed::{
-
BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason,
-
PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView,
+
BlockedAuthor, FeedSkeletonResponse, FeedViewPost, PostView, SkeletonReason, ThreadViewPost,
+
ThreadViewPostType, ThreadgateView,
};
use parakeet_db::{models, schema};
use reqwest::Url;
···
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
-
let at_uris = skeleton.feed.iter().map(|v| v.post.clone()).collect();
let repost_skeleton = skeleton
.feed
.iter()
···
_ => None,
})
.collect::<Vec<_>>();
+
let mut repost_data = get_skeleton_repost_data(&mut conn, repost_skeleton).await;
-
let mut posts = hyd.hydrate_feed_posts(at_uris, false).await;
-
let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await;
-
-
let feed = skeleton
+
let raw_feed = skeleton
.feed
.into_iter()
-
.filter_map(|item| {
-
let mut post = posts.remove(&item.post)?;
-
let reason = match item.reason {
-
Some(SkeletonReason::Repost { repost }) => {
-
repost_data.remove(&repost).map(FeedViewPostReason::Repost)
-
}
-
Some(SkeletonReason::Pin {}) => Some(FeedViewPostReason::Pin),
-
_ => None,
-
};
-
-
post.reason = reason;
-
post.feed_context = item.feed_context;
-
-
Some(post)
+
.filter_map(|v| match v.reason {
+
Some(SkeletonReason::Repost { repost }) => {
+
repost_data
+
.remove_entry(&repost)
+
.map(|(uri, (by, at))| RawFeedItem::Repost {
+
uri,
+
post: v.post,
+
by,
+
at: at.and_utc(),
+
context: v.feed_context,
+
})
+
}
+
Some(SkeletonReason::Pin {}) => Some(RawFeedItem::Pin {
+
uri: v.post,
+
context: v.feed_context,
+
}),
+
None => Some(RawFeedItem::Post {
+
uri: v.post,
+
context: v.feed_context,
+
}),
})
.collect();
+
let feed = hyd.hydrate_feed_posts(raw_feed, false).await;
+
Ok(Json(FeedRes {
cursor: skeleton.cursor,
feed,
···
let pin = match query.include_pins && query.cursor.is_none() {
false => None,
-
true => match crate::db::get_pinned_post_uri(&mut conn, &did).await? {
-
Some(post) => hyd.hydrate_post(post).await,
-
None => None,
-
},
+
true => crate::db::get_pinned_post_uri(&mut conn, &did).await?,
};
let limit = query.limit.unwrap_or(50).clamp(1, 100);
···
.last()
.map(|item| item.sort_at.timestamp_millis().to_string());
-
let at_uris = results
-
.iter()
-
.map(|item| item.post.clone())
-
.collect::<Vec<_>>();
-
-
// get the actor for if we have reposted
-
let profile = hyd
-
.hydrate_profile_basic(did)
-
.await
-
.ok_or(Error::server_error(None))?;
-
-
let mut posts = hyd.hydrate_feed_posts(at_uris, author_threads_only).await;
-
-
let mut feed: Vec<_> = results
+
let mut raw_feed = results
.into_iter()
-
.filter_map(|item| {
-
posts.remove(&item.post).map(|mut fvp| {
-
if item.typ == "repost" {
-
fvp.reason = Some(FeedViewPostReason::Repost(FeedReasonRepost {
-
by: profile.clone(),
-
uri: Some(item.uri),
-
cid: Some(item.cid),
-
indexed_at: Default::default(),
-
}))
-
}
-
fvp
-
})
+
.filter_map(|item| match &*item.typ {
+
"post" => Some(RawFeedItem::Post {
+
uri: item.post,
+
context: None,
+
}),
+
"repost" => Some(RawFeedItem::Repost {
+
uri: item.uri,
+
post: item.post,
+
by: item.did,
+
at: item.sort_at,
+
context: None,
+
}),
+
_ => None,
})
-
.collect();
+
.collect::<Vec<_>>();
if let Some(post) = pin {
-
feed.insert(
+
raw_feed.insert(
0,
-
FeedViewPost {
-
post,
-
reply: None,
-
reason: Some(FeedViewPostReason::Pin),
-
feed_context: None,
+
RawFeedItem::Pin {
+
uri: post,
+
context: None,
},
);
}
+
let feed = hyd.hydrate_feed_posts(raw_feed, author_threads_only).await;
+
Ok(Json(FeedRes { cursor, feed }))
}
···
.last()
.map(|(last, _)| last.timestamp_millis().to_string());
-
let at_uris = results
+
let raw_feed = results
.iter()
-
.map(|(_, uri)| uri.clone())
+
.map(|(_, uri)| RawFeedItem::Post {
+
uri: uri.clone(),
+
context: None,
+
})
.collect::<Vec<_>>();
-
let mut posts = hyd.hydrate_feed_posts(at_uris, false).await;
-
-
let feed = results
-
.into_iter()
-
.filter_map(|(_, uri)| posts.remove(&uri))
-
.collect();
+
let feed = hyd.hydrate_feed_posts(raw_feed, false).await;
Ok(Json(FeedRes { cursor, feed }))
}
···
}
}
-
async fn get_skeleton_repost_data<'a>(
+
async fn get_skeleton_repost_data(
conn: &mut AsyncPgConnection,
-
hyd: &StatefulHydrator<'a>,
reposts: Vec<String>,
-
) -> HashMap<String, FeedReasonRepost> {
+
) -> HashMap<String, (String, NaiveDateTime)> {
let Ok(repost_data) = schema::records::table
.select((
schema::records::at_uri,
···
return HashMap::new();
};
-
let profiles = repost_data.iter().map(|(_, did, _)| did.clone()).collect();
-
let profiles = hyd.hydrate_profiles_basic(profiles).await;
-
repost_data
.into_iter()
-
.filter_map(|(uri, did, indexed_at)| {
-
let by = profiles.get(&did).cloned()?;
-
-
let repost = FeedReasonRepost {
-
by,
-
uri: Some(uri.clone()),
-
cid: None, // okay, we do have this, but the app doesn't seem to be bothered about not setting it.
-
indexed_at: indexed_at.and_utc(),
-
};
-
-
Some((uri, repost))
-
})
+
.map(|(uri, did, at)| (uri, (did, at)))
.collect()
}