Social media crossposting tool — third time's the charm.
Supported services: Mastodon, Misskey, Bluesky.
Keywords: crossposting
1import re
2
3from atproto import client_utils
4
5import cross
6from util.media import MediaInfo
7from util.util import canonical_label
8
# only for lexicon reference
# Base URL of the Bluesky app; kept as a constant for lexicon/service
# identification only — it is not used to make requests in this module.
SERVICE = "https://bsky.app"

# TODO this is terrible and stupid
# Keyword heuristics for classifying content-warning text into Bluesky's
# self-label buckets (case-insensitive, whole-word matches).
# NOTE(review): these patterns are not referenced in this chunk — presumably
# matched against spoiler/CW text by a caller; confirm at the call sites.
ADULT_PATTERN = re.compile(
    r"\b(sexual content|nsfw|erotic|adult only|18\+)\b", re.IGNORECASE
)
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
17
18
class BlueskyPost(cross.Post):
    """Adapts a Bluesky post record to the common ``cross.Post`` interface.

    Wraps an app.bsky.feed.post record dict (plus pre-tokenized text and
    resolved media) so the crossposting pipeline can treat it uniformly.
    """

    def __init__(
        self, record: dict, tokens: list[cross.Token], attachments: list[MediaInfo]
    ) -> None:
        """Build a post from a record dict.

        record: post record; must contain "$xpost.strongRef" (with the
            post's at:// URI) and "createdAt". "reply", "labels" and
            "langs" are optional.
        tokens: pre-tokenized post text (see tokenize_post).
        attachments: resolved media attached to the post.
        """
        super().__init__()
        self.uri = record["$xpost.strongRef"]["uri"]
        self.parent_uri = None
        if record.get("reply"):
            self.parent_uri = record["reply"]["parent"]["uri"]

        self.tokens = tokens
        self.timestamp = record["createdAt"]
        # "labels" may be absent OR explicitly null in the record; the
        # previous `record.get("labels", {})` crashed on an explicit null.
        labels = (record.get("labels") or {}).get("values")
        self.spoiler = None
        if labels:
            # e.g. label val "sexual-content" -> spoiler text "sexual content"
            self.spoiler = ", ".join(
                str(label["val"]).replace("-", " ") for label in labels
            )

        self.attachments = attachments
        self.languages = record.get("langs", [])

    # at:// of the post record
    def get_id(self) -> str:
        return self.uri

    def get_parent_id(self) -> str | None:
        """at:// URI of the parent post, or None when not a reply."""
        return self.parent_uri

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens

    def get_text_type(self) -> str:
        # Bluesky post text is always plain text (formatting lives in facets)
        return "text/plain"

    def get_timestamp(self) -> str:
        return self.timestamp

    def get_attachments(self) -> list[MediaInfo]:
        return self.attachments

    def get_spoiler(self) -> str | None:
        """Comma-separated, de-hyphenated self-label values, or None."""
        return self.spoiler

    def get_languages(self) -> list[str]:
        return self.languages

    def is_sensitive(self) -> bool:
        # any self-label at all marks the post as sensitive
        return self.spoiler is not None

    def get_post_url(self) -> str | None:
        """Web URL of the post, derived from the at:// URI.

        Assumes the canonical shape at://<did>/<collection>/<rkey>.
        """
        did, _, post_id = str(self.uri[len("at://") :]).split("/")

        return f"https://bsky.app/profile/{did}/post/{post_id}"
73
74
def tokenize_post(post: dict) -> list[cross.Token]:
    """Turn a Bluesky post record's text and facets into cross tokens.

    Facet indices are byte offsets into the UTF-8 encoding of the text,
    so all slicing is done on bytes and each segment is decoded back to
    str. Only the first feature of each facet is honoured, and facets
    that overlap an earlier (by byteStart) facet are discarded.
    """
    text: str = post.get("text", "")
    if not text:
        return []
    raw = text.encode(encoding="utf-8")

    def as_text(chunk: bytes) -> str:
        return chunk.decode(encoding="utf-8")

    facet_list: list[dict] = post.get("facets", [])
    if not facet_list:
        return [cross.TextToken(as_text(raw))]

    # (byteStart, byteEnd, kind, payload) per recognised facet feature
    spans: list[tuple[int, int, str, str]] = []

    for facet in facet_list:
        feats: list[dict] = facet.get("features", [])
        if not feats:
            continue

        # overlapping facets/features are unsupported; first feature wins
        feat = feats[0]
        kind = feat["$type"]
        idx = facet["index"]
        if kind == "app.bsky.richtext.facet#tag":
            spans.append((idx["byteStart"], idx["byteEnd"], "tag", feat["tag"]))
        elif kind == "app.bsky.richtext.facet#link":
            spans.append((idx["byteStart"], idx["byteEnd"], "link", feat["uri"]))
        elif kind == "app.bsky.richtext.facet#mention":
            spans.append((idx["byteStart"], idx["byteEnd"], "mention", feat["did"]))

    if not spans:
        return [cross.TextToken(as_text(raw))]

    # keep a non-overlapping subset, greedily, in byteStart order
    spans.sort(key=lambda s: s[0])
    kept: list[tuple[int, int, str, str]] = []
    cursor = 0
    for span in spans:
        if span[0] >= cursor:
            kept.append(span)
            cursor = span[1]

    if not kept:
        return [cross.TextToken(as_text(raw))]

    out: list[cross.Token] = []
    pos = 0

    for start, end, kind, payload in kept:
        if start > pos:
            # plain text between facets
            out.append(cross.TextToken(as_text(raw[pos:start])))
        if kind == "link":
            label = as_text(raw[start:end])

            # try to unflatten links: when the visible label is just the
            # href without its scheme (possibly "..."-truncated), drop the
            # label so the bare link is rendered downstream
            parts = payload.split("://", 1)
            if len(parts) > 1:
                rest = parts[1]
                if rest.startswith(label) or (
                    label.endswith("...") and rest.startswith(label[:-3])
                ):
                    out.append(cross.LinkToken(payload, ""))
                    pos = end
                    continue

            out.append(cross.LinkToken(payload, label))
        elif kind == "tag":
            tag = as_text(raw[start:end])
            out.append(cross.TagToken(tag.removeprefix("#")))
        elif kind == "mention":
            handle = as_text(raw[start:end])
            out.append(cross.MentionToken(handle.removeprefix("@"), payload))
        pos = end

    if pos < len(raw):
        out.append(cross.TextToken(as_text(raw[pos:])))

    return out
169
170
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
    """Assemble an atproto rich-text builder from cross tokens.

    Returns None as soon as an unsupported token kind is encountered
    (anything other than text, link, or tag), letting the caller fall
    back to another rendering path.
    """
    rt = client_utils.TextBuilder()

    def shorten(href: str) -> str:
        # display form: scheme stripped, capped at 32 chars plus ellipsis
        pieces = href.split("://", 1)
        display = pieces[1] if len(pieces) > 1 else href
        if len(display) > 32:
            display = display[:32] + "..."
        return display

    for tok in tokens:
        if isinstance(tok, cross.TextToken):
            rt.text(tok.text)
        elif isinstance(tok, cross.LinkToken):
            # labels that are just the URL itself get re-flattened for display
            if canonical_label(tok.label, tok.href):
                rt.link(shorten(tok.href), tok.href)
            else:
                rt.link(tok.label, tok.href)
        elif isinstance(tok, cross.TagToken):
            rt.tag("#" + tok.tag, tok.tag.lower())
        else:
            # fail on unsupported tokens
            return None

    return rt