social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from atproto import client_utils, Request, AsyncFirehoseSubscribeReposClient, CAR, CID
2from atproto_client import models
3from atproto_client.models.utils import get_or_create as get_model_or_create
4from atproto_client.models.blob_ref import BlobRef
5from atproto_firehose import models as firehose_models, parse_subscribe_repos_message as parse_firehose
6from atproto2 import Client2, resolve_identity
7from httpx import Timeout
8import json
9import cross
10import database
11from database import DataBaseWorker
12import util
13import media_util
14from util import LOGGER
15import re
16from typing import Callable, Any
17
# only for lexicon reference
SERVICE = 'https://bsky.app'

# TODO this is terrible and stupid
# Keyword heuristics applied to a content warning to pick a Bluesky self-label.
# The word alternatives keep a trailing \b; "18+" deliberately has none, because
# \b after "+" only matches when a word character follows, which made a plain
# "18+" (end of text, or followed by a space/punctuation) impossible to match.
ADULT_PATTERN = re.compile(r"\b((?:sexual content|nsfw|erotic|adult only)\b|18\+)", re.IGNORECASE)
PORN_PATTERN = re.compile(r"\b(porn|yiff|hentai|pornographic|fetish)\b", re.IGNORECASE)
24
def tokenize_post(post: dict) -> list[cross.Token]:
    """Convert a Bluesky post record into a list of cross-posting tokens.

    The record's ``text`` is split around its ``facets``. Only the first
    feature of each facet is used, and a facet that starts before the
    previously accepted facet ends is dropped (overlaps are not supported).
    Facets with an unknown feature type are ignored, so their text stays
    plain text.

    Facet indices are ``byteStart``/``byteEnd`` offsets, so all slicing is
    done on the UTF-8 encoded text and decoded afterwards; slicing the
    ``str`` directly would mis-align on any non-ASCII character.

    Args:
        post: Post record dict; ``text`` and ``facets`` are read.

    Returns:
        An empty list when the post has no text, otherwise the tokens in
        reading order.
    """
    text: str = post.get('text', '')
    if not text:
        return []

    facets: list[dict] = post.get('facets', [])
    if not facets:
        return [cross.TextToken(text)]

    raw = text.encode('utf-8')

    def piece(start: int, end: int | None = None) -> str:
        # errors='replace' keeps a malformed facet (one that cuts a
        # multi-byte character in half) from aborting the whole post.
        return raw[start:end].decode('utf-8', errors='replace')

    # feature $type -> (internal token kind, key holding the feature value)
    facet_kinds = {
        'app.bsky.richtext.facet#tag': ('tag', 'tag'),
        'app.bsky.richtext.facet#link': ('link', 'uri'),
        'app.bsky.richtext.facet#mention': ('mention', 'did'),
    }

    slices: list[tuple[int, int, str, str]] = []
    for facet in facets:
        features: list[dict] = facet.get('features', [])
        if not features:
            continue

        # we don't support overlapping facets/features
        feature = features[0]
        kind = facet_kinds.get(feature.get('$type'))
        if kind is None:
            continue

        ttype, value_key = kind
        index = facet['index']
        slices.append((index['byteStart'], index['byteEnd'], ttype, feature[value_key]))

    if not slices:
        return [cross.TextToken(text)]

    # Keep facets in order of appearance, skipping any that overlap an
    # already accepted one.
    slices.sort(key=lambda s: s[0])
    unique: list[tuple[int, int, str, str]] = []
    current_end = 0
    for start, end, ttype, val in slices:
        if start >= current_end:
            unique.append((start, end, ttype, val))
            current_end = end

    if not unique:
        return [cross.TextToken(text)]

    tokens: list[cross.Token] = []
    prev = 0

    for start, end, ttype, val in unique:
        if start > prev:
            # text between facets
            tokens.append(cross.TextToken(piece(prev, start)))

        # facet token
        if ttype == 'link':
            label = piece(start, end)

            # try to unflatten links: when the visible text is just the
            # (possibly "..."-truncated) URL without its scheme, emit an
            # empty label instead of repeating the URL.
            _, sep, rest = val.partition('://')
            label_is_url = bool(sep) and (
                rest.startswith(label)
                or (label.endswith('...') and rest.startswith(label[:-3]))
            )
            # Always emit a token so the linked text is never silently lost,
            # including for URIs that contain no '://'.
            tokens.append(cross.LinkToken(val, '' if label_is_url else label))
        elif ttype == 'tag':
            tokens.append(cross.TagToken(val))
        elif ttype == 'mention':
            tokens.append(cross.MentionToken(piece(start, end), val))
        prev = end

    if prev < len(raw):
        tokens.append(cross.TextToken(piece(prev)))

    return tokens
100
class BlueskyPost(cross.Post):
    """A Bluesky post record wrapped in the ``cross.Post`` interface.

    Args:
        post: Post record dict. Must contain ``$xpost.strongRef`` (used as
            the post id); ``reply``, ``labels``, ``createdAt`` and ``langs``
            are read when present.
        tokens: Already tokenized post text.
        attachments: Media downloaded for this post.
    """

    def __init__(self, post: dict, tokens: list[cross.Token], attachments: list[media_util.MediaInfo]) -> None:
        super().__init__()
        self.post = post
        self.tokens = tokens

        # Ids are the JSON-serialised strong refs; sort_keys keeps the
        # string stable regardless of dict ordering.
        self.id = json.dumps(self.post['$xpost.strongRef'], sort_keys=True)

        self.parent_id = None
        if self.post.get('reply'):
            self.parent_id = json.dumps(self.post['reply']['parent'], sort_keys=True)

        # Self-labels double as the content warning, e.g. "graphic-media"
        # becomes "graphic media".
        self.cw = ', '.join(
            str(label['val']).replace('-', ' ') for label in self._label_values()
        )
        self.attachments = attachments

    def _label_values(self) -> list:
        """Return the record's self-label values, or an empty list."""
        # `or {}` / `or []` also cover keys that are present but null.
        return (self.post.get('labels') or {}).get('values') or []

    def get_tokens(self) -> list[cross.Token]:
        return self.tokens

    def get_parent_id(self) -> str | None:
        return self.parent_id

    def get_post_date_iso(self) -> str:
        # Fall back to the base implementation when the record has no date.
        return self.post.get('createdAt') or super().get_post_date_iso()

    def get_cw(self) -> str:
        return self.cw or ''

    def get_id(self) -> str:
        return self.id

    def get_languages(self) -> list[str]:
        return self.post.get('langs', []) or []

    def is_sensitive(self) -> bool:
        # Any self-label marks the post as sensitive. Return a real bool
        # rather than leaking the label list to callers.
        return bool(self._label_values())

    def get_attachments(self) -> list[media_util.MediaInfo]:
        return self.attachments
142
class BlueskyInputOptions():
    """Parsed options for a Bluesky input.

    Args:
        o: Options dict. ``regex_filters`` is an optional list of regular
            expression strings; each is compiled into ``self.filters``.
    """

    def __init__(self, o: dict) -> None:
        # `or []` also covers an explicit null in the config file, which
        # `o.get('regex_filters', [])` would pass through as None.
        self.filters = [re.compile(f) for f in o.get('regex_filters') or []]
146
class BlueskyInput(cross.Input):
    """Base class for inputs that read posts from a Bluesky account.

    Resolves the account's DID and PDS from the settings and provides the
    post / delete handlers that subclasses call with decoded records.
    """

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        # `or {}` (not a .get default) so an explicit `options: null` works
        # too, matching how BlueskyOutput reads its options.
        self.options = BlueskyInputOptions(settings.get('options') or {})
        did, pds = resolve_identity(
            handle=util.as_envvar(settings.get('handle')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )
        self.pds = pds

        # PDS is Not a service, the lexicon and rids are the same across pds
        super().__init__(SERVICE, did, settings, db)

    def _blob_url(self, cid: str) -> str:
        """Build the ``com.atproto.sync.getBlob`` URL for one of this user's blobs."""
        return f'{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={cid}'

    def _on_post(self, outputs: list[cross.Output], post: dict[str, Any]):
        """Handle a newly created post record and forward it to the outputs.

        The post is skipped (nothing is forwarded) when the database insert
        is refused, when its tokens match one of the configured filters, or
        when any attached media fails to download.

        Args:
            outputs: Outputs that receive the crosspost.
            post: Post record dict including ``$xpost.strongRef``.
        """
        post_ref = json.dumps(post['$xpost.strongRef'], sort_keys=True)

        parent_ref = None
        if post.get('reply'):
            parent_ref = json.dumps(post['reply']['parent'], sort_keys=True)

        success = database.try_insert_post(self.db, post_ref, parent_ref, self.user_id, self.service)
        if not success:
            LOGGER.info("Skipping '%s' as parent post was not found in db!", post_ref)
            return

        tokens = tokenize_post(post)
        if not cross.test_filters(tokens, self.options.filters):
            LOGGER.info("Skipping '%s'. Matched a filter!", post_ref)
            return

        LOGGER.info("Crossposting '%s'...", post_ref)

        # Collect (blob cid, alt text) pairs for the supported embed types;
        # any other embed type contributes no attachments.
        blobs: list[tuple[str, str]] = []
        embed = post.get('embed') or {}
        embed_type = embed.get('$type')
        if embed_type == 'app.bsky.embed.images':
            model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
            assert isinstance(model, models.AppBskyEmbedImages.Main)
            blobs = [(image.image.cid.encode(), image.alt) for image in model.images]
        elif embed_type == 'app.bsky.embed.video':
            model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
            assert isinstance(model, models.AppBskyEmbedVideo.Main)
            blobs = [(model.video.cid.encode(), model.alt if model.alt else '')]

        attachments: list[media_util.MediaInfo] = []
        for cid, alt in blobs:
            url = self._blob_url(cid)
            LOGGER.info("Downloading %s...", url)
            media = media_util.download_media(url, alt)
            if not media:
                LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
                return
            attachments.append(media)

        cross_post = BlueskyPost(post, tokens, attachments)
        for output in outputs:
            output.accept_post(cross_post)

    def _on_delete_post(self, outputs: list[cross.Output], post_id: dict):
        """Propagate the deletion of a post to the outputs and the database.

        Does nothing when the post is not in the database.

        Args:
            outputs: Outputs asked to delete their copies.
            post_id: Strong-ref dict of the deleted post; its sorted-key
                JSON form is the database identifier.
        """
        identifier = json.dumps(post_id, sort_keys=True)
        post = database.find_post(self.db, identifier, self.user_id, self.service)
        if not post:
            return

        LOGGER.info("Deleting '%s'...", identifier)
        for output in outputs:
            output.delete_post(identifier)
        # Remove the source row only after every output has been notified.
        database.delete_post(self.db, identifier, self.user_id, self.service)
222
class BlueskyPdsInput(BlueskyInput):
    """Bluesky input that listens to the account's PDS repo firehose."""

    def __init__(self, settings: dict, db: DataBaseWorker) -> None:
        super().__init__(settings, db)

    def __on_commit(self, outputs: list[cross.Output], message: firehose_models.MessageFrame):
        """Process one ``#commit`` frame: forward post creations and deletions.

        Only ``create`` ops whose record is an ``app.bsky.feed.post`` and
        ``delete`` ops under the ``app.bsky.feed.post`` path are handled;
        everything else in the commit is ignored.
        """
        blocks = message.body.get('blocks')
        if not blocks:
            return

        parsed = parse_firehose(message)
        if not isinstance(parsed, models.ComAtprotoSyncSubscribeRepos.Commit):
            return
        blocks = parsed.blocks

        car: CAR | None = None

        def get_lazy_repo() -> CAR:
            # Decode the CAR archive at most once, and only if some op
            # actually needs record data.
            nonlocal car
            if car is None:
                raw = blocks.encode() if isinstance(blocks, str) else blocks
                assert raw
                car = CAR.from_bytes(raw)
            return car

        for op in parsed.ops:
            if op.action == 'delete':
                if not op.prev:
                    continue

                if not op.path.startswith('app.bsky.feed.post'):
                    continue

                self._on_delete_post(outputs, {
                    'cid': op.prev.encode(),
                    'uri': f'at://{parsed.repo}/{op.path}'
                })
                continue

            if op.action != 'create':
                continue

            if not op.cid:
                continue

            record_data = get_lazy_repo().blocks.get(op.cid)
            if not record_data:
                continue

            record_dict = dict(record_data)
            # .get: a record without '$type' is simply not a post, rather
            # than a KeyError that aborts the rest of the commit.
            if record_dict.get('$type') != 'app.bsky.feed.post':
                continue

            record_dict['$xpost.strongRef'] = {
                'cid': op.cid.encode(),
                'uri': f'at://{parsed.repo}/{op.path}'
            }
            self._on_post(outputs, record_dict)

    async def listen(self, outputs: list[cross.Output], submit: Callable[[Callable[[], Any]], Any]):
        """Subscribe to the PDS firehose and hand this user's commits to ``submit``.

        Args:
            outputs: Outputs that receive crossposts and deletions.
            submit: Called with a zero-argument callable for every relevant
                commit; the callable does the actual processing.
        """
        # Strip the scheme outside the f-string: reusing the same quote
        # character inside an f-string is a syntax error before Python 3.12.
        # [-1] also tolerates a PDS value that has no scheme at all.
        host = self.pds.split('://', 1)[-1]
        streaming: str = f"wss://{host}/xrpc"

        client = AsyncFirehoseSubscribeReposClient(base_uri=streaming)

        async def on_message(message: firehose_models.MessageFrame):
            if message.header.t != '#commit':
                return

            if message.body.get('repo') != self.user_id:
                return

            if message.body.get('tooBig'):
                LOGGER.error("#commit message is tooBig!")
                return

            submit(lambda: self.__on_commit(outputs, message))

        LOGGER.info("Listening to %s...", streaming + '/com.atproto.sync.subscribeRepos')
        await client.start(on_message)
304
# Values accepted in the 'thread_gate' option.
ALLOWED_GATES = ['mentioned', 'following', 'followers', 'everybody']


class BlueskyOutputOptions:
    """Parsed options for a Bluesky output.

    Attributes are set to their defaults first and overridden only by keys
    that are present (not ``None``) in the options dict.

    Args:
        o: Options dict; ``quote_gate``, ``thread_gate`` and
            ``encode_videos`` are read.

    Raises:
        ValueError: If ``thread_gate`` is a bare string or contains a value
            outside ``ALLOWED_GATES``.
    """

    def __init__(self, o: dict) -> None:
        self.quote_gate: bool = False
        self.thread_gate: list[str] = ['everybody']
        self.encode_videos: bool = True

        quote_gate = o.get('quote_gate')
        if quote_gate is not None:
            self.quote_gate = bool(quote_gate)

        thread_gate = o.get('thread_gate')
        if thread_gate is not None:
            # A bare string would otherwise be checked character by
            # character (and an empty one would slip through as a "list").
            if isinstance(thread_gate, str) or any(v not in ALLOWED_GATES for v in thread_gate):
                raise ValueError(f"'thread_gate' only accepts {', '.join(ALLOWED_GATES)} or [], got: {thread_gate}")
            # Copy so later changes to the settings object cannot alter
            # the validated options.
            self.thread_gate = list(thread_gate)

        encode_videos = o.get('encode_videos')
        if encode_videos is not None:
            self.encode_videos = bool(encode_videos)
326
class BlueskyOutput(cross.Output):
    """Output that publishes crossposts to a Bluesky account.

    Logs in with an app password on construction, splits long posts and
    their media into a reply thread, and records every created record in
    the database so replies and deletions can be mapped later.
    """

    def __init__(self, input: cross.Input, settings: dict, db: DataBaseWorker) -> None:
        super().__init__(input, settings, db)
        self.options = BlueskyOutputOptions(settings.get('options') or {})

        app_password = util.as_envvar(settings.get('app-password'))
        if not app_password:
            raise Exception("Account app password not provided!")

        did, pds = resolve_identity(
            handle=util.as_envvar(settings.get('handle')),
            did=util.as_envvar(settings.get('did')),
            pds=util.as_envvar(settings.get('pds'))
        )

        # No read/write/pool timeout; only connecting is bounded to 30 s.
        reqs = Request(timeout=Timeout(None, connect=30.0))

        self.bsky = Client2(pds, request=reqs)
        self.bsky.login(did, app_password)

    def _find_parent(self, parent_id: str):
        """Look up the Bluesky thread a reply to ``parent_id`` must attach to.

        Returns:
            ``(root strong ref, reply strong ref, root db id, parent db id)``
            or ``None`` when no mapped thread is in the database.

        Raises:
            Exception: If the client is not logged in.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        thread_tuple = database.find_mapped_thread(
            self.db,
            parent_id,
            self.input.user_id,
            self.input.service,
            login.did,
            SERVICE
        )

        if not thread_tuple:
            LOGGER.error("Failed to find thread tuple in the database!")
            return None

        # The first two entries are JSON-serialised {uri, cid} identifiers.
        root_ref = json.loads(thread_tuple[0])
        reply_ref = json.loads(thread_tuple[1])

        root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_ref['uri']), cid=str(root_ref['cid']))
        reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_ref['uri']), cid=str(reply_ref['cid']))

        return (
            models.create_strong_ref(root_record),
            models.create_strong_ref(reply_record),
            thread_tuple[2],
            thread_tuple[3]
        )

    def _split_attachments(self, attachments: list[media_util.MediaInfo]):
        """Partition attachments into ``(supported, unsupported)`` by MIME type.

        Images and videos are supported; everything else is not.
        """
        sup_media: list[media_util.MediaInfo] = []
        unsup_media: list[media_util.MediaInfo] = []

        for a in attachments:
            # TODO convert gifs to videos
            if a.mime.startswith(('image/', 'video/')):
                sup_media.append(a)
            else:
                unsup_media.append(a)

        return (sup_media, unsup_media)

    def _split_media_per_post(
        self,
        tokens: list[client_utils.TextBuilder],
        media: list[media_util.MediaInfo]):
        """Distribute media over the text chunks of a thread.

        Each video gets a post of its own; images are grouped up to four
        per post. Posts are filled in order, and blank posts are appended
        when there is more media than text chunks.

        Returns:
            One ``(text builder, attachments)`` tuple per post, in thread order.
        """
        posts: list[dict] = [{"tokens": builder, "attachments": []} for builder in tokens]
        available_indices: list[int] = list(range(len(posts)))

        current_image_post_idx: int | None = None

        def pop_next_empty_index() -> int:
            if available_indices:
                return available_indices.pop(0)
            # Out of text chunks: add a blank post. Its text must be a
            # TextBuilder like every other entry, not a list wrapping one.
            posts.append({
                "tokens": client_utils.TextBuilder().text(''),
                "attachments": []
            })
            return len(posts) - 1

        for att in media:
            if att.mime.startswith('video/'):
                # A video ends the current image group.
                current_image_post_idx = None
                idx = pop_next_empty_index()
                posts[idx]["attachments"].append(att)
            elif att.mime.startswith('image/'):
                if (
                    current_image_post_idx is not None
                    and len(posts[current_image_post_idx]["attachments"]) < 4
                ):
                    posts[current_image_post_idx]["attachments"].append(att)
                else:
                    idx = pop_next_empty_index()
                    posts[idx]["attachments"].append(att)
                    current_image_post_idx = idx

        result: list[tuple[client_utils.TextBuilder, list[media_util.MediaInfo]]] = [
            (p["tokens"], p["attachments"]) for p in posts
        ]
        return result

    def _send_image_post(self, text, attachments, reply_to, labels, time_iso):
        """Publish one post carrying the given image attachments."""
        images: list[bytes] = []
        image_alts: list[str] = []
        image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []

        for attachment in attachments:
            image_io = media_util.compress_image(attachment.io, quality=100)
            metadata = media_util.get_media_meta(image_io)

            # Re-compress with the helper's default quality when still large.
            if len(image_io) > 1_000_000:
                LOGGER.info("Compressing %s...", attachment.name)
                image_io = media_util.compress_image(image_io)

            images.append(image_io)
            image_alts.append(attachment.alt)
            image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
                width=metadata['width'],
                height=metadata['height']
            ))

        return self.bsky.send_images(
            text=text,
            images=images,
            image_alts=image_alts,
            image_aspect_ratios=image_aspect_ratios,
            reply_to=reply_to,
            labels=labels,
            time_iso=time_iso
        )

    def _send_video_post(self, text, attachment, metadata, reply_to, labels, time_iso):
        """Publish one post carrying a single video, converting it to mp4 if needed."""
        video_io = attachment.io
        if attachment.mime != 'video/mp4':
            LOGGER.info("Converting %s to mp4...", attachment.name)
            video_io = media_util.convert_to_mp4(video_io)

        aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
            width=metadata['width'],
            height=metadata['height']
        )

        return self.bsky.send_video(
            text=text,
            video=video_io,
            video_aspect_ratio=aspect_ratio,
            video_alt=attachment.alt,
            reply_to=reply_to,
            labels=labels,
            time_iso=time_iso
        )

    def accept_post(self, post: cross.Post):
        """Publish ``post`` to Bluesky as one post or a reply thread.

        The whole post is skipped, before anything is published, when its
        parent thread is unknown, when it contains tokens that cannot be
        turned into rich text, or when an attachment is too large, too long,
        or needs encoding while encoding is disabled.

        Raises:
            Exception: If the client is not logged in.
            AssertionError: If the source post is missing from the database
                after publishing.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        parent_id = post.get_parent_id()

        # used for db insertion
        new_root_id = None
        new_parent_id = None

        root_ref = None
        reply_ref = None
        if parent_id:
            parents = self._find_parent(parent_id)
            if not parents:
                return
            root_ref, reply_ref, new_root_id, new_parent_id = parents

        tokens = post.get_tokens().copy()

        unique_labels: set[str] = set()
        cw = post.get_cw()
        if cw:
            tokens.insert(0, cross.TextToken("CW: " + cw + "\n\n"))
            unique_labels.add('graphic-media')

            # from bsky.app, a post can only have one of those labels
            if PORN_PATTERN.search(cw):
                unique_labels.add('porn')
            elif ADULT_PATTERN.search(cw):
                unique_labels.add('sexual')

        if post.is_sensitive():
            unique_labels.add('graphic-media')

        # sorted() only makes the label order deterministic.
        labels = models.ComAtprotoLabelDefs.SelfLabels(
            values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in sorted(unique_labels)]
        )

        sup_media, unsup_media = self._split_attachments(post.get_attachments())

        # Unsupported attachments are appended as "[filename]" links.
        if unsup_media:
            if tokens:
                tokens.append(cross.TextToken('\n'))
            for attachment in unsup_media:
                tokens.append(cross.LinkToken(
                    attachment.url,
                    f"[{media_util.get_filename_from_url(attachment.url)}]"
                ))
                tokens.append(cross.TextToken(' '))

        split_tokens: list[list[cross.Token]] = cross.split_tokens(tokens, 300)
        post_text: list[client_utils.TextBuilder] = []

        # convert tokens into rich text. skip post if contains unsupported tokens
        for block in split_tokens:
            rich_text = tokens_to_richtext(block)

            if not rich_text:
                LOGGER.error("Skipping '%s' as it contains invalid rich text types!", post.get_id())
                return
            post_text.append(rich_text)

        if not post_text:
            post_text = [client_utils.TextBuilder().text('')]

        # Validate every attachment BEFORE publishing anything, so a
        # rejected attachment can never leave a half-published thread that
        # is not recorded in the database.
        video_meta: dict[int, dict] = {}
        for m in sup_media:
            if m.mime.startswith('image/'):
                if len(m.io) > 2_000_000:
                    LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large.", post.get_id())
                    return

            if m.mime.startswith('video/'):
                if m.mime != 'video/mp4' and not self.options.encode_videos:
                    LOGGER.info("Video is not mp4, but encoding is disabled. Skipping '%s'...", post.get_id())
                    return

                if len(m.io) > 100_000_000:
                    LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
                    return

                metadata = media_util.get_media_meta(m.io)
                if metadata['duration'] > 180:
                    LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
                    return
                # Keyed by object identity so the send loop reuses the probe.
                video_meta[id(m)] = metadata

        created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
        baked_media = self._split_media_per_post(post_text, sup_media)

        for text, attachments in baked_media:
            reply_to = models.AppBskyFeedPost.ReplyRef(
                parent=reply_ref,
                root=root_ref
            ) if root_ref and reply_ref else None

            if not attachments:
                if reply_to:
                    new_post = self.bsky.send_post(
                        text, reply_to=reply_to, labels=labels, time_iso=post.get_post_date_iso()
                    )
                else:
                    new_post = self.bsky.send_post(text, labels=labels, time_iso=post.get_post_date_iso())
            elif attachments[0].mime.startswith('image/'):
                # if a single post is an image - everything else is an image.
                # Each post carries its own text chunk, not always the first.
                new_post = self._send_image_post(
                    text, attachments, reply_to, labels, post.get_post_date_iso()
                )
            else:
                # a video is guaranteed to be the only attachment of its post
                new_post = self._send_video_post(
                    text,
                    attachments[0],
                    video_meta[id(attachments[0])],
                    reply_to,
                    labels,
                    post.get_post_date_iso()
                )

            # The first record created outside an existing thread becomes
            # the root; every following chunk replies to the previous one.
            if not root_ref:
                root_ref = models.create_strong_ref(new_post)

            self.bsky.create_gates(
                self.options.thread_gate,
                self.options.quote_gate,
                new_post.uri,
                time_iso=post.get_post_date_iso()
            )
            reply_ref = models.create_strong_ref(new_post)
            created_records.append(new_post)

        db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
        if not db_post:
            # Explicit raise (same exception type as the former `assert`) so
            # the check is not stripped when Python runs with -O.
            raise AssertionError(f"Source post '{post.get_id()}' is missing from the database!")

        db_identifiers = [json.dumps(cr.model_dump(), sort_keys=True) for cr in created_records]

        # Not a reply to a known thread: the first created record starts a
        # new thread and is its own root.
        if new_root_id is None or new_parent_id is None:
            new_root_id = database.insert_post(
                self.db,
                db_identifiers[0],
                login.did,
                SERVICE
            )
            new_parent_id = new_root_id
            database.insert_mapping(self.db, db_post['id'], new_parent_id)
            db_identifiers = db_identifiers[1:]

        # Remaining records are chained as replies, each to the previous one.
        for db_id in db_identifiers:
            new_parent_id = database.insert_reply(
                self.db,
                db_id,
                login.did,
                SERVICE,
                new_parent_id,
                new_root_id
            )
            database.insert_mapping(self.db, db_post['id'], new_parent_id)

    def delete_post(self, identifier: str):
        """Delete every Bluesky record that was created for the source post.

        Does nothing when the source post is not in the database.

        Raises:
            Exception: If the client is not logged in.
        """
        login = self.bsky.me
        if not login:
            raise Exception("Client not logged in!")

        post = database.find_post(self.db, identifier, self.input.user_id, self.input.service)
        if not post:
            return

        mappings = database.find_mappings(self.db, post['id'], SERVICE, login.did)
        # Walk the mappings in reverse of the order the database returned them.
        for mapping in mappings[::-1]:
            LOGGER.info("Deleting '%s'...", mapping[0])
            self.bsky.delete_post(json.loads(mapping[0])['uri'])
            database.delete_post(self.db, mapping[0], SERVICE, login.did)
663
664
def tokens_to_richtext(tokens: list[cross.Token]) -> client_utils.TextBuilder | None:
    """Render cross-posting tokens into a Bluesky rich-text builder.

    Text, link and tag tokens are supported. A link whose label is
    canonical for its href (per ``util.canonical_label``) is displayed as
    the href without its scheme, cut to 32 characters plus ``...``.

    Returns:
        The populated builder, or ``None`` as soon as a token of any other
        type is encountered.
    """
    rich = client_utils.TextBuilder()

    for tok in tokens:
        if isinstance(tok, cross.TextToken):
            rich.text(tok.text)
            continue

        if isinstance(tok, cross.TagToken):
            rich.tag('#' + tok.tag, tok.tag)
            continue

        if not isinstance(tok, cross.LinkToken):
            # fail on unsupported tokens
            return None

        if not util.canonical_label(tok.label, tok.href):
            rich.link(tok.label, tok.href)
            continue

        # Flatten the link: drop the scheme, then truncate long targets.
        _, scheme_sep, remainder = tok.href.partition('://')
        shown = remainder if scheme_sep else tok.href
        if len(shown) > 32:
            shown = shown[:32] + '...'
        rich.link(shown, tok.href)

    return rich