social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

this is a bit of a mess

- added remote attachments for unsupported media types.
- added media splitting for bluesky posts
- broke up the accept_post function

zenfyr.dev 7b2d0f10 899ae340

verified
+196 -116
bluesky.py
···
self.client.login(did, util.get_or_envvar(settings, 'app-password'))
self.bsky = Bluesky(self.client)
+
def _find_parent(self, parent_id: str):
    """Resolve the Bluesky reply references for the post identified by ``parent_id``.

    Looks the parent up in the database for the configured input account,
    loads its stored mappings for the logged-in DID and builds strong refs
    from them.

    Returns:
        ``None`` when no mappings are stored for the parent (or for its
        root), otherwise a 4-tuple
        ``(root_ref, reply_ref, root_id, parent_db_id)`` where the last two
        values are the ``root_id`` and ``id`` columns of the parent's row.

    Raises:
        Exception: if the client is not logged in.
        AssertionError: if the parent (or its recorded root) is missing
            from the database.
    """
    me = self.client.me
    if not me:
        raise Exception("Client not logged in!")

    def load_mappings(db_post_id):
        # Each mapping row stores a JSON document in its first column.
        rows = database.find_mappings(self.db, db_post_id, SERVICE, me.did)
        return [json.loads(row[0]) for row in rows]

    def as_record(mapping):
        return models.AppBskyFeedPost.CreateRecordResponse(
            uri=str(mapping['uri']),
            cid=str(mapping['cid'])
        )

    parent_row = database.find_post(self.db, parent_id, self.input.user_id, self.input.service)
    assert parent_row, "reply_data requested, but doesn't exist in db (should've been skipped bt firehose)"

    parent_mappings = load_mappings(parent_row['id'])
    if not parent_mappings:
        LOGGER.error("Failed to find mappings for a post in the db!")
        return None

    # The last mapping is the record to reply to; the first one doubles as
    # the thread root unless the parent itself belongs to an older thread.
    # NOTE(review): presumably one source post can map to several Bluesky
    # records when it was split -- confirm against the mapping writer.
    reply_record = as_record(parent_mappings[-1])
    root_record = as_record(parent_mappings[0])

    root_id = parent_row['root_id']
    if root_id:
        root_row = database.find_post_by_id(self.db, root_id)
        assert root_row, "root_data requested but doesn't exist in db"

        root_mappings = load_mappings(root_id)
        if not root_mappings:
            LOGGER.error("Failed to find mappings for a post in the db!")
            return None
        root_record = as_record(root_mappings[0])

    return (
        models.create_strong_ref(root_record),
        models.create_strong_ref(reply_record),
        parent_row['root_id'],
        parent_row['id']
    )
+
+
def _split_attachments(self, attachments: list[cross.MediaAttachment]):
    """Partition attachments into embeddable and non-embeddable media.

    Attachments whose ``get_type()`` is falsy are dropped. Images and videos
    go into the first list, everything else into the second.

    Returns:
        A ``(supported, unsupported)`` tuple of lists, each preserving the
        input order.
    """
    embeddable = {'video', 'image'}  # TODO convert gifs to videos
    supported: list[cross.MediaAttachment] = []
    unsupported: list[cross.MediaAttachment] = []

    for item in attachments:
        kind = item.get_type()
        if kind:
            bucket = supported if kind in embeddable else unsupported
            bucket.append(item)

    return (supported, unsupported)
+
+
def _split_media_per_post(
        self,
        tokens: list[client_utils.TextBuilder],
        media: list[cross.MediaAttachment]
) -> list[tuple[client_utils.TextBuilder, list[cross.MediaAttachment]]]:
    """Distribute media over the text chunks of a thread.

    Every entry of ``tokens`` becomes one post. Media is assigned in order:
    each video takes the next post that has no media yet, and consecutive
    images share a post until it holds four of them. When the text posts run
    out, blank posts (empty text) are appended to carry the remaining media.
    Attachments that are neither ``'video'`` nor ``'image'`` are ignored.

    Args:
        tokens: One text builder per post, in thread order.
        media: Attachments to distribute, in display order.

    Returns:
        A list of ``(text, attachments)`` pairs, one per post. ``text`` is
        always a single ``TextBuilder`` -- including for the appended blank
        posts, which previously carried a one-element list instead.
    """
    max_images_per_post = 4

    posts: list[tuple[client_utils.TextBuilder, list[cross.MediaAttachment]]] = [
        (text, []) for text in tokens
    ]
    # Index of the next post that has not been given any media yet.
    next_free = 0
    # Post currently collecting images, if any; reset by a video so that
    # images after the video start a fresh post.
    open_image_idx: int | None = None

    def claim_post() -> int:
        """Return the index of the next media-free post, appending a blank one if needed."""
        nonlocal next_free
        if next_free == len(posts):
            posts.append((client_utils.TextBuilder().text(''), []))
        idx = next_free
        next_free += 1
        return idx

    for att in media:
        kind = att.get_type()
        if kind == 'video':
            open_image_idx = None
            posts[claim_post()][1].append(att)
        elif kind == 'image':
            if (
                open_image_idx is None
                or len(posts[open_image_idx][1]) >= max_images_per_post
            ):
                open_image_idx = claim_post()
            posts[open_image_idx][1].append(att)

    return posts
+
def accept_post(self, post: cross.Post):
login = self.client.me
if not login:
···
root_ref = None
reply_ref = None
if parent_id:
-
# parentless posts are skipped by the input
-
reply_data = database.find_post(self.db, parent_id, self.input.user_id, self.input.service)
-
assert reply_data, "reply_data requested, but doesn't exist in db (should've been skipped bt firehose)"
-
-
reply_mappings = [json.loads(data[0]) for data in database.find_mappings(self.db, reply_data['id'], SERVICE, login.did)]
-
if not reply_mappings:
-
LOGGER.error("Failed to find mappings for a post in the db!")
+
parents = self._find_parent(parent_id)
+
if not parents:
return
-
-
reply_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_mappings[-1]['uri']), cid=str(reply_mappings[-1]['cid']))
-
root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(reply_mappings[0]['uri']), cid=str(reply_mappings[0]['cid']))
-
if reply_data['root_id']:
-
root_data = database.find_post_by_id(self.db, reply_data['root_id'])
-
assert root_data, "root_data requested but doesn't exist in db"
-
-
root_mappings = [json.loads(data[0]) for data in database.find_mappings(self.db, reply_data['root_id'], SERVICE, login.did)]
-
if not root_mappings:
-
LOGGER.error("Failed to find mappings for a post in the db!")
-
return
-
root_record = models.AppBskyFeedPost.CreateRecordResponse(uri=str(root_mappings[0]['uri']), cid=str(root_mappings[0]['cid']))
-
-
new_root_id = reply_data['root_id']
-
new_parent_id = reply_data['id']
-
-
root_ref = models.create_strong_ref(root_record)
-
reply_ref = models.create_strong_ref(reply_record)
+
root_ref, reply_ref, new_root_id, new_parent_id = parents
tokens = post.get_tokens()
···
unique_labels.add('graphic-media')
labels = models.ComAtprotoLabelDefs.SelfLabels(values=[models.ComAtprotoLabelDefs.SelfLabel(val=label) for label in unique_labels])
+
+
sup_media, unsup_media = self._split_attachments(post.get_attachments())
+
+
if unsup_media:
+
if tokens:
+
tokens.append(cross.TextToken('\n'))
+
for i, attachment in enumerate(unsup_media):
+
tokens.append(cross.LinkToken(
+
attachment.get_url(),
+
f"[{media_util.get_filename_from_url(attachment.get_url())}]"
+
))
+
tokens.append(cross.TextToken(' '))
+
split_tokens: list[list[cross.Token]] = util.split_tokens(post.get_tokens(), 300)
post_text: list[client_utils.TextBuilder] = []
···
if not post_text:
post_text = [client_utils.TextBuilder().text('')]
+
# download media first. increased RAM usage, but more reliable
+
for m in sup_media:
+
if not m.bytes:
+
if m.get_type() == 'image':
+
image_bytes = media_util.download_blob(m.get_url(), max_bytes=2_000_000)
+
if not image_bytes:
+
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
+
return
+
m.bytes = image_bytes
+
elif m.get_type() == 'video':
+
video_bytes = media_util.download_blob(m.get_url(), max_bytes=100_000_000)
+
if not video_bytes:
+
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
+
return
+
m.bytes = video_bytes
+
created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
-
attachments = post.get_attachments()
-
if not attachments:
-
for text in post_text:
+
baked_media = self._split_media_per_post(post_text, sup_media)
+
+
for text, attachments in baked_media:
+
if not attachments:
if reply_ref and root_ref:
new_post = self.bsky.send_post(text, reply_to=models.AppBskyFeedPost.ReplyRef(
parent=reply_ref,
···
self.bsky.create_gates(self.options, new_post.uri)
reply_ref = models.create_strong_ref(new_post)
created_records.append(new_post)
-
elif len(attachments) <= 4:
-
if len(attachments) == 1 and attachments[0].get_type() == 'video':
-
video_data = attachments[0]
-
-
video_io = media_util.download_blob(video_data.get_url(), max_bytes=100_000_000)
-
if not video_io:
-
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
-
return
-
-
metadata = video_data.create_meta(video_io)
-
if metadata.get_duration() > 180:
-
LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
-
return
-
-
aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
-
width=metadata.get_width(),
-
height=metadata.get_height()
-
)
-
-
new_post = self.bsky.send_video(
-
text=post_text[0],
-
video=video_io,
-
video_aspect_ratio=aspect_ratio,
-
video_alt=video_data.get_alt(),
-
reply_to= models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
) if root_ref and reply_ref else None,
-
labels=labels
-
)
-
if not root_ref:
-
root_ref = models.create_strong_ref(new_post)
-
-
self.bsky.create_gates(self.options, new_post.uri)
-
reply_ref = models.create_strong_ref(new_post)
else:
-
for attachment in attachments:
-
if attachment.get_type() != 'image':
-
LOGGER.info("Skipping post_id '%s'. Attachment type mismatch. got: '%s' expected: 'image'", post.get_id(), attachment.get_type())
-
return
-
-
images: list[bytes] = []
-
image_alts: list[str] = []
-
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []
-
for attachment in attachments:
-
image_io = media_util.download_blob(attachment.get_url(), max_bytes=2_000_000)
-
if not image_io:
+
# if a single post is an image - everything else is an image
+
if attachments[0].get_type() == 'image':
+
images: list[bytes] = []
+
image_alts: list[str] = []
+
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []
+
+
for attachment in attachments:
+
assert attachment.bytes
+
image_io = media_util.compress_image(attachment.bytes, quality=100)
+
metadata = attachment.create_meta(image_io)
+
+
if len(image_io) > 1_000_000:
+
LOGGER.info("Compressing %s...", attachment.get_url())
+
+
images.append(image_io)
+
image_alts.append(attachment.get_alt())
+
image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
+
width=metadata.get_width(),
+
height=metadata.get_height()
+
))
+
+
new_post = self.bsky.send_images(
+
text=post_text[0],
+
images=images,
+
image_alts=image_alts,
+
image_aspect_ratios=image_aspect_ratios,
+
reply_to= models.AppBskyFeedPost.ReplyRef(
+
parent=reply_ref,
+
root=root_ref
+
) if root_ref and reply_ref else None,
+
labels=labels
+
)
+
if not root_ref:
+
root_ref = models.create_strong_ref(new_post)
+
+
self.bsky.create_gates(self.options, new_post.uri)
+
reply_ref = models.create_strong_ref(new_post)
+
created_records.append(new_post)
+
else: # video is guarantedd to be one
+
video_data = attachments[0]
+
assert attachment.bytes
+
+
video_io = attachment.bytes
+
if not video_io:
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
return
-
LOGGER.info("Converting %s to .webp...", attachment.get_url())
-
image_io = media_util.compress_image(image_io, quality=100)
-
metadata = attachment.create_meta(image_io)
-
if len(image_io) > 1_000_000:
-
LOGGER.info("Compressing %s...", attachment.get_url())
-
-
images.append(image_io)
-
image_alts.append(attachment.get_alt())
-
image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
+
metadata = video_data.create_meta(video_io)
+
if metadata.get_duration() > 180:
+
LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
+
return
+
+
aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
width=metadata.get_width(),
height=metadata.get_height()
-
))
+
)
-
new_post = self.bsky.send_images(
-
text=post_text[0],
-
images=images,
-
image_alts=image_alts,
-
image_aspect_ratios=image_aspect_ratios,
-
reply_to= models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
) if root_ref and reply_ref else None,
-
labels=labels
-
)
-
if not root_ref:
-
root_ref = models.create_strong_ref(new_post)
+
new_post = self.bsky.send_video(
+
text=post_text[0],
+
video=video_io,
+
video_aspect_ratio=aspect_ratio,
+
video_alt=video_data.get_alt(),
+
reply_to= models.AppBskyFeedPost.ReplyRef(
+
parent=reply_ref,
+
root=root_ref
+
) if root_ref and reply_ref else None,
+
labels=labels
+
)
+
if not root_ref:
+
root_ref = models.create_strong_ref(new_post)
-
self.bsky.create_gates(self.options, new_post.uri)
-
reply_ref = models.create_strong_ref(new_post)
-
-
created_records.append(new_post)
-
for text in post_text[1:]:
-
new_post = self.bsky.send_post(text, reply_to=models.AppBskyFeedPost.ReplyRef(
-
parent=reply_ref,
-
root=root_ref
-
), labels=labels)
-
self.bsky.create_gates(self.options, new_post.uri)
-
-
reply_ref = models.create_strong_ref(new_post)
-
created_records.append(new_post)
-
else:
-
LOGGER.info("Skipping post_id '%s', too many attachments!", post.get_id())
-
return
-
-
if not created_records:
-
LOGGER.info("Skipped post_id '%s', for some reason...")
+
self.bsky.create_gates(self.options, new_post.uri)
+
reply_ref = models.create_strong_ref(new_post)
+
created_records.append(new_post)
db_post = database.find_post(self.db, post.get_id(), self.input.user_id, self.input.service)
assert db_post, "ghghghhhhh"
+1
cross.py
···
class MediaAttachment():
def __init__(self) -> None:
+
self.bytes: bytes | None = None # filled-in later
pass
def create_meta(self, bytes: bytes) -> MediaMeta:
+1 -1
mastodon.py
···
# type of attachment
def get_type(self) -> str | None:
    """Return the attachment's type as mapped by ``FORMATS``.

    Falls back to ``'other'`` both when the attachment has no ``'type'``
    key and when its type has no entry in ``FORMATS``, so an unrecognized
    type no longer raises ``KeyError``.
    """
    return FORMATS.get(self.attachment.get('type', 'other'), 'other')
# create file metadata from bytes or other
def create_meta(self, bytes: bytes) -> cross.MediaMeta:
+17
media_util.py
···
import requests
import subprocess
import json
+
import re, urllib.parse, os
from util import LOGGER
+
+
FILENAME = re.compile(r'filename="?([^\";]*)"?')
+
+
def get_filename_from_url(url: str, timeout: float = 10.0) -> str:
    """Best-effort guess of the file name behind ``url``.

    Sends a HEAD request (following redirects) and uses the ``filename``
    parameter of the ``Content-Disposition`` header when it is present and
    non-empty. Otherwise, or when the request fails, returns the last path
    segment of the URL.

    Args:
        url: Location of the remote file.
        timeout: Seconds to wait for the HEAD request. Without a timeout an
            unresponsive host would block the caller indefinitely.

    Returns:
        The detected file name; may be an empty string when the URL path
        has no final segment.
    """
    try:
        response = requests.head(url, allow_redirects=True, timeout=timeout)
    except requests.RequestException as e:
        # Deliberate best-effort: any network failure (including a timeout)
        # just means we fall back to the name embedded in the URL.
        LOGGER.debug("HEAD request for '%s' failed: %s", url, e)
    else:
        disposition = response.headers.get('Content-Disposition')
        if disposition:
            matches = FILENAME.findall(disposition)
            # The pattern can match an empty name (e.g. filename="");
            # treat that the same as a missing header.
            if matches and matches[0]:
                return matches[0]

    parsed_url = urllib.parse.urlparse(url)
    return os.path.basename(parsed_url.path)
def probe_bytes(bytes: bytes) -> dict:
cmd = [