social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from atproto import client_utils, Client, AtUri, IdResolver
2from atproto_client import models
3import json
4import cross
5import database
6from database import DataBaseWorker
7import util
8import media_util
9from util import LOGGER
10
# only for lexicon reference
# Service identifier this module passes to the database helpers
# (find_mappings / insert_post / insert_reply / delete_post) for Bluesky rows.
SERVICE = 'https://bsky.app'

# Substrings looked for in a post's content warning; a match adds the
# 'sexual' self-label to the outgoing records.
ADULT_LABEL = ["sexual content", "nsfw"]
# Substrings looked for in a post's content warning; a match adds the
# 'porn' self-label to the outgoing records.
PORN_LABEL = ["porn", "yiff"]
16
class BlueskyOutput(cross.Output):
    """Output that republishes posts from a ``cross.Input`` on a Bluesky account.

    A post becomes one or more Bluesky records (a thread) depending on how
    ``util.split_tokens`` splits the text and how the media is distributed.
    Every created record is stored through the ``database`` helpers so that
    later replies and deletions can be mapped back to it.
    """

    # Limits used when distributing, downloading and validating media.
    _MAX_IMAGES_PER_POST = 4
    _MAX_DOWNLOAD_BYTES = {'image': 2_000_000, 'video': 100_000_000}
    _MAX_VIDEO_SECONDS = 180

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Resolve the account identity and log in.

        Reads ``options``, ``app-password``, ``did``, ``handle`` and ``pds``
        from *settings* (the latter four through ``util.get_or_envvar``).
        A missing DID is resolved from the handle, a missing PDS from the
        DID document.

        Raises:
            Exception: if the app password or handle is missing, or if the
                DID, DID document or PDS endpoint cannot be resolved.
        """
        super().__init__(input, settings, db)
        self.options = util.safe_get(settings, 'options', {})

        app_password = util.get_or_envvar(settings, 'app-password')
        if not app_password:
            raise Exception("Account app password not provided!")

        resolver = IdResolver()
        did: str | None = util.get_or_envvar(settings, 'did')
        if not did:
            handle = util.get_or_envvar(settings, 'handle')
            if not handle:
                raise Exception("ATP handle not specified!")
            LOGGER.info("Resolving ATP identity for %s...", handle)
            did = resolver.handle.resolve(handle)
            if not did:
                raise Exception("Failed to resolve DID!")

        pds: str | None = util.get_or_envvar(settings, 'pds')
        if not pds:
            LOGGER.info("Resolving PDS from DID document...")
            did_doc = resolver.did.resolve(did)
            if not did_doc:
                # Interpolate the DID; passing it as a second argument would
                # leave the '%s' placeholder unformatted in the message.
                raise Exception(f"Failed to resolve DID doc for '{did}'")
            pds = did_doc.get_pds_endpoint()
            if not pds:
                raise Exception("Failed to resolve PDS!")

        self.client = Client(pds)
        self.client.login(did, app_password)
        self.bsky = Bluesky(self.client)

    def _load_mappings(self, post_id, did: str) -> list[dict]:
        """Return the decoded JSON identifiers mapped to *post_id* for this account."""
        rows = database.find_mappings(self.db, post_id, SERVICE, did)
        return [json.loads(row[0]) for row in rows]

    @staticmethod
    def _record_from_mapping(mapping: dict):
        """Rebuild a ``CreateRecordResponse`` from a stored ``uri``/``cid`` mapping."""
        return models.AppBskyFeedPost.CreateRecordResponse(
            uri=str(mapping['uri']),
            cid=str(mapping['cid']),
        )

    def _find_parent(self, parent_id: str):
        """Find the Bluesky records a reply to *parent_id* must reference.

        Args:
            parent_id: identifier of the parent post on the input service.

        Returns:
            ``(root_ref, parent_ref, root_db_id, parent_db_id)`` where the two
            refs are strong refs for the reply record and the two ids come
            from the parent's database row, or ``None`` when no Bluesky
            mapping exists for the parent (or its root).

        Raises:
            Exception: if the client is not logged in.
            AssertionError: if the parent (or its root) is not in the database.
        """
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        reply_data = database.find_post(self.db, parent_id, self.input.user_id, self.input.service)
        assert reply_data, "reply_data requested, but doesn't exist in db (should've been skipped bt firehose)"

        reply_mappings = self._load_mappings(reply_data['id'], login.did)
        if not reply_mappings:
            LOGGER.error("Failed to find mappings for a post in the db!")
            return None

        # The parent may itself have been split into a thread: reply to its
        # last record, and treat its first record as the root by default.
        parent_mapping = reply_mappings[-1]
        root_mapping = reply_mappings[0]

        if reply_data['root_id']:
            root_data = database.find_post_by_id(self.db, reply_data['root_id'])
            assert root_data, "root_data requested but doesn't exist in db"

            root_mappings = self._load_mappings(reply_data['root_id'], login.did)
            if not root_mappings:
                LOGGER.error("Failed to find mappings for a post in the db!")
                return None
            root_mapping = root_mappings[0]

        # NOTE(review): when the parent is itself a root, reply_data['root_id']
        # is falsy and is returned as-is; accept_post then stores the new post
        # as a root rather than a reply. Confirm this is intended.
        return (
            models.create_strong_ref(self._record_from_mapping(root_mapping)),
            models.create_strong_ref(self._record_from_mapping(parent_mapping)),
            reply_data['root_id'],
            reply_data['id']
        )

    def _split_attachments(self, attachments: list[cross.MediaAttachment]):
        """Partition *attachments* into ``(supported, unsupported)`` lists.

        Images and videos are supported; attachments without a type are
        dropped; everything else is unsupported.
        """
        sup_media: list[cross.MediaAttachment] = []
        unsup_media: list[cross.MediaAttachment] = []

        for attachment in attachments:
            attachment_type = attachment.get_type()
            if not attachment_type:
                continue

            if attachment_type in {'video', 'image'}:  # TODO convert gifs to videos
                sup_media.append(attachment)
            else:
                unsup_media.append(attachment)

        return (sup_media, unsup_media)

    def _split_media_per_post(
        self,
        tokens: list[client_utils.TextBuilder],
        media: list[cross.MediaAttachment]
    ) -> list[tuple[client_utils.TextBuilder, list[cross.MediaAttachment]]]:
        """Distribute *media* over the text chunks of a thread.

        Each video takes a post of its own; consecutive images share a post
        up to ``_MAX_IMAGES_PER_POST``. Posts are filled in order, and posts
        with empty text are appended once the text chunks run out.

        Returns:
            One ``(text, attachments)`` tuple per post, in thread order.
        """
        posts: list[dict] = [{"tokens": text, "attachments": []} for text in tokens]
        # Indices of the original text posts that have not received media yet.
        free_indices = iter(range(len(posts)))

        def claim_post() -> dict:
            idx = next(free_indices, None)
            if idx is not None:
                return posts[idx]
            # The text must be a bare TextBuilder (not a list) so it can be
            # handed to the send_* helpers like any other chunk.
            blank = {"tokens": client_utils.TextBuilder().text(''), "attachments": []}
            posts.append(blank)
            return blank

        image_post: dict | None = None
        for att in media:
            kind = att.get_type()
            if kind == 'video':
                # A video ends the current image group.
                image_post = None
                claim_post()["attachments"].append(att)
            elif kind == 'image':
                if image_post is None or len(image_post["attachments"]) >= self._MAX_IMAGES_PER_POST:
                    image_post = claim_post()
                image_post["attachments"].append(att)

        return [(p["tokens"], p["attachments"]) for p in posts]

    def _send_image_post(self, text, attachments: list[cross.MediaAttachment], reply_to, labels):
        """Publish one post embedding every image in *attachments*."""
        images: list[bytes] = []
        image_alts: list[str] = []
        image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []

        for attachment in attachments:
            assert attachment.bytes
            image_io = media_util.compress_image(attachment.bytes, quality=100)
            metadata = attachment.create_meta(image_io)

            if len(image_io) > 1_000_000:
                # NOTE(review): this only logs; no further compression pass
                # is performed here. Confirm whether one is intended.
                LOGGER.info("Compressing %s...", attachment.get_url())

            images.append(image_io)
            image_alts.append(attachment.get_alt())
            image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
                width=metadata.get_width(),
                height=metadata.get_height()
            ))

        return self.bsky.send_images(
            text=text,
            images=images,
            image_alts=image_alts,
            image_aspect_ratios=image_aspect_ratios,
            reply_to=reply_to,
            labels=labels
        )

    def _send_video_post(self, text, video_data: cross.MediaAttachment, metadata, reply_to, labels):
        """Publish one post embedding *video_data*, converting it to mp4 if needed."""
        assert video_data.bytes
        video_io = video_data.bytes

        probe = media_util.probe_bytes(video_io)
        format_name = probe['format']['format_name']
        if 'mp4' not in format_name.split(','):
            LOGGER.info("Converting %s to mp4...", video_data.get_url())
            video_io = media_util.convert_to_mp4(video_io)

        aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
            width=metadata.get_width(),
            height=metadata.get_height()
        )

        return self.bsky.send_video(
            text=text,
            video=video_io,
            video_aspect_ratio=aspect_ratio,
            video_alt=video_data.get_alt(),
            reply_to=reply_to,
            labels=labels
        )

    def accept_post(self, post: cross.Post):
        """Publish *post* (as a thread if needed) and record it in the database.

        The post is skipped (nothing is published) when its parent has no
        Bluesky mapping, it contains unsupported tokens, an attachment cannot
        be downloaded, or a video is longer than ``_MAX_VIDEO_SECONDS``.

        Raises:
            Exception: if the client is not logged in.
            AssertionError: if *post* is not present in the database.
        """
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        parent_id = post.get_parent_id()

        # used for db insertion
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        if parent_id:
            parents = self._find_parent(parent_id)
            if not parents:
                return
            root_ref, reply_ref, new_root_id, new_parent_id = parents

        # Work on a copy: the CW prefix and attachment links added below are
        # specific to this output and must not leak into the shared post.
        tokens = list(post.get_tokens())

        unique_labels: set[str] = set()
        cw = post.get_cw()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
            unique_labels.add('graphic-media')

            # Match keywords case-insensitively so e.g. "NSFW" is recognised.
            cw_folded = cw.casefold()
            if any(tag in cw_folded for tag in ADULT_LABEL):
                unique_labels.add('sexual')

            if any(tag in cw_folded for tag in PORN_LABEL):
                unique_labels.add('porn')

        if post.is_sensitive():
            unique_labels.add('graphic-media')

        # Sorted for a deterministic label order in the created records.
        labels = models.ComAtprotoLabelDefs.SelfLabels(
            values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in sorted(unique_labels)]
        )

        sup_media, unsup_media = self._split_attachments(post.get_attachments())

        # Unsupported attachments are linked from the text instead of embedded.
        if unsup_media:
            if tokens:
                tokens.append(cross.TextToken('\n'))
            for attachment in unsup_media:
                tokens.append(cross.LinkToken(
                    attachment.get_url(),
                    f"[{media_util.get_filename_from_url(attachment.get_url())}]"
                ))
                tokens.append(cross.TextToken(' '))

        split_tokens: list[list[cross.Token]] = util.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []

        # convert tokens into rich text. skip post if contains unsupported tokens
        for block in split_tokens:
            rich_text = tokens_to_richtext(block)

            if not rich_text:
                LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
                return
            post_text.append(rich_text)

        if not post_text:
            post_text = [client_utils.TextBuilder().text('')]

        # download media first. increased RAM usage, but more reliable
        for m in sup_media:
            if m.bytes:
                continue
            max_bytes = self._MAX_DOWNLOAD_BYTES.get(m.get_type())
            if max_bytes is None:
                continue
            blob = media_util.download_blob(m.get_url(), max_bytes=max_bytes)
            if not blob:
                LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
                return
            m.bytes = blob

        # Validate videos before anything is published, so an over-long clip
        # rejects the whole post instead of leaving a half-posted thread.
        video_meta: dict[int, object] = {}
        for m in sup_media:
            if m.get_type() != 'video':
                continue
            assert m.bytes
            meta = m.create_meta(m.bytes)
            if meta.get_duration() > self._MAX_VIDEO_SECONDS:
                LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
                return
            video_meta[id(m)] = meta

        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []

        for text, attachments in self._split_media_per_post(post_text, sup_media):
            reply_to = None
            if root_ref and reply_ref:
                reply_to = models.AppBskyFeedPost.ReplyRef(parent=reply_ref, root=root_ref)

            if not attachments:
                new_post = self.bsky.send_post(text, reply_to=reply_to, labels=labels)
            elif attachments[0].get_type() == 'image':
                # if a single attachment is an image - everything else is an image
                new_post = self._send_image_post(text, attachments, reply_to, labels)
            else:
                # a video is guaranteed to be the only attachment of its post
                video_data = attachments[0]
                new_post = self._send_video_post(
                    text, video_data, video_meta[id(video_data)], reply_to, labels
                )

            # The first record of a fresh thread becomes its root.
            if reply_to is None:
                root_ref = models.create_strong_ref(new_post)

            self.bsky.create_gates(self.options, new_post.uri)
            reply_ref = models.create_strong_ref(new_post)
            created_records.append(new_post)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        assert db_post, "ghghghhhhh"

        db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]

        # Without a known parent/root the first record is stored as a new
        # root post; every remaining record is chained to it as a reply.
        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db,
                db_identifiers[0],
                login.did,
                SERVICE
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            db_identifiers = db_identifiers[1:]

        for db_id in db_identifiers:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                login.did,
                SERVICE,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky record mapped to the input post *identifier*.

        Does nothing when the post is unknown to the database. Records are
        removed newest first.

        Raises:
            Exception: if the client is not logged in.
        """
        login = self.client.me
        if not login:
            raise Exception("Client not logged in!")

        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        for mapping in mappings[::-1]:
            self.client.delete_post(json.loads(mapping[0])['uri'])
            database.delete_post(self.db, mapping[0], SERVICE, login.did)
359
360
class Bluesky():
    """Small helper around an atproto ``Client`` for creating posts and gates."""

    def __init__(self, client: Client) -> None:
        self.client = client

    def send_video(
        self,
        text: str | client_utils.TextBuilder,
        video: bytes,
        video_alt: str | None = None,
        video_aspect_ratio: models.AppBskyEmbedDefs.AspectRatio | None = None,
        reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
        langs: list[str] | None = None,
        facets: list[models.AppBskyRichtextFacet.Main] | None = None,
        labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Upload *video* as a blob and create a post embedding it.

        A missing *video_alt* is sent as an empty string. All other
        arguments are forwarded to ``send_post``.
        """
        alt_text = '' if video_alt is None else video_alt
        uploaded = self.client.upload_blob(video)

        video_embed = models.AppBskyEmbedVideo.Main(
            video=uploaded.blob,
            alt=alt_text,
            aspect_ratio=video_aspect_ratio,
        )
        return self.send_post(
            text,
            reply_to=reply_to,
            embed=video_embed,
            langs=langs,
            facets=facets,
            labels=labels
        )

    def send_images(
        self,
        text: str | client_utils.TextBuilder,
        images: list[bytes],
        image_alts: list[str] | None = None,
        image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] | None = None,
        reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
        langs: list[str] | None = None,
        facets: list[models.AppBskyRichtextFacet.Main] | None = None,
        labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Upload *images* as blobs and create a post embedding them.

        *image_alts* is padded with empty strings and *image_aspect_ratios*
        with ``None`` up to the number of images. All other arguments are
        forwarded to ``send_post``.
        """
        count = len(images)

        # Pad both per-image lists so each image has an alt and a ratio slot.
        alts = [] if image_alts is None else image_alts
        alts = alts + [''] * (count - len(alts))

        ratios = [] if image_aspect_ratios is None else image_aspect_ratios
        ratios = ratios + [None] * (count - len(ratios))

        # Upload everything before building the embed models.
        blobs = [self.client.upload_blob(data) for data in images]

        embedded = []
        for alt_text, uploaded, ratio in zip(alts, blobs, ratios):
            embedded.append(
                models.AppBskyEmbedImages.Image(alt=alt_text, image=uploaded.blob, aspect_ratio=ratio)
            )

        return self.send_post(
            text,
            reply_to=reply_to,
            embed=models.AppBskyEmbedImages.Main(images=embedded),
            langs=langs,
            facets=facets,
            labels=labels
        )

    def send_post(
        self,
        text: str | client_utils.TextBuilder,
        reply_to: models.AppBskyFeedPost.ReplyRef | None = None,
        embed:
            None |
            models.AppBskyEmbedImages.Main |
            models.AppBskyEmbedExternal.Main |
            models.AppBskyEmbedRecord.Main |
            models.AppBskyEmbedRecordWithMedia.Main |
            models.AppBskyEmbedVideo.Main = None,
        langs: list[str] | None = None,
        facets: list[models.AppBskyRichtextFacet.Main] | None = None,
        labels: models.ComAtprotoLabelDefs.SelfLabels | None = None
    ) -> models.AppBskyFeedPost.CreateRecordResponse:
        """Create a feed post record in the logged-in account's repository.

        When *text* is a ``TextBuilder`` its built facets replace *facets*.
        Empty or missing *langs* defaults to ``['en']``.

        Raises:
            Exception: if the client is not logged in.
        """
        if isinstance(text, client_utils.TextBuilder):
            builder = text
            facets = builder.build_facets()
            text = builder.build_text()

        repo = self.client.me and self.client.me.did
        if not repo:
            raise Exception("Client not logged in!")

        record = models.AppBskyFeedPost.Record(
            created_at=self.client.get_current_time_iso(),
            text=text,
            reply=reply_to,
            embed=embed,
            langs=langs or ['en'],
            facets=facets,
            labels=labels
        )
        return self.client.app.bsky.feed.post.create(repo, record)

    def create_gates(self, options: dict, post_uri: str):
        """Create the thread gate and/or post gate configured in *options*.

        Unless ``'everybody'`` is listed under ``options['thread_gate']``, a
        threadgate record is created whose allow list holds one rule for each
        of ``'following'``, ``'followers'`` and ``'mentioned'`` present in
        that option. A truthy ``options['quote_gate']`` adds a postgate with
        a ``DisableRule`` embedding rule. Both records reuse the post's rkey.

        NOTE(review): a missing or empty ``thread_gate`` option yields a
        threadgate with an empty allow list. Confirm that default is intended.

        Raises:
            Exception: if the client is not logged in.
        """
        account = self.client.me
        if not account:
            raise Exception("Client not logged in!")

        rkey = AtUri.from_str(post_uri).rkey
        created_at = self.client.get_current_time_iso()

        gate_opts = options.get('thread_gate', [])
        if 'everybody' not in gate_opts:
            # Option name -> rule class name, in the order rules are emitted.
            # Classes are looked up lazily, only for options actually present.
            rule_names = (
                ('following', 'FollowingRule'),
                ('followers', 'FollowerRule'),
                ('mentioned', 'MentionRule'),
            )
            allow = []
            for option_name, rule_cls_name in rule_names:
                if option_name in gate_opts:
                    allow.append(getattr(models.AppBskyFeedThreadgate, rule_cls_name)())

            self.client.app.bsky.feed.threadgate.create(
                account.did,
                models.AppBskyFeedThreadgate.Record(
                    post=post_uri,
                    created_at=created_at,
                    allow=allow
                ),
                rkey
            )

        if not options.get('quote_gate', False):
            return

        self.client.app.bsky.feed.postgate.create(
            account.did,
            models.AppBskyFeedPostgate.Record(
                post=post_uri,
                created_at=created_at,
                embedding_rules=[
                    models.AppBskyFeedPostgate.DisableRule()
                ]
            ),
            rkey
        )
506
507
508def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
509 builder = client_utils.TextBuilder()
510
511 def flatten_link(href: str):
512 split = href.split('://', 1)
513 if len(split) > 1:
514 href = split[1]
515
516 if len(href) > 32:
517 href = href[:32] + '...'
518
519 return href
520
521 for token in tokens:
522 if isinstance(token, cross.TextToken):
523 builder.text(token.text)
524 elif isinstance(token, cross.LinkToken):
525 if util.canonical_label(token.label, token.href):
526 builder.link(flatten_link(token.href), token.href)
527 continue
528
529 builder.link(token.label, token.href)
530 elif isinstance(token, cross.TagToken):
531 builder.tag('#' + token.tag, token.tag)
532 else:
533 # fail on unsupported tokens
534 return None
535
536 return builder