social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from atproto import client_utils, Request, AsyncFirehoseSubscribeReposClient, CAR, CID
2from atproto_client import models
3from atproto_client.models.utils import get_or_create as get_model_or_create
4from atproto_client.models.blob_ref import BlobRef
5from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
6from atproto2 import Client2, resolve_identity
7from httpx import Timeout
8import json
9import cross
10import database
11from database import DataBaseWorker
12import util
13import media_util
14from util import LOGGER
15import re
16from typing import Callable, Any
17
# Only used as the lexicon/service reference; the actual PDS URL is resolved per account.
SERVICE = 'https://bsky.app'

# TODO: keyword matching on content-warning text is a crude heuristic for
# picking Bluesky self-labels; replace with something better.
# (?<!\w) / (?!\w) are used instead of \b: a \b right after "18\+" only
# matches when a word character follows, so a plain "18+" was never detected.
ADULT_PATTERN = re.compile(r"(?<!\w)(sexual content|nsfw|erotic|adult only|18\+)(?!\w)", re.IGNORECASE)
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
24
def tokenize_post(post: dict) -> list[cross.Token]:
    """Convert a Bluesky post record into crosspost tokens.

    Facet ranges in Bluesky records (``index.byteStart`` / ``index.byteEnd``)
    are UTF-8 *byte* offsets, so the text is sliced as encoded bytes and each
    slice is decoded back to ``str``. Slicing the ``str`` directly would
    misplace every facet that follows a non-ASCII character (emoji, accents,
    CJK, ...).

    Only the first feature of each facet is used, a facet starting inside an
    earlier accepted facet is dropped, and facets with a missing or
    out-of-range index (or an unknown feature type) are ignored.

    :param post: ``app.bsky.feed.post`` record as a plain dict.
    :return: text/link/tag/mention tokens in reading order; ``[]`` when the
        post has no text.
    """
    text: str = post.get('text', '')
    if not text:
        return []

    facets: list[dict] = post.get('facets') or []
    if not facets:
        return [cross.TextToken(text)]

    raw = text.encode('utf-8')

    def byte_slice(start: int, end: int | None = None) -> str:
        # errors='replace': a facet boundary inside a multi-byte character
        # must not crash the whole post
        return raw[start:end].decode('utf-8', errors='replace')

    # facet feature $type -> (token type, feature key holding the token value)
    feature_kinds = {
        'app.bsky.richtext.facet#tag': ('tag', 'tag'),
        'app.bsky.richtext.facet#link': ('link', 'uri'),
        'app.bsky.richtext.facet#mention': ('mention', 'did'),
    }

    slices: list[tuple[int, int, str, str]] = []
    for facet in facets:
        features: list[dict] = facet.get('features') or []
        if not features:
            continue

        index: dict = facet.get('index') or {}
        start = index.get('byteStart')
        end = index.get('byteEnd')
        if not isinstance(start, int) or not isinstance(end, int):
            continue
        if start < 0 or end <= start or end > len(raw):
            continue

        # we don't support overlapping features within one facet: use the first
        feature = features[0]
        kind = feature_kinds.get(feature.get('$type'))
        if kind is None:
            continue
        ttype, value_key = kind
        value = feature.get(value_key)
        if value is None:
            continue
        slices.append((start, end, ttype, value))

    if not slices:
        return [cross.TextToken(text)]

    # keep facets in text order and drop any that start inside an accepted one
    slices.sort(key=lambda s: s[0])
    unique: list[tuple[int, int, str, str]] = []
    current_end = 0
    for facet_slice in slices:
        if facet_slice[0] >= current_end:
            unique.append(facet_slice)
            current_end = facet_slice[1]

    tokens: list[cross.Token] = []
    prev = 0
    for start, end, ttype, val in unique:
        if start > prev:
            # plain text between facets
            tokens.append(cross.TextToken(byte_slice(prev, start)))

        if ttype == 'link':
            label = byte_slice(start, end)
            # "unflatten" links: when the label is just the URL without its
            # scheme (possibly truncated with "..."), emit an empty label;
            # presumably outputs then render the full URL themselves.
            _, sep, rest = val.partition('://')
            flattened = bool(sep) and (
                rest.startswith(label)
                or (label.endswith('...') and rest.startswith(label[:-3]))
            )
            tokens.append(cross.LinkToken(val, '' if flattened else label))
        elif ttype == 'tag':
            tokens.append(cross.TagToken(val))
        elif ttype == 'mention':
            tokens.append(cross.MentionToken(byte_slice(start, end), val))
        prev = end

    if prev < len(raw):
        tokens.append(cross.TextToken(byte_slice(prev)))

    return tokens
100
class BlueskyPost(cross.Post):
    """An ``app.bsky.feed.post`` record exposed through the ``cross.Post`` interface."""

    def __init__(self, pds_url: str, did: str, post: dict) -> None:
        """
        :param pds_url: base URL of the author's PDS; blob download URLs are built from it.
        :param did: DID of the author, i.e. the repo the blobs are fetched from.
        :param post: the record dict, including the ``$xpost.strongRef``
            entry (``{'cid': ..., 'uri': ...}``) added by the input.
        """
        super().__init__()
        self.post = post
        self.tokens = tokenize_post(post)

        # sort_keys makes the serialized strong ref stable, so it can serve as an identifier
        self.id = json.dumps(post['$xpost.strongRef'], sort_keys=True)

        self.parent_id = None
        if post.get('reply'):
            self.parent_id = json.dumps(post['reply']['parent'], sort_keys=True)

        # self-labels (e.g. "graphic-media" -> "graphic media") are reused as the content warning
        self.cw = ', '.join(str(label['val']).replace('-', ' ') for label in self._label_values())

        self.attachments = self._collect_attachments(pds_url, did)

    def _label_values(self) -> list[dict]:
        """Return the record's self-label entries, or ``[]`` when it has none."""
        labels = self.post.get('labels') or {}
        return labels.get('values') or []

    def _collect_attachments(self, pds_url: str, did: str) -> list[cross.MediaAttachment]:
        """Turn an images or video embed into attachments; any other embed yields ``[]``."""
        def get_blob_url(cid: str) -> str:
            return f'{pds_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={cid}'

        embed = self.post.get('embed') or {}
        embed_type = embed.get('$type')
        attachments: list[cross.MediaAttachment] = []

        if embed_type == 'app.bsky.embed.images':
            model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
            assert isinstance(model, models.AppBskyEmbedImages.Main)

            for image in model.images:
                attachments.append(BlueskyAttachment(
                    get_blob_url(image.image.cid.encode()),
                    'image', image.alt
                ))
        elif embed_type == 'app.bsky.embed.video':
            model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
            assert isinstance(model, models.AppBskyEmbedVideo.Main)

            attachments.append(BlueskyAttachment(
                get_blob_url(model.video.cid.encode()),
                'video', model.alt if model.alt else ''
            ))
        return attachments

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens

    def get_parent_id(self) -> str | None:
        return self.parent_id

    def get_post_date_iso(self) -> str:
        return self.post.get('createdAt') or super().get_post_date_iso()

    def get_cw(self) -> str:
        return self.cw or ''

    def get_id(self) -> str:
        return self.id

    def get_languages(self) -> list[str]:
        return self.post.get('langs', []) or []

    def is_sensitive(self) -> bool:
        """True when the record carries any self-label."""
        return bool(self._label_values())

    def get_attachments(self) -> list[cross.MediaAttachment]:
        return self.attachments or []
166
class BlueskyAttachment(cross.MediaAttachment):
    """A media blob hosted on a Bluesky PDS, addressed by its download URL."""

    def __init__(self, url: str, type: str, alt: str) -> None:
        super().__init__()
        self.url, self.type, self.alt = url, type, alt

    def get_url(self) -> str:
        """Blob download URL."""
        return self.url

    def get_type(self) -> str | None:
        """Media kind string given at construction (BlueskyPost uses 'image' / 'video')."""
        return self.type

    def create_meta(self, bytes: bytes) -> cross.MediaMeta:
        """Probe ``bytes`` and wrap width, height and duration (-1 if absent) in a MediaMeta."""
        probed = media_util.get_media_meta(bytes)
        width = probed['width']
        height = probed['height']
        duration = probed.get('duration', -1)
        return cross.MediaMeta(width, height, duration)

    def get_alt(self) -> str:
        """Alt text of the media."""
        return self.alt
186
class BlueskyInput(cross.Input):
    """Common base for inputs that read posts from a single Bluesky account.

    Resolves the account's DID and PDS from ``settings`` and mirrors post
    creations/deletions into the database before handing them to outputs.
    """

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        """
        :param settings: input config; ``handle``, ``did`` and ``pds`` go through
            ``util.as_envvar`` and are used to resolve the account. The old
            misspelled ``hanlde`` key is still accepted for existing configs.
        :param db: database worker used for post bookkeeping.
        """
        self.options = settings.get('options', {})
        did, pds = resolve_identity(
            handle=util.as_envvar(settings.get('handle') or settings.get('hanlde')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )
        self.pds = pds

        # the PDS is not the service: lexicon and record ids are the same on
        # every PDS, so SERVICE (not the PDS URL) is passed as the service
        super().__init__(SERVICE, did, settings, db)

    def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
        """Record a newly created post in the database and forward it to every output.

        Replies are only crossposted when their parent is already in the
        database for this account; other replies are skipped.

        :param post: record dict carrying the ``$xpost.strongRef`` entry.
        """
        post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)

        reply = post.get('reply')
        parent_ref = json.dumps(reply['parent'], sort_keys=True) if reply else None

        root_id = None
        parent_id = None
        if parent_ref:
            parent_post = database.find_post(self.db, parent_ref, self.user_id, self.service)
            if not parent_post:
                LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
                return

            parent_id = parent_post['id']
            # a parent without a root_id is itself the root of the thread
            root_id = parent_post['root_id'] or parent_id

        LOGGER.info("Crossposting '%s'...", post_ref)
        if root_id and parent_id:
            database.insert_reply(
                self.db,
                post_ref,
                self.user_id,
                self.service,
                parent_id,
                root_id
            )
        else:
            database.insert_post(
                self.db,
                post_ref,
                self.user_id,
                self.service
            )

        cross_post = BlueskyPost(self.pds, self.user_id, post)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
        """Propagate a deletion to every output and drop the post from the database.

        Posts that were never recorded (e.g. skipped replies) are ignored.

        :param post_id: strong ref dict (``{'cid': ..., 'uri': ...}``) of the deleted post.
        """
        identifier = json.dumps(post_id, sort_keys=True)
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        database.delete_post(self.db, identifier, self.user_id, self.service)
253
class BlueskyPdsInput(BlueskyInput):
    """Input that follows the account's commits on its PDS firehose."""

    # collection NSID of posts; repo op paths have the form "<collection>/<rkey>"
    _POST_COLLECTION = 'app.bsky.feed.post'

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        super().__init__(settings, db)

    def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
        """Dispatch the post creations and deletions contained in one ``#commit`` frame."""
        if not message.body.get('blocks'):
            return

        parsed = parse_firehose(message)
        if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
            return

        blocks = parsed.blocks
        car = None

        def get_lazy_repo() -> CAR:
            # decode the CAR only once, and only if a create op needs a record
            nonlocal car, blocks
            if isinstance(blocks, str):
                blocks = blocks.encode()
            assert blocks

            if car is None:
                car = CAR.from_bytes(blocks)
            return car

        for op in parsed.ops:
            uri = f'at://{parsed.repo}/{op.path}'
            # compare the exact collection: a prefix test would also match
            # other collections such as app.bsky.feed.postgate
            is_post = op.path.split('/', 1)[0] == self._POST_COLLECTION

            if op.action == 'delete':
                if op.prev and is_post:
                    self._on_delete_post(outputs, {
                        'cid': op.prev.encode(),
                        'uri': uri
                    })
                continue

            if op.action != 'create' or not op.cid or not is_post:
                continue

            record_data = get_lazy_repo().blocks.get(op.cid)
            if not record_data:
                continue

            record_dict = dict(record_data)
            record_dict['$xpost.strongRef'] = {
                'cid': op.cid.encode(),
                'uri': uri
            }
            if record_dict.get('$type') == 'app.bsky.feed.post':
                self._on_post(outputs, record_dict)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Subscribe to the PDS firehose and hand this account's commits to ``submit``."""
        scheme, sep, host = self.pds.partition('://')
        if not sep:
            # no scheme in the configured PDS url: the whole string is the host
            scheme, host = 'https', scheme
        ws_scheme = 'ws' if scheme == 'http' else 'wss'
        # no same-type quotes nested in the f-string (that needs Python 3.12+)
        streaming: str = f"{ws_scheme}://{host.rstrip('/')}/xrpc"

        client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)

        async def on_message(message: firehose_models.MessageFrame):
            if message.header.t != '#commit':
                return

            if message.body.get('repo') != self.user_id:
                return

            if message.body.get('tooBig'):
                LOGGER.error("#commit message is tooBig!")
                return

            submit(lambda: self.__on_commit(outputs, message))

        LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
        await client.start(on_message)
335
class BlueskyOutput(cross.Output):
    """Publish crossposted posts to a Bluesky account.

    A post becomes a self-reply thread of 300-character text chunks. Media is
    spread over that thread (one video, or up to four images, per post), and
    every created record is mapped to the original post in the database so
    later replies and deletions can be mirrored.
    """

    # limits applied by this output when preparing posts and media
    _MAX_POST_CHARS = 300
    _MAX_IMAGES_PER_POST = 4
    _MAX_IMAGE_DOWNLOAD_BYTES = 2_000_000
    _MAX_VIDEO_DOWNLOAD_BYTES = 100_000_000
    _MAX_IMAGE_UPLOAD_BYTES = 1_000_000
    _MAX_VIDEO_SECONDS = 180

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        """Resolve the target account and log in with its app password.

        :raises Exception: when no ``app-password`` is configured.
        """
        super().__init__(input, settings, db)
        self.options = settings.get('options') or {}

        app_password = util.as_envvar(settings.get('app-password'))
        if not app_password:
            raise Exception("Account app password not provided!")

        did, pds = resolve_identity(
            # 'hanlde' is the old misspelled key, still accepted for existing configs
            handle=util.as_envvar(settings.get('handle') or settings.get('hanlde')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )

        reqs = Request(timeout=Timeout(None, connect=30.0))

        self.bsky = Client2(pds, request=reqs)
        self.bsky.login(did, app_password)

    def _require_login(self):
        """Return the logged-in session info or raise if the client is not logged in."""
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")
        return login

    def _find_parent(self, parent_id: str):
        """Find where the parent of a reply was crossposted on Bluesky.

        :return: ``(root_strong_ref, parent_strong_ref, root_db_id, parent_db_id)``,
            or ``None`` when the parent's thread is not in the database.
        """
        login = self._require_login()

        thread_tuple = database.find_mapped_thread(
            self.db,
            parent_id,
            self.input.user_id,
            self.input.service,
            login.did,
            SERVICE
        )

        if not thread_tuple:
            LOGGER.error("Failed to find thread tuple in the database!")
            return None

        root_ref = json.loads(thread_tuple[0])
        reply_ref = json.loads(thread_tuple[1])

        root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
        reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))

        return (
            models.create_strong_ref(root_record),
            models.create_strong_ref(reply_record),
            thread_tuple[2],
            thread_tuple[3]
        )

    def _split_attachments(self, attachments: list[cross.MediaAttachment]):
        """Partition attachments into ``(supported, unsupported)``; untyped ones are dropped."""
        sup_media: list[cross.MediaAttachment] = []
        unsup_media: list[cross.MediaAttachment] = []

        for attachment in attachments:
            attachment_type = attachment.get_type()
            if not attachment_type:
                continue

            if attachment_type in {'video', 'image'}:  # TODO convert gifs to videos
                sup_media.append(attachment)
            else:
                unsup_media.append(attachment)

        return (sup_media, unsup_media)

    def _split_media_per_post(
            self,
            tokens: list[client_utils.TextBuilder],
            media: list[cross.MediaAttachment]):
        """Distribute media over the text chunks of a thread.

        Every video gets a post of its own; consecutive images share a post
        up to the per-post image limit. Media is placed on text chunks in
        order, and blank-text posts are appended when the chunks run out.

        :return: list of ``(text, attachments)`` pairs, one per post to create.
        """
        posts: list[dict] = [{"tokens": text, "attachments": []} for text in tokens]
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def pop_next_empty_index() -> int:
            if available_indices:
                return available_indices.pop(0)
            # more media groups than text chunks: add a post with empty text
            # (a TextBuilder, same type as the text chunks)
            posts.append({"tokens": client_utils.TextBuilder().text(''), "attachments": []})
            return len(posts) - 1

        for att in media:
            if att.get_type() == 'video':
                current_image_post_idx = None
                idx = pop_next_empty_index()
                posts[idx]["attachments"].append(att)
            elif att.get_type() == 'image':
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"]) < self._MAX_IMAGES_PER_POST
                ):
                    posts[current_image_post_idx]["attachments"].append(att)
                else:
                    idx = pop_next_empty_index()
                    posts[idx]["attachments"].append(att)
                    current_image_post_idx = idx

        return [(p["tokens"], p["attachments"]) for p in posts]

    @staticmethod
    def _make_labels(cw: str, sensitive: bool):
        """Build Bluesky self-labels from the content warning and the sensitive flag."""
        unique_labels: set[str] = set()
        if cw:
            unique_labels.add('graphic-media')

            if ADULT_PATTERN.search(cw):
                unique_labels.add('sexual')

            if PORN_PATTERN.search(cw):
                unique_labels.add('porn')

        if sensitive:
            unique_labels.add('graphic-media')

        # sorted: set iteration order of strings varies between runs
        return models.ComAtprotoLabelDefs.SelfLabels(
            values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in sorted(unique_labels)]
        )

    @staticmethod
    def _append_media_links(tokens: list[cross.Token], unsup_media: list[cross.MediaAttachment]):
        """Append a ``[filename]`` link for every attachment Bluesky can't embed."""
        if not unsup_media:
            return
        if tokens:
            tokens.append(cross.TextToken('\n'))
        for attachment in unsup_media:
            url = attachment.get_url()
            tokens.append(cross.LinkToken(
                url,
                f"[{media_util.get_filename_from_url(url)}]"
            ))
            tokens.append(cross.TextToken(' '))

    def _build_post_text(self, tokens: list[cross.Token]) -> list[client_utils.TextBuilder] | None:
        """Split tokens into post-sized chunks of rich text.

        :return: at least one builder, or ``None`` if any chunk contains a
            token type that can't be rendered as Bluesky rich text.
        """
        post_text: list[client_utils.TextBuilder] = []
        for block in util.split_tokens(tokens, self._MAX_POST_CHARS):
            rich_text = tokens_to_richtext(block)
            if rich_text is None:
                return None
            post_text.append(rich_text)
        return post_text or [client_utils.TextBuilder().text('')]

    def _prepare_media(self, post: cross.Post, sup_media: list[cross.MediaAttachment]):
        """Download supported media and validate videos before anything is published.

        :return: dict ``id(attachment) -> MediaMeta`` for the videos, or
            ``None`` when the post has to be skipped (reason is logged).
        """
        # download media first. increased RAM usage, but more reliable
        for m in sup_media:
            if m.bytes:
                continue
            max_bytes = (self._MAX_IMAGE_DOWNLOAD_BYTES if m.get_type() == 'image'
                         else self._MAX_VIDEO_DOWNLOAD_BYTES)
            data = media_util.download_blob(m.get_url(), max_bytes=max_bytes)
            if not data:
                LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
                return None
            m.bytes = data

        # check video length up front so a thread is never left half-published
        video_meta = {}
        for m in sup_media:
            if m.get_type() != 'video':
                continue
            metadata = m.create_meta(m.bytes)
            if metadata.get_duration() > self._MAX_VIDEO_SECONDS:
                LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
                return None
            video_meta[id(m)] = metadata
        return video_meta

    def _send_images(self, text, attachments, reply_to, labels, time_iso):
        """Create one post with up to four images; oversized images are re-encoded at lower quality."""
        images: list[bytes] = []
        image_alts: list[str] = []
        image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []

        for attachment in attachments:
            assert attachment.bytes
            image_io = media_util.compress_image(attachment.bytes, quality=100)
            metadata = attachment.create_meta(image_io)

            if len(image_io) > self._MAX_IMAGE_UPLOAD_BYTES:
                LOGGER.info("Compressing %s...", attachment.get_url())
                for quality in (90, 75, 50):
                    image_io = media_util.compress_image(attachment.bytes, quality=quality)
                    if len(image_io) <= self._MAX_IMAGE_UPLOAD_BYTES:
                        break

            images.append(image_io)
            image_alts.append(attachment.get_alt())
            image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
                width=metadata.get_width(),
                height=metadata.get_height()
            ))

        return self.bsky.send_images(
            text=text,
            images=images,
            image_alts=image_alts,
            image_aspect_ratios=image_aspect_ratios,
            reply_to=reply_to,
            labels=labels,
            time_iso=time_iso
        )

    def _send_video(self, text, video_data, metadata, reply_to, labels, time_iso):
        """Create one post with a single video, converting it to mp4 when needed."""
        assert video_data.bytes
        video_io = video_data.bytes

        probe = media_util.probe_bytes(video_io)
        format_name = probe['format']['format_name']
        if 'mp4' not in format_name.split(','):
            LOGGER.info("Converting %s to mp4...", video_data.get_url())
            video_io = media_util.convert_to_mp4(video_io)

        aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
            width=metadata.get_width(),
            height=metadata.get_height()
        )

        return self.bsky.send_video(
            text=text,
            video=video_io,
            video_aspect_ratio=aspect_ratio,
            video_alt=video_data.get_alt(),
            reply_to=reply_to,
            labels=labels,
            time_iso=time_iso
        )

    def _record_mappings(self, db_post, created_records, did: str, new_root_id, new_parent_id):
        """Store the created records as a thread and map each one to the original post."""
        db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]

        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db,
                db_identifiers[0],
                did,
                SERVICE
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            db_identifiers = db_identifiers[1:]

        for db_id in db_identifiers:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                did,
                SERVICE,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def accept_post(self, post: cross.Post):
        """Crosspost ``post`` as a new thread, or as a reply inside an already mirrored thread.

        Nothing is published when the original post is missing from the
        database, its parent was not crossposted, it contains tokens Bluesky
        can't render, media can't be downloaded, or a video is too long.
        """
        login = self._require_login()

        # BlueskyInput._on_post stores the original post before notifying outputs.
        # Look it up first so no Bluesky records are created that can't be mapped.
        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        if not db_post:
            LOGGER.error("Skipping '%s' as it was not found in db!", post.get_id())
            return

        # used for db insertion
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        parent_id = post.get_parent_id()
        if parent_id:
            parents = self._find_parent(parent_id)
            if not parents:
                return
            root_ref, reply_ref, new_root_id, new_parent_id = parents

        tokens = post.get_tokens().copy()
        cw = post.get_cw()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
        labels = self._make_labels(cw, post.is_sensitive())

        sup_media, unsup_media = self._split_attachments(post.get_attachments())
        self._append_media_links(tokens, unsup_media)

        # convert tokens into rich text. skip post if contains unsupported tokens
        post_text = self._build_post_text(tokens)
        if post_text is None:
            LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
            return

        video_meta = self._prepare_media(post, sup_media)
        if video_meta is None:
            return

        time_iso = post.get_post_date_iso()
        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []

        for text, attachments in self._split_media_per_post(post_text, sup_media):
            reply_to = models.AppBskyFeedPost.ReplyRef(
                parent=reply_ref,
                root=root_ref
            ) if root_ref and reply_ref else None

            if not attachments:
                if reply_to:
                    new_post = self.bsky.send_post(text, reply_to=reply_to, labels=labels, time_iso=time_iso)
                else:
                    new_post = self.bsky.send_post(text, labels=labels, time_iso=time_iso)
            elif attachments[0].get_type() == 'image':
                # if a single attachment is an image, all of them are (see _split_media_per_post)
                new_post = self._send_images(text, attachments, reply_to, labels, time_iso)
            else:
                # a video post always carries exactly one attachment
                video_data = attachments[0]
                new_post = self._send_video(text, video_data, video_meta[id(video_data)], reply_to, labels, time_iso)

            if not root_ref:
                root_ref = models.create_strong_ref(new_post)

            self.bsky.create_gates(self.options, new_post.uri, time_iso=time_iso)
            reply_ref = models.create_strong_ref(new_post)
            created_records.append(new_post)

        self._record_mappings(db_post, created_records, login.did, new_root_id, new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky record mapped to the original post ``identifier``, last one first."""
        login = self._require_login()

        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        # NOTE(review): argument order (service before did) kept as in the original;
        # verify against database.find_mappings.
        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            self.bsky.delete_post(json.loads(mapping[0])['uri'])
            # (db, identifier, user, service), matching insert_post/find_post in this file
            database.delete_post(self.db, mapping[0], login.did, SERVICE)
667
668
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
    """Render crosspost tokens into a Bluesky rich-text builder.

    Text, link and tag tokens are supported. A link whose label is canonical
    for its href (per ``util.canonical_label``) is displayed as the URL with
    its scheme removed, cut to 32 characters plus ``...`` when longer.

    :return: the populated builder, or ``None`` as soon as a token of any
        other type is met.
    """
    def shorten(href: str) -> str:
        head, sep, tail = href.partition('://')
        bare = tail if sep else head
        return bare if len(bare) <= 32 else bare[:32] + '...'

    builder = client_utils.TextBuilder()
    for token in tokens:
        if isinstance(token, cross.TextToken):
            builder.text(token.text)
        elif isinstance(token, cross.LinkToken):
            shown = shorten(token.href) if util.canonical_label(token.label, token.href) else token.label
            builder.link(shown, token.href)
        elif isinstance(token, cross.TagToken):
            builder.tag('#' + token.tag, token.tag)
        else:
            # unsupported token type: the caller skips the whole post
            return None
    return builder