"""Helpers for downloading media attachments and processing them with ffmpeg."""

import json
import os
import re
import subprocess
import urllib.parse

import magic
import requests

from util.util import LOGGER

# Extracts the (optionally quoted) filename value from a Content-Disposition header.
FILENAME = re.compile(r'filename="?([^\";]*)"?')
# libmagic detector configured to report MIME types.
MAGIC = magic.Magic(mime=True)

# Network timeout (seconds) applied to every HTTP request made by this module.
_HTTP_TIMEOUT = 20


class MediaInfo:
    """Plain container describing one downloaded media attachment."""

    def __init__(self, url: str, name: str, mime: str, alt: str, io: bytes) -> None:
        self.url = url  # source URL the media was fetched from
        self.name = name  # filename as resolved by get_filename_from_url
        self.mime = mime  # MIME type sniffed from the content
        self.alt = alt  # alt text supplied by the caller
        self.io = io  # raw downloaded bytes


def download_media(url: str, alt: str) -> MediaInfo | None:
    """Download *url* (up to 100 MB) and wrap it in a :class:`MediaInfo`.

    Returns ``None`` (after logging an error) when the download fails,
    exceeds the size cap, or yields no data.
    """
    name = get_filename_from_url(url)
    io = download_blob(url, max_bytes=100_000_000)
    if not io:
        LOGGER.error("Failed to download media attachment! %s", url)
        return None

    mime = MAGIC.from_buffer(io)
    if not mime:
        mime = "application/octet-stream"

    return MediaInfo(url, name, mime, alt, io)


def get_filename_from_url(url: str) -> str:
    """Return the best-guess filename for the resource at *url*.

    A HEAD request is tried first so that a ``Content-Disposition`` filename
    wins; any request failure is ignored and the name is derived from the
    URL itself instead.
    """
    try:
        # A timeout is required: without one a stalled server blocks forever.
        response = requests.head(url, allow_redirects=True, timeout=_HTTP_TIMEOUT)
        disposition = response.headers.get("Content-Disposition")
        if disposition:
            match = FILENAME.search(disposition)
            # An empty filename="" is useless; fall through to the URL instead.
            if match and match.group(1):
                return match.group(1)
    except requests.RequestException:
        # Best effort only: fall back to the URL-derived name below.
        pass

    parsed_url = urllib.parse.urlparse(url)
    base_name = os.path.basename(parsed_url.path)

    # Hardcoded fix: for PDS getBlob URLs, return the cid query parameter.
    if base_name == "com.atproto.sync.getBlob":
        qs = urllib.parse.parse_qs(parsed_url.query)
        if qs and qs.get("cid"):
            return qs["cid"][0]

    return base_name


def probe_bytes(bytes: bytes) -> dict:
    """Run ``ffprobe`` on in-memory media and return its parsed JSON output.

    Raises:
        RuntimeError: if ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_format",
        "-show_streams",
        "-print_format", "json",
        "pipe:0",
    ]
    proc = subprocess.run(
        cmd, input=bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if proc.returncode != 0:
        # errors="replace" keeps odd tool output from masking the real failure.
        raise RuntimeError(f"ffprobe failed: {proc.stderr.decode(errors='replace')}")

    return json.loads(proc.stdout)


def _run_ffmpeg(cmd: list[str], data: bytes) -> bytes:
    """Feed *data* to the ffmpeg command *cmd* on stdin and return its stdout.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    proc = subprocess.run(
        cmd, input=data, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if proc.returncode != 0:
        # errors="replace" keeps odd tool output from masking the real failure.
        raise RuntimeError(
            f"ffmpeg compress failed: {proc.stderr.decode(errors='replace')}"
        )

    return proc.stdout


def convert_to_mp4(video_bytes: bytes) -> bytes:
    """Re-encode a video to H.264/AAC in a fragmented MP4 container.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-i", "pipe:0",
        "-c:v", "libx264",
        "-crf", "30",
        "-preset", "slow",
        "-c:a", "aac",
        "-b:a", "128k",
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        "-f", "mp4",
        "pipe:1",
    ]
    return _run_ffmpeg(cmd, video_bytes)


def compress_image(image_bytes: bytes, quality: int = 90) -> bytes:
    """Re-encode an image as WebP through ffmpeg at the given ``-q:v`` quality.

    Raises:
        RuntimeError: if ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-f", "image2pipe",
        "-i", "pipe:0",
        "-c:v", "webp",
        "-q:v", str(quality),
        "-f", "image2pipe",
        "pipe:1",
    ]
    return _run_ffmpeg(cmd, image_bytes)


def download_blob(url: str, max_bytes: int = 5_000_000) -> bytes | None:
    """Stream *url* into memory, giving up once *max_bytes* is exceeded.

    Returns the downloaded bytes, or ``None`` when the server does not answer
    with HTTP 200 or the body is larger than *max_bytes*. Exceptions raised
    by ``requests`` are not caught here.
    """
    # The context manager releases the streamed connection on every exit path.
    with requests.get(url, stream=True, timeout=_HTTP_TIMEOUT) as response:
        if response.status_code != 200:
            LOGGER.info("Failed to download %s! %s", url, response.text)
            return None

        # Collect chunks and join once; repeated bytes += is quadratic.
        chunks: list[bytes] = []
        current_size = 0

        for chunk in response.iter_content(chunk_size=8192):
            if not chunk:
                continue

            current_size += len(chunk)
            if current_size > max_bytes:
                return None

            chunks.append(chunk)

    return b"".join(chunks)


def get_media_meta(bytes: bytes) -> dict:
    """Return width, height and duration of the first video stream.

    ``duration`` comes from the stream when present, otherwise from the
    container format, and is ``-1.0`` when neither reports one.

    Raises:
        RuntimeError: if ffprobe fails (see :func:`probe_bytes`).
        ValueError: if the media contains no video stream.
    """
    probe = probe_bytes(bytes)
    streams = [s for s in probe["streams"] if s.get("codec_type") == "video"]
    if not streams:
        raise ValueError("No video stream found")

    media = streams[0]
    duration = media.get("duration")
    if duration is None:
        # Only consult the container when the stream has no duration of its own.
        duration = probe.get("format", {}).get("duration", -1)

    return {
        "width": int(media["width"]),
        "height": int(media["height"]),
        "duration": float(duration),
    }