Social media crossposting tool — third time's the charm.
Crossposts between Mastodon, Misskey, and Bluesky.
at master 6.1 kB view raw
1import re 2 3from atproto import client_utils 4 5import cross 6from util.media import MediaInfo 7from util.util import canonical_label 8 9# only for lexicon reference 10SERVICE = "https://bsky.app" 11 12# TODO this is terrible and stupid 13ADULT_PATTERN = re.compile( 14 r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE 15) 16PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE) 17 18 19class BlueskyPost(cross.Post): 20 def __init__( 21 self, record: dict, tokens: list[cross.Token], attachments: list[MediaInfo] 22 ) -> None: 23 super().__init__() 24 self.uri = record["$xpost.strongRef"]["uri"] 25 self.parent_uri = None 26 if record.get("reply"): 27 self.parent_uri = record["reply"]["parent"]["uri"] 28 29 self.tokens = tokens 30 self.timestamp = record["createdAt"] 31 labels = record.get("labels", {}).get("values") 32 self.spoiler = None 33 if labels: 34 self.spoiler = ", ".join( 35 [str(label["val"]).replace("-", " ") for label in labels] 36 ) 37 38 self.attachments = attachments 39 self.languages = record.get("langs", []) 40 41 # at:// of the post record 42 def get_id(self) -> str: 43 return self.uri 44 45 def get_parent_id(self) -> str | None: 46 return self.parent_uri 47 48 def get_tokens(self) -> list[cross.Token]: 49 return self.tokens 50 51 def get_text_type(self) -> str: 52 return "text/plain" 53 54 def get_timestamp(self) -> str: 55 return self.timestamp 56 57 def get_attachments(self) -> list[MediaInfo]: 58 return self.attachments 59 60 def get_spoiler(self) -> str | None: 61 return self.spoiler 62 63 def get_languages(self) -> list[str]: 64 return self.languages 65 66 def is_sensitive(self) -> bool: 67 return self.spoiler is not None 68 69 def get_post_url(self) -> str | None: 70 did, _, post_id = str(self.uri[len("at://") :]).split("/") 71 72 return f"https://bsky.app/profile/{did}/post/{post_id}" 73 74 75def tokenize_post(post: dict) -> list[cross.Token]: 76 text: str = post.get("text", "") 77 if not 
text: 78 return [] 79 ut8_text = text.encode(encoding="utf-8") 80 81 def decode(ut8: bytes) -> str: 82 return ut8.decode(encoding="utf-8") 83 84 facets: list[dict] = post.get("facets", []) 85 if not facets: 86 return [cross.TextToken(decode(ut8_text))] 87 88 slices: list[tuple[int, int, str, str]] = [] 89 90 for facet in facets: 91 features: list[dict] = facet.get("features", []) 92 if not features: 93 continue 94 95 # we don't support overlapping facets/features 96 feature = features[0] 97 feature_type = feature["$type"] 98 index = facet["index"] 99 match feature_type: 100 case "app.bsky.richtext.facet#tag": 101 slices.append( 102 (index["byteStart"], index["byteEnd"], "tag", feature["tag"]) 103 ) 104 case "app.bsky.richtext.facet#link": 105 slices.append( 106 (index["byteStart"], index["byteEnd"], "link", feature["uri"]) 107 ) 108 case "app.bsky.richtext.facet#mention": 109 slices.append( 110 (index["byteStart"], index["byteEnd"], "mention", feature["did"]) 111 ) 112 113 if not slices: 114 return [cross.TextToken(decode(ut8_text))] 115 116 slices.sort(key=lambda s: s[0]) 117 unique: list[tuple[int, int, str, str]] = [] 118 current_end = 0 119 for start, end, ttype, val in slices: 120 if start >= current_end: 121 unique.append((start, end, ttype, val)) 122 current_end = end 123 124 if not unique: 125 return [cross.TextToken(decode(ut8_text))] 126 127 tokens: list[cross.Token] = [] 128 prev = 0 129 130 for start, end, ttype, val in unique: 131 if start > prev: 132 # text between facets 133 tokens.append(cross.TextToken(decode(ut8_text[prev:start]))) 134 # facet token 135 match ttype: 136 case "link": 137 label = decode(ut8_text[start:end]) 138 139 # try to unflatten links 140 split = val.split("://", 1) 141 if len(split) > 1: 142 if split[1].startswith(label): 143 tokens.append(cross.LinkToken(val, "")) 144 prev = end 145 continue 146 147 if label.endswith("...") and split[1].startswith(label[:-3]): 148 tokens.append(cross.LinkToken(val, "")) 149 prev = end 150 
continue 151 152 tokens.append(cross.LinkToken(val, label)) 153 case "tag": 154 tag = decode(ut8_text[start:end]) 155 tokens.append(cross.TagToken(tag[1:] if tag.startswith("#") else tag)) 156 case "mention": 157 mention = decode(ut8_text[start:end]) 158 tokens.append( 159 cross.MentionToken( 160 mention[1:] if mention.startswith("@") else mention, val 161 ) 162 ) 163 prev = end 164 165 if prev < len(ut8_text): 166 tokens.append(cross.TextToken(decode(ut8_text[prev:]))) 167 168 return tokens 169 170 171def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None: 172 builder = client_utils.TextBuilder() 173 174 def flatten_link(href: str): 175 split = href.split("://", 1) 176 if len(split) > 1: 177 href = split[1] 178 179 if len(href) > 32: 180 href = href[:32] + "..." 181 182 return href 183 184 for token in tokens: 185 if isinstance(token, cross.TextToken): 186 builder.text(token.text) 187 elif isinstance(token, cross.LinkToken): 188 if canonical_label(token.label, token.href): 189 builder.link(flatten_link(token.href), token.href) 190 continue 191 192 builder.link(token.label, token.href) 193 elif isinstance(token, cross.TagToken): 194 builder.tag("#" + token.tag, token.tag.lower()) 195 else: 196 # fail on unsupported tokens 197 return None 198 199 return builder