social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
import json
import os
import re
import subprocess
import urllib.parse

import magic
import requests

from util import LOGGER

# Extracts the (optionally quoted) filename from a Content-Disposition header.
FILENAME = re.compile(r'filename="?([^\";]*)"?')
# Shared libmagic handle configured to report MIME types.
MAGIC = magic.Magic(mime=True)

# Timeout (seconds) applied to every outgoing HTTP request in this module.
_REQUEST_TIMEOUT_SECONDS = 20


class MediaInfo:
    """A downloaded media attachment together with its metadata.

    Attributes:
        url: Address the media was fetched from.
        name: File name derived for the media.
        mime: MIME type of the payload.
        alt: Alt text supplied by the caller.
        io: Raw payload bytes.
    """

    def __init__(self, url: str, name: str, mime: str, alt: str, io: bytes) -> None:
        self.url = url
        self.name = name
        self.mime = mime
        self.alt = alt
        self.io = io


def download_media(url: str, alt: str) -> MediaInfo | None:
    """Download *url* (up to 100 MB) and wrap it in a :class:`MediaInfo`.

    The MIME type is sniffed from the downloaded bytes; when sniffing yields
    nothing, ``application/octet-stream`` is used.

    Returns:
        The populated ``MediaInfo``, or ``None`` when the download failed,
        was empty, or exceeded the size limit.
    """
    name = get_filename_from_url(url)
    io = download_blob(url, max_bytes=100_000_000)
    if not io:
        LOGGER.error("Failed to download media attachment! %s", url)
        return None
    mime = MAGIC.from_buffer(io)
    if not mime:
        mime = 'application/octet-stream'
    return MediaInfo(url, name, mime, alt, io)


def get_filename_from_url(url: str) -> str:
    """Work out a file name for the resource at *url*.

    A HEAD request is tried first so a ``Content-Disposition`` filename can
    be used. If the request fails, or the header carries no usable name, the
    last path segment of the URL is returned instead.
    """
    try:
        # Bounded by a timeout so an unresponsive host cannot stall the caller.
        response = requests.head(
            url, allow_redirects=True, timeout=_REQUEST_TIMEOUT_SECONDS
        )
        disposition = response.headers.get('Content-Disposition')
        if disposition:
            match = FILENAME.search(disposition)
            # An empty capture (e.g. filename="") is not a usable name;
            # fall through to the URL-derived name in that case.
            if match and match.group(1):
                return match.group(1)
    except requests.RequestException:
        # Best effort only: the URL itself still provides a fallback name.
        pass

    parsed_url = urllib.parse.urlparse(url)
    base_name = os.path.basename(parsed_url.path)

    # hardcoded fix to return the cid for pds
    if base_name == 'com.atproto.sync.getBlob':
        qs = urllib.parse.parse_qs(parsed_url.query)
        if qs and qs.get('cid'):
            return qs['cid'][0]

    return base_name


def _run_piped(cmd: list[str], data: bytes, failure_prefix: str) -> bytes:
    """Run *cmd* with *data* on stdin and return what it wrote to stdout.

    Raises:
        RuntimeError: If the process exits non-zero. The message is
            ``"<failure_prefix>: <stderr>"``.
    """
    proc = subprocess.run(
        cmd, input=data, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    if proc.returncode != 0:
        # errors="replace" keeps undecodable stderr bytes from raising a
        # UnicodeDecodeError that would hide the real failure.
        stderr_text = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"{failure_prefix}: {stderr_text}")
    return proc.stdout


def probe_bytes(bytes: bytes) -> dict:
    """Run ``ffprobe`` on in-memory media and return its parsed JSON report.

    The report is requested with ``-show_format`` and ``-show_streams``.

    Raises:
        RuntimeError: If ``ffprobe`` exits with a non-zero status.
    """
    cmd = [
        'ffprobe',
        '-v', 'error',
        '-show_format',
        '-show_streams',
        '-print_format', 'json',
        'pipe:0'
    ]
    return json.loads(_run_piped(cmd, bytes, "ffprobe failed"))


def convert_to_mp4(video_bytes: bytes) -> bytes:
    """Re-encode a video to fragmented MP4 (libx264 video, AAC audio).

    Input is fed to ``ffmpeg`` on stdin and the result is read from stdout.

    Raises:
        RuntimeError: If ``ffmpeg`` exits with a non-zero status.
    """
    cmd = [
        'ffmpeg',
        '-i', 'pipe:0',
        '-c:v', 'libx264',
        '-crf', '30',
        '-preset', 'slow',
        '-c:a', 'aac',
        '-b:a', '128k',
        '-movflags', 'frag_keyframe+empty_moov+default_base_moof',
        '-f', 'mp4',
        'pipe:1'
    ]
    return _run_piped(cmd, video_bytes, "ffmpeg compress failed")


def compress_image(image_bytes: bytes, quality: int = 90) -> bytes:
    """Re-encode an image as WebP using ``ffmpeg``.

    Args:
        image_bytes: Encoded source image.
        quality: Value passed to ffmpeg's ``-q:v`` option.

    Raises:
        RuntimeError: If ``ffmpeg`` exits with a non-zero status.
    """
    cmd = [
        'ffmpeg',
        '-f', 'image2pipe',
        '-i', 'pipe:0',
        '-c:v', 'webp',
        '-q:v', str(quality),
        '-f', 'image2pipe',
        'pipe:1'
    ]
    return _run_piped(cmd, image_bytes, "ffmpeg compress failed")


def download_blob(url: str, max_bytes: int = 5_000_000) -> bytes | None:
    """Fetch *url* into memory, refusing bodies larger than *max_bytes*.

    The body is streamed in 8 KiB chunks so an oversized response is
    abandoned as soon as the limit is crossed.

    Returns:
        The body bytes, or ``None`` when the status code is not 200 or the
        body exceeds *max_bytes*.

    Raises:
        requests.RequestException: Connection, timeout and streaming errors
            are not caught here.
    """
    # The context manager closes the streamed connection on every exit path.
    with requests.get(
        url, stream=True, timeout=_REQUEST_TIMEOUT_SECONDS
    ) as response:
        if response.status_code != 200:
            LOGGER.info("Failed to download %s! %s", url, response.text)
            return None

        # Collect chunks and join once; repeated bytes concatenation would
        # re-copy the accumulated buffer on every chunk.
        chunks = []
        current_size = 0

        for chunk in response.iter_content(chunk_size=8192):
            if not chunk:
                continue

            current_size += len(chunk)
            if current_size > max_bytes:
                LOGGER.info(
                    "Refusing to download %s! Body exceeds %d bytes",
                    url, max_bytes,
                )
                return None

            chunks.append(chunk)

        return b"".join(chunks)


def get_media_meta(bytes: bytes) -> dict:
    """Return width, height and duration of the first video stream.

    Returns:
        A dict with ``width`` and ``height`` as ints and ``duration`` as a
        float. The duration is taken from the stream, then from the
        container format, and is ``-1.0`` when neither reports one.

    Raises:
        ValueError: If the probe report contains no video stream.
        RuntimeError: If ``ffprobe`` itself fails.
    """
    probe = probe_bytes(bytes)
    media = next(
        (s for s in probe.get('streams', []) if s.get('codec_type') == 'video'),
        None,
    )
    if media is None:
        raise ValueError("No video stream found")

    duration = media.get('duration')
    if duration is None:
        # Only consult the container when the stream has no duration, and
        # tolerate a report without a 'format' section.
        duration = probe.get('format', {}).get('duration', -1)

    return {
        'width': int(media['width']),
        'height': int(media['height']),
        'duration': float(duration)
    }