social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

better media handling(?)
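Roughly: attachments are now downloaded once by the input, wrapped in a shared media_util.MediaInfo (url, name, mime, alt, io), and handed as-is to every output, which only checks sizes and MIME types instead of re-downloading. A minimal sketch of that flow, assuming a hypothetical collect_attachments helper (MediaInfo and download_media are the real additions in media_util.py below):

import media_util

def collect_attachments(items: list[tuple[str, str]]) -> list[media_util.MediaInfo] | None:
    # items are (url, alt) pairs taken from the source post; the helper name is illustrative only
    attachments: list[media_util.MediaInfo] = []
    for url, alt in items:
        # download_media fetches the blob once and sniffs its MIME type (see media_util.py below)
        info = media_util.download_media(url, alt)
        if not info:
            return None  # inputs skip the whole post when any attachment fails to download
        attachments.append(info)
    return attachments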

zenfyr.dev 866a152a 7e3232c7

verified
+73 -102
bluesky.py
···
return tokens
class BlueskyPost(cross.Post):
-
def __init__(self, pds_url: str, did: str, post: dict) -> None:
+
def __init__(self, post: dict, attachments: list[media_util.MediaInfo]) -> None:
super().__init__()
self.post = post
self.tokens = tokenize_post(post)
···
self.cw = ''
if labels:
self.cw = ', '.join([str(label['val']).replace('-', ' ') for label in labels])
-
-
def get_blob_url(blob: str):
-
nonlocal pds_url, did
-
return f'{pds_url}/xrpc/com.atproto.sync.getBlob?did={did}&cid={blob}'
-
-
attachments: list[cross.MediaAttachment] = []
-
embed = self.post.get('embed', {})
-
if embed.get('$type') == 'app.bsky.embed.images':
-
model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
-
assert isinstance(model, models.AppBskyEmbedImages.Main)
-
-
for image in model.images:
-
attachments.append(BlueskyAttachment(
-
get_blob_url(image.image.cid.encode()),
-
'image', image.alt
-
))
-
elif embed.get('$type') == 'app.bsky.embed.video':
-
model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
-
assert isinstance(model, models.AppBskyEmbedVideo.Main)
-
-
attachments.append(BlueskyAttachment(
-
get_blob_url(model.video.cid.encode()),
-
'video', model.alt if model.alt else ''
-
))
self.attachments = attachments
def get_tokens(self) -> list[cross.Token]:
···
def is_sensitive(self) -> bool:
return self.post.get('labels', {}).get('values') or False
-
def get_attachments(self) -> list[cross.MediaAttachment]:
-
return self.attachments or []
-
-
class BlueskyAttachment(cross.MediaAttachment):
-
def __init__(self, url: str, type: str, alt: str) -> None:
-
super().__init__()
-
self.url = url
-
self.type = type
-
self.alt = alt
-
-
def get_url(self) -> str:
-
return self.url
-
-
def get_type(self) -> str | None:
-
return self.type
-
-
def create_meta(self, bytes: bytes) -> cross.MediaMeta:
-
o_meta = media_util.get_media_meta(bytes)
-
return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
-
-
def get_alt(self) -> str:
-
return self.alt
+
def get_attachments(self) -> list[media_util.MediaInfo]:
+
return self.attachments
class BlueskyInput(cross.Input):
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
···
return
LOGGER.info("Crossposting '%s'...", post_ref)
-
cross_post = BlueskyPost(self.pds, self.user_id, post)
+
+
def get_blob_url(blob: str):
+
return f'{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.user_id}&cid={blob}'
+
+
attachments: list[media_util.MediaInfo] = []
+
embed = post.get('embed', {})
+
if embed.get('$type') == 'app.bsky.embed.images':
+
model = get_model_or_create(embed, model=models.AppBskyEmbedImages.Main)
+
assert isinstance(model, models.AppBskyEmbedImages.Main)
+
+
for image in model.images:
+
url = get_blob_url(image.image.cid.encode())
+
LOGGER.info("Downloading %s...", url)
+
io = media_util.download_media(url, image.alt)
+
if not io:
+
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
+
return
+
attachments.append(io)
+
elif embed.get('$type') == 'app.bsky.embed.video':
+
model = get_model_or_create(embed, model=models.AppBskyEmbedVideo.Main)
+
assert isinstance(model, models.AppBskyEmbedVideo.Main)
+
url = get_blob_url(model.video.cid.encode())
+
LOGGER.info("Downloading %s...", url)
+
io = media_util.download_media(url, model.alt if model.alt else '')
+
if not io:
+
LOGGER.error("Skipping '%s'. Failed to download media!", post_ref)
+
return
+
attachments.append(io)
+
+
cross_post = BlueskyPost(post, attachments)
for output in outputs:
output.accept_post(cross_post)
return
···
raise Exception("Account app password not provided!")
did, pds = resolve_identity(
-
handle=util.as_envvar(settings.get('hanlde')),
+
handle=util.as_envvar(settings.get('handle')),
did=util.as_envvar(settings.get('did')),
pds=util.as_envvar(settings.get('pds'))
)
···
thread_tuple[3]
)
-
def _split_attachments(self, attachments: list[cross.MediaAttachment]):
-
sup_media: list[cross.MediaAttachment] = []
-
unsup_media: list[cross.MediaAttachment] = []
+
def _split_attachments(self, attachments: list[media_util.MediaInfo]):
+
sup_media: list[media_util.MediaInfo] = []
+
unsup_media: list[media_util.MediaInfo] = []
-
for attachment in attachments:
-
attachment_type = attachment.get_type()
-
if not attachment_type:
-
continue
-
-
if attachment_type in {'video', 'image'}: # TODO convert gifs to videos
-
sup_media.append(attachment)
+
for a in attachments:
+
if a.mime.startswith('image/') or a.mime.startswith('video/'): # TODO convert gifs to videos
+
sup_media.append(a)
else:
-
unsup_media.append(attachment)
+
unsup_media.append(a)
return (sup_media, unsup_media)
def _split_media_per_post(
self,
tokens: list[client_utils.TextBuilder],
-
media: list[cross.MediaAttachment]):
+
media: list[media_util.MediaInfo]):
posts: list[dict] = [{"tokens": tokens, "attachments": []} for tokens in tokens]
available_indices: list[int] = list(range(len(posts)))
···
return new_idx
for att in media:
-
if att.get_type() == 'video':
+
if att.mime.startswith('video/'):
current_image_post_idx = None
idx = pop_next_empty_index()
posts[idx]["attachments"].append(att)
-
elif att.get_type() == 'image':
+
elif att.mime.startswith('image/'):
if (
current_image_post_idx is not None
and len(posts[current_image_post_idx]["attachments"]) < 4
···
posts[idx]["attachments"].append(att)
current_image_post_idx = idx
-
result: list[tuple[client_utils.TextBuilder, list[cross.MediaAttachment]]] = []
+
result: list[tuple[client_utils.TextBuilder, list[media_util.MediaInfo]]] = []
for p in posts:
result.append((p["tokens"], p["attachments"]))
return result
···
tokens.append(cross.TextToken('\n'))
for i, attachment in enumerate(unsup_media):
tokens.append(cross.LinkToken(
-
attachment.get_url(),
-
f"[{media_util.get_filename_from_url(attachment.get_url())}]"
+
attachment.url,
+
f"[{media_util.get_filename_from_url(attachment.url)}]"
))
tokens.append(cross.TextToken(' '))
-
split_tokens: list[list[cross.Token]] = util.split_tokens(tokens, 300)
+
split_tokens: list[list[cross.Token]] = cross.split_tokens(tokens, 300)
post_text: list[client_utils.TextBuilder] = []
# convert tokens into rich text. skip post if contains unsupported tokens
···
if not post_text:
post_text = [client_utils.TextBuilder().text('')]
-
# download media first. increased RAM usage, but more reliable
for m in sup_media:
-
if not m.bytes:
-
if m.get_type() == 'image':
-
image_bytes = media_util.download_blob(m.get_url(), max_bytes=2_000_000)
-
if not image_bytes:
-
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
-
return
-
m.bytes = image_bytes
-
elif m.get_type() == 'video':
-
video_bytes = media_util.download_blob(m.get_url(), max_bytes=100_000_000)
-
if not video_bytes:
-
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
-
return
-
m.bytes = video_bytes
+
if m.mime.startswith('image/'):
+
if len(m.io) > 2_000_000:
+
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large.", post.get_id())
+
return
+
+
if m.mime.startswith('video/'):
+
if len(m.io) > 100_000_000:
+
LOGGER.error("Skipping post_id '%s', failed to download attachment! File too large?", post.get_id())
+
return
created_records: list[models.AppBskyFeedPost.CreateRecordResponse] = []
baked_media = self._split_media_per_post(post_text, sup_media)
···
created_records.append(new_post)
else:
# if a single post is an image - everything else is an image
-
if attachments[0].get_type() == 'image':
+
if attachments[0].mime.startswith('image/'):
images: list[bytes] = []
image_alts: list[str] = []
image_aspect_ratios: list[models.AppBskyEmbedDefs.AspectRatio] = []
for attachment in attachments:
-
assert attachment.bytes
-
image_io = media_util.compress_image(attachment.bytes, quality=100)
-
metadata = attachment.create_meta(image_io)
+
image_io = media_util.compress_image(attachment.io, quality=100)
+
metadata = media_util.get_media_meta(image_io)
if len(image_io) > 1_000_000:
-
LOGGER.info("Compressing %s...", attachment.get_url())
+
LOGGER.info("Compressing %s...", attachment.name)
+
image_io = media_util.compress_image(image_io)
images.append(image_io)
-
image_alts.append(attachment.get_alt())
+
image_alts.append(attachment.alt)
image_aspect_ratios.append(models.AppBskyEmbedDefs.AspectRatio(
-
width=metadata.get_width(),
-
height=metadata.get_height()
+
width=metadata['width'],
+
height=metadata['height']
))
new_post = self.bsky.send_images(
···
reply_ref = models.create_strong_ref(new_post)
created_records.append(new_post)
else: # video is guaranteed to be one
-
video_data = attachments[0]
-
assert video_data.bytes
-
video_io = video_data.bytes
-
-
metadata = video_data.create_meta(video_io)
-
if metadata.get_duration() > 180:
+
metadata = media_util.get_media_meta(attachments[0].io)
+
if metadata['duration'] > 180:
LOGGER.info("Skipping post_id '%s', video attachment too long!", post.get_id())
return
-
probe = media_util.probe_bytes(video_io)
-
format_name = probe['format']['format_name']
-
if 'mp4' not in format_name.split(','):
-
LOGGER.error("Converting %s to mp4...", video_data.get_url())
+
video_io = attachments[0].io
+
if attachments[0].mime != 'video/mp4':
+
LOGGER.error("Converting %s to mp4...", attachments[0].name)
video_io = media_util.convert_to_mp4(video_io)
aspect_ratio = models.AppBskyEmbedDefs.AspectRatio(
-
width=metadata.get_width(),
-
height=metadata.get_height()
+
width=metadata['width'],
+
height=metadata['height']
)
new_post = self.bsky.send_video(
text=post_text[0],
video=video_io,
video_aspect_ratio=aspect_ratio,
-
video_alt=video_data.get_alt(),
+
video_alt=attachments[0].alt,
reply_to= models.AppBskyFeedPost.ReplyRef(
parent=reply_ref,
root=root_ref
+125 -19
cross.py
···
from typing import Callable, Any
from database import DataBaseWorker
from datetime import datetime, timezone
+
from media_util import MediaInfo, get_media_meta
+
import util
+
import re
+
+
ALTERNATE = re.compile(r'\S+|\s+')
# generic token
class Token():
···
def get_duration(self) -> float:
return self.duration
-
-
class MediaAttachment():
-
def __init__(self) -> None:
-
self.bytes: bytes | None = None # filled-in later
-
pass
-
-
def create_meta(self, bytes: bytes) -> MediaMeta:
-
return MediaMeta(-1, -1, -1)
-
-
def get_url(self) -> str:
-
return ''
-
-
def get_type(self) -> str | None:
-
return None
-
-
def get_alt(self) -> str:
-
return ''
class Post():
def __init__(self) -> None:
···
def get_post_date_iso(self) -> str:
return datetime.now(timezone.utc).isoformat()
-
def get_attachments(self) -> list[MediaAttachment]:
+
def get_attachments(self) -> list[MediaInfo]:
return []
def get_id(self) -> str:
···
pass
def delete_post(self, identifier: str):
-
pass
+
pass
+
+
def split_tokens(tokens: list[Token], max_chars: int, max_link_len: int = 35) -> list[list[Token]]:
+
def start_new_block():
+
nonlocal current_block, blocks, current_length
+
if current_block:
+
blocks.append(current_block)
+
current_block = []
+
current_length = 0
+
+
def append_text_to_block(text_segment):
+
nonlocal current_block
+
# if the last element in the current block is also text, just append to it
+
if current_block and isinstance(current_block[-1], TextToken):
+
current_block[-1].text += text_segment
+
else:
+
current_block.append(TextToken(text_segment))
+
+
blocks: list[list[Token]] = []
+
current_block: list[Token] = []
+
current_length: int = 0
+
+
for token in tokens:
+
if isinstance(token, TextToken):
+
# split content into alternating “words” (\S+) and “whitespace” (\s+).
+
# this ensures every space/newline is treated as its own segment.
+
segments: list[str] = ALTERNATE.findall(token.text)
+
+
for seg in segments:
+
if seg.isspace():
+
# whitespace segment: we count it, and if it doesn't fully fit,
+
# split the whitespace across blocks to preserve exact spacing.
+
seg_len: int = len(seg)
+
while seg_len > 0:
+
space_left = max_chars - current_length
+
if space_left == 0:
+
start_new_block()
+
continue
+
+
take = min(space_left, seg_len)
+
part = seg[:take]
+
append_text_to_block(part)
+
+
current_length += len(part)
+
seg = seg[take:]
+
seg_len -= take
+
+
if current_length == max_chars:
+
start_new_block()
+
+
else:
+
# seg is a “word” (no whitespace inside).
+
word: str = seg
+
wlen: int = len(word)
+
+
# if the word itself is longer than n, we must split it with hyphens.
+
if wlen > max_chars:
+
# first, if we're in the middle of a block, close it & start fresh.
+
if current_length > 0:
+
start_new_block()
+
+
remaining = word
+
# carve off (n-1)-sized chunks + “-” so each chunk is n chars.
+
while len(remaining) > (max_chars - 1):
+
chunk = remaining[: max_chars - 1] + '-'
+
append_text_to_block(chunk)
+
# that chunk fills the current block
+
start_new_block()
+
remaining = remaining[max_chars - 1 :]
+
+
# now whatever remains is ≤ n characters
+
if remaining:
+
append_text_to_block(remaining)
+
current_length = len(remaining)
+
+
else:
+
# word fits fully within a block (≤ n).
+
if current_length + wlen <= max_chars:
+
append_text_to_block(word)
+
current_length += wlen
+
else:
+
# not enough space in current block → start a new one
+
start_new_block()
+
append_text_to_block(word)
+
current_length = wlen
+
+
elif isinstance(token, LinkToken):
+
link_len = len(token.label)
+
if util.canonical_label(token.label, token.href):
+
link_len = min(link_len, max_link_len)
+
+
if current_length + link_len <= max_chars:
+
current_block.append(token)
+
current_length += link_len
+
else:
+
start_new_block()
+
current_block.append(token)
+
current_length = link_len
+
+
elif isinstance(token, TagToken):
+
# we treat a hashtag like “#tagname” for counting.
+
hashtag_len = 1 + len(token.tag)
+
if current_length + hashtag_len <= max_chars:
+
current_block.append(token)
+
current_length += hashtag_len
+
else:
+
start_new_block()
+
current_block.append(token)
+
current_length = hashtag_len
+
+
else:
+
# if you happen to have other types, just append them without affecting length.
+
current_block.append(token)
+
+
# append any remaining tokens as the final block
+
if current_block:
+
blocks.append(current_block)
+
+
return blocks
+1 -1
main.py
···
input = INPUTS[input_settings['type']](input_settings, db_worker)
if not outputs_settings:
-
LOGGER.warning("No outputs specified! Check your config!")
+
LOGGER.warning("No outputs specified! Check the config!")
outputs: list[cross.Output] = []
for output_settings in outputs_settings:
+45 -88
mastodon.py
···
from database import DataBaseWorker
from typing import Callable, Any
import asyncio, time
-
import magic
from bs4 import BeautifulSoup, Tag
from bs4.element import NavigableString
···
'audio': 'audio',
'unknown': 'other'
}
+
POSSIBLE_MIMES = [
+
'audio/ogg',
+
'audio/mp3',
+
'image/webp',
+
'image/jpeg',
+
'image/png',
+
'video/mp4',
+
'video/quicktime',
+
'video/webm'
+
]
def tokenize_post(status: dict) -> list[cross.Token]:
soup = BeautifulSoup(status['content'], "html.parser")
···
return tokens
class MastodonPost(cross.Post):
-
def __init__(self, status: dict) -> None:
+
def __init__(self, status: dict, media_attachments: list[media_util.MediaInfo]) -> None:
super().__init__()
self.status = status
-
media_attachments: list[cross.MediaAttachment] = []
-
-
for attachment in status.get('media_attachments', []):
-
media_attachments.append(MastodonAttachment(attachment))
-
self.media_attachments = media_attachments
-
self.tokens = tokenize_post(status)
def get_tokens(self) -> list[cross.Token]:
···
def is_sensitive(self) -> bool:
return self.status.get('sensitive', False)
-
def get_attachments(self) -> list[cross.MediaAttachment]:
+
def get_attachments(self) -> list[media_util.MediaInfo]:
return self.media_attachments
-
class MastodonAttachment(cross.MediaAttachment):
-
def __init__(self, attachment: dict) -> None:
-
super().__init__()
-
self.attachment = attachment
-
-
if attachment.get('type') == 'video' or attachment.get('type') == 'image':
-
if attachment.get('meta') and attachment.get('meta', {}).get('original'):
-
def from_status(bytes: bytes) -> cross.MediaMeta:
-
o_meta = attachment.get('meta', {}).get('original')
-
return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
-
self.meta_generator = from_status
-
else:
-
def from_bytes(bytes: bytes) -> cross.MediaMeta:
-
o_meta = media_util.get_media_meta(bytes)
-
return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
-
self.meta_generator = from_bytes
-
-
# URL to download the attachment from
-
def get_url(self) -> str:
-
return self.attachment.get('url', '')
-
-
# type of attachment
-
def get_type(self) -> str | None:
-
return FORMATS.get(self.attachment.get('type', 'other'), 'other')
-
-
# create file metadata from bytes or other
-
def create_meta(self, bytes: bytes) -> cross.MediaMeta:
-
if self.meta_generator:
-
return self.meta_generator(bytes)
-
return cross.MediaMeta(-1, -1, -1)
-
-
# get media description
-
def get_alt(self) -> str:
-
return self.attachment.get('description') or ''
-
class MastodonInput(cross.Input):
def __init__(self, settings: dict, db: DataBaseWorker) -> None:
self.options = settings.get('options', {})
···
return
LOGGER.info("Crossposting '%s'...", status['id'])
-
cross_post = MastodonPost(status)
+
+
media_attachments: list[media_util.MediaInfo] = []
+
for attachment in status.get('media_attachments', []):
+
LOGGER.info("Downloading %s...", attachment['url'])
+
info = media_util.download_media(attachment['url'], attachment.get('description') or '')
+
if not info:
+
LOGGER.error("Skipping '%s'. Failed to download media!", status['id'])
+
return
+
media_attachments.append(info)
+
+
cross_post = MastodonPost(status, media_attachments)
for output in outputs:
output.accept_post(cross_post)
···
media_config: dict = configuration.get('media_attachments', {})
self.image_size_limit: int = media_config.get('image_size_limit', 16777216)
self.video_size_limit: int = media_config.get('video_size_limit', 103809024)
-
self.supported_mime_types: list[str] = media_config.get('supported_mime_types', [
-
'audio/ogg',
-
'image/jpeg',
-
'image/png',
-
'video/mp4'
-
])
+
self.supported_mime_types: list[str] = media_config.get('supported_mime_types', POSSIBLE_MIMES)
-
# *oma max post chars
+
# *oma: max post chars
max_toot_chars = instance_info.get('max_toot_chars')
if max_toot_chars:
self.max_characters: int = max_toot_chars
-
# *oma max upload limit
+
# *oma: max upload limit
upload_limit = instance_info.get('upload_limit')
if upload_limit:
self.image_size_limit: int = upload_limit
self.video_size_limit: int = upload_limit
+
# *oma ext: supported text types
self.text_format = 'text/plain'
pleroma = instance_info.get('pleroma')
if pleroma:
···
elif 'text/markdown' in post_formats:
self.text_format = 'text/markdown'
-
def upload_media(self, attachments: list[cross.MediaAttachment]) -> list[str] | None:
-
prepare: list[tuple[str, str, bytes]] = []
-
-
for attachment in attachments:
-
alt = attachment.get_alt()
-
mbytes: bytes | None
+
def upload_media(self, attachments: list[media_util.MediaInfo]) -> list[str] | None:
+
for a in attachments:
+
if a.mime.startswith('image/') and len(a.io) > self.image_size_limit:
+
return None
-
if attachment.get_type() == 'image':
-
mbytes = media_util.download_blob(attachment.get_url(), self.image_size_limit)
-
elif attachment.get_type() in {'video', 'gif'}:
-
mbytes = media_util.download_blob(attachment.get_url(), self.video_size_limit)
-
else:
-
mbytes = media_util.download_blob(attachment.get_url(), 7_000_000)
-
-
if not mbytes:
+
if a.mime.startswith('video/') and len(a.io) > self.video_size_limit:
return None
-
filename = media_util.get_filename_from_url(attachment.get_url())
-
LOGGER.info("Downloaded %s", filename)
-
prepare.append((filename, alt, mbytes))
+
if not a.mime.startswith('image/') and not a.mime.startswith('video/'):
+
if len(a.io) > 7_000_000:
+
return None
uploads: list[dict] = []
-
-
for name, desc, bbytes in prepare:
-
mime_type = magic.Magic(mime=True).from_buffer(bbytes)
-
if not mime_type:
-
mime_type = 'application/octet-stream'
-
-
files = {
-
'file': (name, bbytes, mime_type)
-
}
+
for a in attachments:
data = {}
-
if desc:
-
data['description'] = desc
+
if a.alt:
+
data['description'] = a.alt
req = requests.post(f"{self.service}/api/v2/media", headers= {
'Authorization': f'Bearer {self.token}'
-
}, files=files, data=data)
+
}, files={'file': (a.name, a.io, a.mime)}, data=data)
if req.status_code == 200:
-
LOGGER.info("Uploaded %s! (%s)", name, req.json()['id'])
+
LOGGER.info("Uploaded %s! (%s)", a.name, req.json()['id'])
uploads.append({
'done': True,
'id': req.json()['id']
})
elif req.status_code == 202:
-
LOGGER.info("Waiting for %s to process!", name)
+
LOGGER.info("Waiting for %s to process!", a.name)
uploads.append({
'done': False,
'id': req.json()['id']
})
else:
-
LOGGER.error("Failes to download %s! %s", name, req.text)
+
LOGGER.error("Failed to upload %s! %s", a.name, req.text)
req.raise_for_status()
while any([not val['done'] for val in uploads]):
···
return p_text
-
def split_tokens_media(self, tokens: list[cross.Token], media: list[cross.MediaAttachment]):
-
split_tokens = util.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
+
def split_tokens_media(self, tokens: list[cross.Token], media: list[media_util.MediaInfo]):
+
split_tokens = cross.split_tokens(tokens, self.max_characters, self.characters_reserved_per_url)
post_text: list[str] = []
for block in split_tokens:
···
posts[idx]["attachments"].append(att)
current_image_post_idx = idx
-
result: list[tuple[str, list[cross.MediaAttachment]]] = []
+
result: list[tuple[str, list[media_util.MediaInfo]]] = []
for p in posts:
result.append((p['text'], p["attachments"]))
+30 -1
media_util.py
···
import json
import re, urllib.parse, os
from util import LOGGER
+
import magic
FILENAME = re.compile(r'filename="?([^\";]*)"?')
+
MAGIC = magic.Magic(mime=True)
+
+
class MediaInfo():
+
def __init__(self, url: str, name: str, mime: str, alt: str, io: bytes) -> None:
+
self.url = url
+
self.name = name
+
self.mime = mime
+
self.alt = alt
+
self.io = io
+
+
def download_media(url: str, alt: str) -> MediaInfo | None:
+
name = get_filename_from_url(url)
+
io = download_blob(url, max_bytes=100_000_000)
+
if not io:
+
LOGGER.error("Failed to download media attachment! %s", url)
+
return None
+
mime = MAGIC.from_buffer(io)
+
if not mime:
+
mime = 'application/octet-stream'
+
return MediaInfo(url, name, mime, alt, io)
def get_filename_from_url(url):
try:
···
pass
parsed_url = urllib.parse.urlparse(url)
-
return os.path.basename(parsed_url.path)
+
base_name = os.path.basename(parsed_url.path)
+
+
# hardcoded fix: for PDS getBlob URLs, return the cid query parameter as the filename
+
if base_name == 'com.atproto.sync.getBlob':
+
qs = urllib.parse.parse_qs(parsed_url.query)
+
if qs and qs.get('cid'):
+
return qs['cid'][0]
+
+
return base_name
def probe_bytes(bytes: bytes) -> dict:
cmd = [
+15 -35
misskey.py
···
return tokens
class MisskeyPost(cross.Post):
-
def __init__(self, note: dict) -> None:
+
def __init__(self, note: dict, files: list[media_util.MediaInfo]) -> None:
super().__init__()
self.note = note
-
-
media_attachments: list[cross.MediaAttachment] = []
-
-
sensitive = False
-
for attachment in note.get('files', []):
-
media_attachments.append(MisskeyAttachment(attachment))
-
sensitive |= attachment.get('isSensitive', False)
-
-
self.sensitive = sensitive
-
self.media_attachments = media_attachments
-
+
self.sensitive = any([a.get('isSensitive', False) for a in note.get('files', [])])
+
self.media_attachments = files
self.tokens = tokenize_note(self.note)
def get_tokens(self) -> list[cross.Token]:
···
date = self.note.get('createdAt')
return date or super().get_post_date_iso()
-
def get_attachments(self) -> list[cross.MediaAttachment]:
+
def get_attachments(self) -> list[media_util.MediaInfo]:
return self.media_attachments
def get_id(self) -> str:
···
def is_sensitive(self) -> bool:
return self.sensitive
-
class MisskeyAttachment(cross.MediaAttachment):
-
def __init__(self, attachment: dict) -> None:
-
super().__init__()
-
self.attachment = attachment
-
-
def create_meta(self, bytes: bytes) -> cross.MediaMeta:
-
# it's nort worth it
-
if get_image_common(self.attachment['type']):
-
o_meta = media_util.get_media_meta(bytes)
-
return cross.MediaMeta(o_meta['width'], o_meta['height'], o_meta.get('duration', -1))
-
return cross.MediaMeta(-1, -1, -1)
-
-
def get_url(self) -> str:
-
return self.attachment.get('url', '')
-
-
def get_type(self) -> str | None:
-
return get_image_common(self.attachment['type'])
-
-
def get_alt(self) -> str:
-
return self.attachment.get('comment') or ''
-
class MisskeyInput(cross.Input):
def __init__(self, settings: dict, db: cross.DataBaseWorker) -> None:
self.options = settings.get('options', {})
···
return
LOGGER.info("Crossposting '%s'...", note['id'])
-
cross_post = MisskeyPost(note)
+
+
media_attachments: list[media_util.MediaInfo] = []
+
for attachment in note.get('files', []):
+
LOGGER.info("Downloading %s...", attachment['url'])
+
info = media_util.download_media(attachment['url'], attachment.get('comment') or '')
+
if not info:
+
LOGGER.error("Skipping '%s'. Failed to download media!", note['id'])
+
return
+
media_attachments.append(info)
+
+
cross_post = MisskeyPost(note, media_attachments)
for output in outputs:
output.accept_post(cross_post)
-122
util.py
···
-
import re
-
import cross
import logging, sys, os
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
LOGGER = logging.getLogger("XPost")
-
-
ALTERNATE = re.compile(r'\S+|\s+')
def canonical_label(label: str | None, href: str):
if not label or label == href:
···
return True
return False
-
-
def split_tokens(tokens: list[cross.Token], max_chars: int, max_link_len: int = 35) -> list[list[cross.Token]]:
-
def start_new_block():
-
nonlocal current_block, blocks, current_length
-
if current_block:
-
blocks.append(current_block)
-
current_block = []
-
current_length = 0
-
-
def append_text_to_block(text_segment):
-
nonlocal current_block
-
# if the last element in the current block is also text, just append to it
-
if current_block and isinstance(current_block[-1], cross.TextToken):
-
current_block[-1].text += text_segment
-
else:
-
current_block.append(cross.TextToken(text_segment))
-
-
blocks: list[list[cross.Token]] = []
-
current_block: list[cross.Token] = []
-
current_length: int = 0
-
-
for token in tokens:
-
if isinstance(token, cross.TextToken):
-
# split content into alternating “words” (\S+) and “whitespace” (\s+).
-
# this ensures every space/newline is treated as its own segment.
-
segments: list[str] = ALTERNATE.findall(token.text)
-
-
for seg in segments:
-
if seg.isspace():
-
# whitespace segment: we count it, and if it doesn't fully fit,
-
# split the whitespace across blocks to preserve exact spacing.
-
seg_len: int = len(seg)
-
while seg_len > 0:
-
space_left = max_chars - current_length
-
if space_left == 0:
-
start_new_block()
-
continue
-
-
take = min(space_left, seg_len)
-
part = seg[:take]
-
append_text_to_block(part)
-
-
current_length += len(part)
-
seg = seg[take:]
-
seg_len -= take
-
-
if current_length == max_chars:
-
start_new_block()
-
-
else:
-
# seg is a “word” (no whitespace inside).
-
word: str = seg
-
wlen: int = len(word)
-
-
# if the word itself is longer than n, we must split it with hyphens.
-
if wlen > max_chars:
-
# first, if we're in the middle of a block, close it & start fresh.
-
if current_length > 0:
-
start_new_block()
-
-
remaining = word
-
# carve off (n-1)-sized chunks + “-” so each chunk is n chars.
-
while len(remaining) > (max_chars - 1):
-
chunk = remaining[: max_chars - 1] + '-'
-
append_text_to_block(chunk)
-
# that chunk fills the current block
-
start_new_block()
-
remaining = remaining[max_chars - 1 :]
-
-
# now whatever remains is ≤ n characters
-
if remaining:
-
append_text_to_block(remaining)
-
current_length = len(remaining)
-
-
else:
-
# word fits fully within a block (≤ n).
-
if current_length + wlen <= max_chars:
-
append_text_to_block(word)
-
current_length += wlen
-
else:
-
# not enough space in current block → start a new one
-
start_new_block()
-
append_text_to_block(word)
-
current_length = wlen
-
-
elif isinstance(token, cross.LinkToken):
-
link_len = len(token.label)
-
if canonical_label(token.label, token.href):
-
link_len = min(link_len, max_link_len)
-
-
if current_length + link_len <= max_chars:
-
current_block.append(token)
-
current_length += link_len
-
else:
-
start_new_block()
-
current_block.append(token)
-
current_length = link_len
-
-
elif isinstance(token, cross.TagToken):
-
# we treat a hashtag like “#tagname” for counting.
-
hashtag_len = 1 + len(token.tag)
-
if current_length + hashtag_len <= max_chars:
-
current_block.append(token)
-
current_length += hashtag_len
-
else:
-
start_new_block()
-
current_block.append(token)
-
current_length = hashtag_len
-
-
else:
-
# if you happen to have other types, just append them without affecting length.
-
current_block.append(token)
-
-
# append any remaining tokens as the final block
-
if current_block:
-
blocks.append(current_block)
-
-
return blocks
def safe_get(obj: dict, key: str, default):
val = obj.get(key, default)