social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at next 4.4 kB view raw
1from dataclasses import dataclass, field 2 3import json 4import re 5import os 6from typing import Any, cast 7import magic 8import subprocess 9import urllib.parse 10 11import requests 12 13FILENAME = re.compile(r'filename="?([^\";]*)"?') 14MAGIC = magic.Magic(mime=True) 15 16 17@dataclass 18class Blob: 19 url: str 20 mime: str 21 io: bytes = field(repr=False) 22 name: str | None = None 23 alt: str | None = None 24 25 26@dataclass 27class MediaInfo: 28 width: int 29 height: int 30 duration: float | None = None 31 32 33def mime_from_bytes(io: bytes) -> str: 34 mime = MAGIC.from_buffer(io) 35 if not mime: 36 mime = "application/octet-stream" 37 return mime 38 39def download_blob(url: str, alt: str | None = None, max_bytes: int = 100_000_000) -> Blob | None: 40 name = get_filename_from_url(url) 41 io = download_chuncked(url, max_bytes) 42 if not io: 43 return None 44 return Blob(url, mime_from_bytes(io), io, name, alt) 45 46def download_chuncked(url: str, max_bytes: int = 100_000_000) -> bytes | None: 47 response = requests.get(url, stream=True, timeout=20) 48 if response.status_code != 200: 49 return None 50 51 downloaded_bytes = b"" 52 current_size = 0 53 54 for chunk in response.iter_content(chunk_size=8192): 55 if not chunk: 56 continue 57 58 current_size += len(chunk) 59 if current_size > max_bytes: 60 response.close() 61 return None 62 63 downloaded_bytes += chunk 64 65 return downloaded_bytes 66 67 68def get_filename_from_url(url: str) -> str: 69 try: 70 response = requests.head(url, timeout=5, allow_redirects=True) 71 disposition = response.headers.get("Content-Disposition") 72 if disposition: 73 filename = FILENAME.findall(disposition) 74 if filename: 75 return filename[0] 76 except requests.RequestException: 77 pass 78 79 parsed_url = urllib.parse.urlparse(url) 80 base_name = os.path.basename(parsed_url.path) 81 82 # hardcoded fix to return the cid for pds blobs 83 if base_name == "com.atproto.sync.getBlob": 84 qs = urllib.parse.parse_qs(parsed_url.query) 85 if qs and qs.get("cid"): 86 return qs["cid"][0] 87 88 return base_name 89 90 91def convert_to_mp4(video: Blob) -> Blob: 92 cmd = [ 93 "ffmpeg", 94 "-i", "pipe:0", 95 "-c:v", "libx264", 96 "-crf", "30", 97 "-preset", "slow", 98 "-c:a", "aac", 99 "-b:a", "128k", 100 "-movflags", "frag_keyframe+empty_moov+default_base_moof", 101 "-f", "mp4", 102 "pipe:1", 103 ] 104 105 proc = subprocess.Popen( 106 cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE 107 ) 108 out_bytes, err = proc.communicate(input=video.io) 109 110 if proc.returncode != 0: 111 raise RuntimeError(f"ffmpeg compress failed: {err.decode()}") 112 113 return Blob(video.url, mime_from_bytes(out_bytes), out_bytes, video.name, video.alt) 114 115 116def compress_image(image: Blob, quality: int = 95) -> Blob: 117 cmd = [ 118 "ffmpeg", 119 "-f", "image2pipe", 120 "-i", "pipe:0", 121 "-c:v", "webp", 122 "-q:v", str(quality), 123 "-f", "image2pipe", 124 "pipe:1", 125 ] 126 127 proc = subprocess.Popen( 128 cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE 129 ) 130 out_bytes, err = proc.communicate(input=image.io) 131 132 if proc.returncode != 0: 133 raise RuntimeError(f"ffmpeg compress failed: {err.decode()}") 134 135 return Blob(image.url, "image/webp", out_bytes, image.name, image.alt) 136 137 138def probe_bytes(bytes: bytes) -> dict[str, Any]: 139 cmd = [ 140 "ffprobe", 141 "-v", 142 "error", 143 "-show_format", 144 "-show_streams", 145 "-print_format", 146 "json", 147 "pipe:0", 148 ] 149 proc = subprocess.run( 150 cmd, input=bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE 151 ) 152 153 if proc.returncode != 0: 154 raise RuntimeError(f"ffprobe failed: {proc.stderr.decode()}") 155 156 return json.loads(proc.stdout) 157 158 159def get_media_meta(bytes: bytes) -> MediaInfo: 160 probe = probe_bytes(bytes) 161 streams = [s for s in probe["streams"] if s["codec_type"] == "video"] 162 if not streams: 163 raise ValueError("No video stream found") 164 165 media: dict[str, Any] = cast(dict[str, Any], streams[0]) 166 return MediaInfo( 167 width=media["width"], 168 height=media["height"], 169 duration=media.get("duration", probe["format"].get("duration")), 170 )