social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at master 4.2 kB view raw
"""Helpers for downloading media attachments and processing them with ffmpeg."""

import json
import os
import re
import subprocess
import urllib.parse

import magic
import requests

from util.util import LOGGER

# Captures the filename value of a Content-Disposition header (quotes optional).
FILENAME = re.compile(r'filename="?([^\";]*)"?')
# Shared libmagic handle, configured to report MIME types.
MAGIC = magic.Magic(mime=True)


class MediaInfo:
    """A downloaded media attachment together with its metadata."""

    def __init__(self, url: str, name: str, mime: str, alt: str, io: bytes) -> None:
        self.url = url  # URL the media was fetched from
        self.name = name  # file name derived by get_filename_from_url
        self.mime = mime  # MIME type sniffed from the content
        self.alt = alt  # alt text supplied by the caller
        self.io = io  # raw file content


def download_media(url: str, alt: str) -> MediaInfo | None:
    """Download the attachment at *url* and wrap it in a MediaInfo.

    The download is capped at 100 MB. Returns None (after logging an
    error) when download_blob yields no data.
    """
    name = get_filename_from_url(url)
    io = download_blob(url, max_bytes=100_000_000)
    if not io:
        LOGGER.error("Failed to download media attachment! %s", url)
        return None
    mime = MAGIC.from_buffer(io)
    if not mime:
        mime = "application/octet-stream"
    return MediaInfo(url, name, mime, alt, io)


def get_filename_from_url(url: str) -> str:
    """Return a file name for *url*.

    Tries the Content-Disposition header of a HEAD request first and falls
    back to the last path component of the URL. Request failures are
    deliberately ignored (best effort) and lead to the fallback.
    """
    try:
        # A timeout keeps an unresponsive host from blocking the caller forever.
        response = requests.head(url, allow_redirects=True, timeout=20)
        disposition = response.headers.get("Content-Disposition")
        if disposition:
            match = FILENAME.search(disposition)
            # An empty filename="" is useless; fall through to the URL path.
            if match and match.group(1):
                return match.group(1)
    except requests.RequestException:
        pass

    parsed_url = urllib.parse.urlparse(url)
    base_name = os.path.basename(parsed_url.path)

    # hardcoded fix to return the cid for pds: for getBlob URLs the path
    # ends in the method name, so the "cid" query parameter is used instead.
    if base_name == "com.atproto.sync.getBlob":
        qs = urllib.parse.parse_qs(parsed_url.query)
        if qs and qs.get("cid"):
            return qs["cid"][0]

    return base_name


def probe_bytes(bytes: bytes) -> dict:
    """Run ffprobe on in-memory media and return its parsed JSON report.

    The parameter name shadows the builtin but is kept so that keyword
    callers continue to work.

    Raises:
        RuntimeError: if ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_format",
        "-show_streams",
        "-print_format", "json",
        "pipe:0",
    ]
    proc = subprocess.run(
        cmd, input=bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if proc.returncode != 0:
        # Lenient decoding so odd bytes in stderr cannot hide the real failure.
        details = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffprobe failed: {details}")

    return json.loads(proc.stdout)


def _run_ffmpeg(cmd: list[str], data: bytes) -> bytes:
    """Feed *data* to the ffmpeg command on stdin and return its stdout.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    proc = subprocess.run(
        cmd, input=data, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if proc.returncode != 0:
        # Lenient decoding so odd bytes in stderr cannot hide the real failure.
        details = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffmpeg compress failed: {details}")

    return proc.stdout


def convert_to_mp4(video_bytes: bytes) -> bytes:
    """Transcode a video to MP4 (libx264 video, AAC audio) via ffmpeg.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-i", "pipe:0",
        "-c:v", "libx264",
        "-crf", "30",
        "-preset", "slow",
        "-c:a", "aac",
        "-b:a", "128k",
        # Fragmented MP4 so the muxer can write to a non-seekable pipe.
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        "-f", "mp4",
        "pipe:1",
    ]
    return _run_ffmpeg(cmd, video_bytes)


def compress_image(image_bytes: bytes, quality: int = 90) -> bytes:
    """Re-encode an image with ffmpeg's webp codec at the given quality.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-f", "image2pipe",
        "-i", "pipe:0",
        "-c:v", "webp",
        "-q:v", str(quality),
        "-f", "image2pipe",
        "pipe:1",
    ]
    return _run_ffmpeg(cmd, image_bytes)


def download_blob(url: str, max_bytes: int = 5_000_000) -> bytes | None:
    """Download *url* and return its body, or None on failure.

    Returns None when the response status is not 200, when the body is
    larger than *max_bytes*, or when the request fails at the network
    level (timeout, connection error, interrupted stream).
    """
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=20) as response:
            if response.status_code != 200:
                LOGGER.info("Failed to download %s! %s", url, response.text)
                return None

            # Collect chunks and join once; repeated bytes += copies the
            # whole buffer on every chunk.
            chunks = []
            current_size = 0

            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue

                current_size += len(chunk)
                if current_size > max_bytes:
                    LOGGER.info(
                        "Aborting download of %s: larger than %d bytes",
                        url,
                        max_bytes,
                    )
                    return None

                chunks.append(chunk)

            return b"".join(chunks)
    except requests.RequestException as exc:
        LOGGER.info("Failed to download %s! %s", url, exc)
        return None


def get_media_meta(bytes: bytes) -> dict:
    """Return width, height and duration of the first video stream.

    The parameter name shadows the builtin but is kept so that keyword
    callers continue to work.

    Returns:
        A dict with integer "width" and "height" and float "duration".
        Duration comes from the stream, else from the container format,
        else is -1.0.

    Raises:
        ValueError: if the probe reports no video stream.
        RuntimeError: if ffprobe fails (raised by probe_bytes).
    """
    probe = probe_bytes(bytes)
    video_streams = [
        s for s in probe.get("streams", []) if s.get("codec_type") == "video"
    ]
    if not video_streams:
        raise ValueError("No video stream found")

    media = video_streams[0]
    duration = media.get("duration")
    if duration is None:
        # Some streams carry no duration of their own; use the container's.
        duration = probe.get("format", {}).get("duration", -1)

    return {
        "width": int(media["width"]),
        "height": int(media["height"]),
        "duration": float(duration),
    }