import re

from atproto import client_utils

import cross
from util.media import MediaInfo
from util.util import canonical_label

# only for lexicon reference
SERVICE = "https://bsky.app"

# TODO this is terrible and stupid
ADULT_PATTERN = re.compile(
    r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE
)
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)


class BlueskyPost(cross.Post):
    def __init__(
        self, record: dict, tokens: list[cross.Token], attachments: list[MediaInfo]
    ) -> None:
        super().__init__()
        self.uri = record["$xpost.strongRef"]["uri"]
        self.parent_uri = None
        if record.get("reply"):
            self.parent_uri = record["reply"]["parent"]["uri"]
        self.tokens = tokens
        self.timestamp = record["createdAt"]
        labels = record.get("labels", {}).get("values")
        self.spoiler = None
        if labels:
            self.spoiler = ", ".join(
                [str(label["val"]).replace("-", " ") for label in labels]
            )
        self.attachments = attachments
        self.languages = record.get("langs", [])

    # at:// URI of the post record
    def get_id(self) -> str:
        return self.uri

    def get_parent_id(self) -> str | None:
        return self.parent_uri

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens

    def get_text_type(self) -> str:
        return "text/plain"

    def get_timestamp(self) -> str:
        return self.timestamp

    def get_attachments(self) -> list[MediaInfo]:
        return self.attachments

    def get_spoiler(self) -> str | None:
        return self.spoiler

    def get_languages(self) -> list[str]:
        return self.languages

    def is_sensitive(self) -> bool:
        return self.spoiler is not None

    def get_post_url(self) -> str | None:
        did, _, post_id = str(self.uri[len("at://") :]).split("/")
        return f"https://bsky.app/profile/{did}/post/{post_id}"


def tokenize_post(post: dict) -> list[cross.Token]:
    text: str = post.get("text", "")
    if not text:
        return []

    utf8_text = text.encode(encoding="utf-8")

    def decode(utf8: bytes) -> str:
        return utf8.decode(encoding="utf-8")

    facets: list[dict] = post.get("facets", [])
    if not facets:
        return [cross.TextToken(decode(utf8_text))]

    # collect (byteStart, byteEnd, type, value) slices for supported facets
    slices: list[tuple[int, int, str, str]] = []
    for facet in facets:
        features: list[dict] = facet.get("features", [])
        if not features:
            continue

        # we don't support overlapping facets/features
        feature = features[0]
        feature_type = feature["$type"]
        index = facet["index"]

        match feature_type:
            case "app.bsky.richtext.facet#tag":
                slices.append(
                    (index["byteStart"], index["byteEnd"], "tag", feature["tag"])
                )
            case "app.bsky.richtext.facet#link":
                slices.append(
                    (index["byteStart"], index["byteEnd"], "link", feature["uri"])
                )
            case "app.bsky.richtext.facet#mention":
                slices.append(
                    (index["byteStart"], index["byteEnd"], "mention", feature["did"])
                )

    if not slices:
        return [cross.TextToken(decode(utf8_text))]

    slices.sort(key=lambda s: s[0])

    # drop any slice that overlaps an earlier one
    unique: list[tuple[int, int, str, str]] = []
    current_end = 0
    for start, end, ttype, val in slices:
        if start >= current_end:
            unique.append((start, end, ttype, val))
            current_end = end

    if not unique:
        return [cross.TextToken(decode(utf8_text))]

    tokens: list[cross.Token] = []
    prev = 0
    for start, end, ttype, val in unique:
        if start > prev:
            # text between facets
            tokens.append(cross.TextToken(decode(utf8_text[prev:start])))

        # facet token
        match ttype:
            case "link":
                label = decode(utf8_text[start:end])

                # try to unflatten links
                split = val.split("://", 1)
                if len(split) > 1:
                    if split[1].startswith(label):
                        tokens.append(cross.LinkToken(val, ""))
                        prev = end
                        continue
                    if label.endswith("...") and split[1].startswith(label[:-3]):
                        tokens.append(cross.LinkToken(val, ""))
                        prev = end
                        continue
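                # label isn't a flattened form of the URI;
                # keep it as a distinct link label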
                tokens.append(cross.LinkToken(val, label))
            case "tag":
                tag = decode(utf8_text[start:end])
                tokens.append(cross.TagToken(tag[1:] if tag.startswith("#") else tag))
            case "mention":
                mention = decode(utf8_text[start:end])
                tokens.append(
                    cross.MentionToken(
                        mention[1:] if mention.startswith("@") else mention, val
                    )
                )

        prev = end

    if prev < len(utf8_text):
        # trailing text after the last facet
        tokens.append(cross.TextToken(decode(utf8_text[prev:])))

    return tokens


def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
    builder = client_utils.TextBuilder()

    def flatten_link(href: str) -> str:
        split = href.split("://", 1)
        if len(split) > 1:
            href = split[1]
        if len(href) > 32:
            href = href[:32] + "..."
        return href

    for token in tokens:
        if isinstance(token, cross.TextToken):
            builder.text(token.text)
        elif isinstance(token, cross.LinkToken):
            if canonical_label(token.label, token.href):
                builder.link(flatten_link(token.href), token.href)
                continue
            builder.link(token.label, token.href)
        elif isinstance(token, cross.TagToken):
            builder.tag("#" + token.tag, token.tag.lower())
        else:
            # fail on unsupported tokens
            return None

    return builder
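

# Usage sketch (illustrative, not part of the module): round-trips a minimal,
# hypothetical app.bsky.feed.post-shaped record through tokenize_post and
# tokens_to_richtext. The record below is made up for demonstration; the
# cross.* token classes come from this repo, and TextBuilder from atproto.
if __name__ == "__main__":
    record = {
        "text": "hello #world",
        "facets": [
            {
                # byte offsets 6..12 cover "#world" in the UTF-8 encoded text
                "index": {"byteStart": 6, "byteEnd": 12},
                "features": [
                    {"$type": "app.bsky.richtext.facet#tag", "tag": "world"}
                ],
            }
        ],
    }
    tokens = tokenize_post(record)  # [TextToken("hello "), TagToken("world")]
    builder = tokens_to_richtext(tokens)
    if builder is not None:
        print(builder.build_text())  # "hello #world"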