···
1
+
use crate::db::ThreadItem;
2
+
use crate::hydration::StatefulHydrator;
3
+
use crate::xrpc::error::{Error, XrpcResult};
4
+
use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth};
5
+
use crate::xrpc::normalise_at_uri;
6
+
use crate::GlobalState;
7
+
use axum::extract::{Query, State};
9
+
use itertools::Itertools;
10
+
use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadgateView};
11
+
use lexica::app_bsky::unspecced::{ThreadItemPost, ThreadV2Item, ThreadV2ItemType};
12
+
use serde::{Deserialize, Serialize};
13
+
use std::cmp::Ordering;
14
+
use std::collections::{HashMap, HashSet};
16
+
const THREAD_PARENTS: usize = 50;
17
+
const DEFAULT_BRANCHING: u32 = 10;
18
+
const DEFAULT_DEPTH: u32 = 6;
20
+
#[derive(Copy, Clone, Debug, Default, Deserialize)]
21
+
#[serde(rename_all = "lowercase")]
22
+
pub enum PostThreadSort {
29
+
#[derive(Debug, Deserialize)]
30
+
#[serde(rename_all = "camelCase")]
31
+
pub struct GetPostThreadV2Req {
33
+
pub above: Option<bool>,
34
+
pub below: Option<u32>,
35
+
pub branching_factor: Option<u32>,
37
+
pub sort: PostThreadSort,
40
+
#[derive(Debug, Serialize)]
41
+
#[serde(rename_all = "camelCase")]
42
+
pub struct GetPostThreadV2Res {
43
+
pub thread: Vec<ThreadV2Item>,
44
+
#[serde(skip_serializing_if = "Option::is_none")]
45
+
pub threadgate: Option<ThreadgateView>,
46
+
pub has_other_replies: bool,
49
+
pub async fn get_post_thread_v2(
50
+
State(state): State<GlobalState>,
51
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
52
+
maybe_auth: Option<AtpAuth>,
53
+
Query(query): Query<GetPostThreadV2Req>,
54
+
) -> XrpcResult<Json<GetPostThreadV2Res>> {
55
+
let mut conn = state.pool.get().await?;
56
+
let maybe_did = maybe_auth.clone().map(|v| v.0);
57
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
59
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
60
+
let depth = query.below.unwrap_or(DEFAULT_DEPTH).clamp(0, 20) as i32;
61
+
let branching_factor = query
63
+
.unwrap_or(DEFAULT_BRANCHING)
64
+
.clamp(0, 100) as i32;
67
+
.hydrate_post(uri.clone())
69
+
.ok_or(Error::not_found())?;
71
+
if let Some(v) = &anchor.author.viewer {
72
+
if v.blocked_by || v.blocking.is_some() {
73
+
let block = ThreadV2ItemType::Blocked {
74
+
author: BlockedAuthor {
75
+
did: anchor.author.did,
76
+
viewer: anchor.author.viewer,
80
+
return Ok(Json(GetPostThreadV2Res {
81
+
thread: vec![ThreadV2Item {
86
+
threadgate: anchor.threadgate,
87
+
has_other_replies: false,
92
+
// get the root post URI (if there is one) and return its author's DID.
93
+
let root_uri = crate::db::get_root_post(&mut conn, &uri)
95
+
.unwrap_or(uri.clone());
96
+
let root_did = root_uri[5..].split('/').collect::<Vec<_>>()[0];
99
+
crate::db::get_thread_children_branching(&mut conn, &uri, depth, branching_factor + 1)
101
+
let reply_uris = replies
103
+
.map(|item| item.at_uri.clone())
104
+
.collect::<Vec<_>>();
106
+
// bluesky seems to use -50 atm. we get 1 extra to know if to set more_parents.
107
+
let parents = match query.above.unwrap_or(true) {
108
+
true => crate::db::get_thread_parents(&mut conn, &uri, THREAD_PARENTS as i32 + 1).await?,
111
+
let parent_uris = parents
113
+
.map(|item| item.at_uri.clone())
114
+
.collect::<Vec<_>>();
116
+
let (mut replies_hyd, mut parents_hyd) = tokio::join!(
117
+
hyd.hydrate_posts(reply_uris),
118
+
hyd.hydrate_posts(parent_uris),
121
+
let threadgate = anchor.threadgate.clone();
122
+
let hidden: HashSet<_, std::hash::RandomState> = match &threadgate {
123
+
Some(tg) => crate::db::get_threadgate_hiddens(&mut conn, &tg.uri).await?,
126
+
.map(|hiddens| HashSet::from_iter(Vec::from(hiddens)))
127
+
.unwrap_or_default();
129
+
let root_has_more = parents.len() > THREAD_PARENTS;
130
+
let mut is_op_thread = true;
132
+
let mut thread = Vec::with_capacity(1 + replies.len() + parents.len());
137
+
.tail(THREAD_PARENTS)
139
+
.map(|(idx, item)| {
140
+
let value = parents_hyd
141
+
.remove(&item.at_uri)
143
+
if let Some(v) = &post.author.viewer {
144
+
if v.blocked_by || v.blocking.is_some() {
145
+
return ThreadV2ItemType::Blocked {
146
+
author: BlockedAuthor {
147
+
did: post.author.did,
148
+
viewer: post.author.viewer,
154
+
let op_thread = (is_op_thread
155
+
|| item.root_uri.is_none() && item.parent_uri.is_none())
156
+
&& post.author.did == root_did;
158
+
ThreadV2ItemType::Post(ThreadItemPost {
160
+
more_parents: idx == 0 && root_has_more,
163
+
hidden_by_threadgate: false,
164
+
muted_by_viewer: false,
167
+
.unwrap_or(ThreadV2ItemType::NotFound {});
171
+
depth: -item.depth - 1,
177
+
is_op_thread = is_op_thread && anchor.author.did == root_did;
178
+
thread.push(ThreadV2Item {
181
+
value: ThreadV2ItemType::Post(ThreadItemPost {
183
+
more_parents: false,
185
+
op_thread: is_op_thread,
186
+
hidden_by_threadgate: false,
187
+
muted_by_viewer: false,
191
+
let mut replies_grouped = replies
193
+
.into_group_map_by(|item| item.parent_uri.clone().unwrap_or_default());
195
+
// start with the anchor
196
+
let (children, has_other_replies) = build_thread_children(
197
+
&mut replies_grouped,
203
+
&BuildThreadChildrenOpts {
206
+
maybe_did: &maybe_did,
210
+
thread.extend(children);
212
+
Ok(Json(GetPostThreadV2Res {
219
+
#[derive(Debug, Deserialize)]
220
+
#[serde(rename_all = "camelCase")]
221
+
pub struct GetPostThreadOtherV2Req {
222
+
pub anchor: String,
225
+
#[derive(Debug, Serialize)]
226
+
#[serde(rename_all = "camelCase")]
227
+
pub struct GetPostThreadOtherV2Res {
228
+
pub thread: Vec<ThreadV2Item>,
231
+
pub async fn get_post_thread_other_v2(
232
+
State(state): State<GlobalState>,
233
+
AtpAcceptLabelers(labelers): AtpAcceptLabelers,
234
+
maybe_auth: Option<AtpAuth>,
235
+
Query(query): Query<GetPostThreadOtherV2Req>,
236
+
) -> XrpcResult<Json<GetPostThreadOtherV2Res>> {
237
+
let mut conn = state.pool.get().await?;
238
+
let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth);
240
+
let uri = normalise_at_uri(&state.dataloaders, &query.anchor).await?;
242
+
let root = crate::db::get_root_post(&mut conn, &uri)
244
+
.unwrap_or(uri.clone());
246
+
// this only returns immediate children (depth==1) where hiddenByThreadgate=TRUE
247
+
let replies = crate::db::get_thread_children_hidden(&mut conn, &uri, &root).await?;
248
+
let reply_uris = replies
250
+
.map(|item| item.at_uri)
251
+
.collect::<Vec<_>>();
253
+
.hydrate_posts(reply_uris)
256
+
.filter(|(_, post)| match &post.author.viewer {
257
+
Some(viewer) if viewer.blocked_by || viewer.blocking.is_some() => false,
260
+
.map(|(uri, post)| {
261
+
let post = ThreadItemPost {
263
+
more_parents: false,
266
+
hidden_by_threadgate: true,
267
+
muted_by_viewer: false,
273
+
value: ThreadV2ItemType::Post(post),
278
+
Ok(Json(GetPostThreadOtherV2Res { thread }))
282
+
struct BuildThreadChildrenOpts<'a> {
284
+
sort: PostThreadSort,
285
+
maybe_did: &'a Option<String>,
289
+
fn build_thread_children(
290
+
grouped_replies: &mut HashMap<String, Vec<ThreadItem>>,
291
+
replies_hyd: &mut HashMap<String, PostView>,
292
+
hidden: &HashSet<String>,
294
+
is_op_thread: bool,
296
+
opts: &BuildThreadChildrenOpts,
297
+
) -> (Vec<ThreadV2Item>, bool) {
298
+
let mut has_other_replies = false;
300
+
let Some(replies) = grouped_replies.remove(parent) else {
301
+
return (Vec::default(), has_other_replies);
304
+
let replies = replies
306
+
.filter_map(|item| replies_hyd.remove(&item.at_uri))
307
+
.sorted_by(sort_replies(&opts.sort));
309
+
let mut out = Vec::new();
311
+
for post in replies {
312
+
let reply_count = grouped_replies
315
+
.unwrap_or_default();
316
+
let at_max = depth == opts.max_depth;
317
+
let more_replies = if at_max { reply_count } else { 0 };
318
+
let op_thread = is_op_thread && post.author.did == opts.root_did;
320
+
// shouldn't push to the thread if there's a block relation. Bsky doesn't push a type of Blocked for replies...
321
+
if let Some(v) = &post.author.viewer {
322
+
if v.blocked_by || v.blocking.is_some() {
327
+
// check if the post is hidden AND we're NOT the author (hidden posts still show for their author)
328
+
if hidden.contains(&post.uri) && !did_is_cur(opts.maybe_did, &post.author.did) {
329
+
// post is hidden - do not ~pass go~ push to the thread.
331
+
has_other_replies = true;
336
+
let uri = post.uri.clone();
337
+
out.push(ThreadV2Item {
338
+
uri: post.uri.clone(),
340
+
value: ThreadV2ItemType::Post(ThreadItemPost {
342
+
more_parents: false,
343
+
more_replies: more_replies as i32,
345
+
hidden_by_threadgate: false,
346
+
muted_by_viewer: false,
351
+
// we don't care about has_other_replies when recursing
352
+
let (children, _) = build_thread_children(
362
+
out.extend(children);
366
+
(out, has_other_replies)
369
+
fn sort_replies(sort: &PostThreadSort) -> impl Fn(&PostView, &PostView) -> Ordering + use<'_> {
370
+
move |a: &PostView, b: &PostView| match sort {
371
+
PostThreadSort::Newest => b.indexed_at.cmp(&a.indexed_at),
372
+
PostThreadSort::Oldest => a.indexed_at.cmp(&b.indexed_at),
373
+
PostThreadSort::Top => b.stats.like_count.cmp(&a.stats.like_count),
377
+
fn did_is_cur(cur: &Option<String>, did: &String) -> bool {
379
+
Some(cur) => did == cur,