from dataclasses import dataclass, field
import json
import os
import re
import subprocess
from typing import Any, cast
import urllib.parse

import magic
import requests

# Captures the filename value of a Content-Disposition header (quotes optional).
FILENAME = re.compile(r'filename="?([^\";]*)"?')
# Shared libmagic handle configured to report MIME types.
MAGIC = magic.Magic(mime=True)


@dataclass
class Blob:
    """A downloaded or generated binary payload together with its metadata."""

    url: str
    mime: str
    # Raw payload; excluded from repr so printing a Blob does not dump the bytes.
    io: bytes = field(repr=False)
    name: str | None = None
    alt: str | None = None


@dataclass
class MediaInfo:
    """Dimensions and optional duration of the first video stream of a media file."""

    width: int
    height: int
    duration: float | None = None


def mime_from_bytes(io: bytes) -> str:
    """Return the MIME type libmagic detects for *io*.

    Falls back to ``application/octet-stream`` when detection yields nothing.
    """
    return MAGIC.from_buffer(io) or "application/octet-stream"


def download_blob(
    url: str, alt: str | None = None, max_bytes: int = 100_000_000
) -> Blob | None:
    """Download *url* and wrap the payload in a :class:`Blob`.

    Returns ``None`` when the download fails, exceeds *max_bytes*, or yields
    an empty body.
    """
    name = get_filename_from_url(url)
    io = download_chuncked(url, max_bytes)
    if not io:
        return None
    return Blob(url, mime_from_bytes(io), io, name, alt)


def download_chuncked(url: str, max_bytes: int = 100_000_000) -> bytes | None:
    """Stream *url* in 8 KiB chunks and return the body.

    Returns ``None`` when the status code is not 200 or the body grows past
    *max_bytes*. The response is always closed, including on the early-return
    paths, so the underlying connection is released.
    """
    with requests.get(url, stream=True, timeout=20) as response:
        if response.status_code != 200:
            return None

        chunks: list[bytes] = []
        current_size = 0
        for chunk in response.iter_content(chunk_size=8192):
            if not chunk:
                continue
            current_size += len(chunk)
            if current_size > max_bytes:
                return None
            chunks.append(chunk)

    # Join once at the end; repeated ``bytes +=`` copies the buffer every time.
    return b"".join(chunks)


def get_filename_from_url(url: str) -> str:
    """Return a filename for *url*.

    Prefers the ``Content-Disposition`` header of a HEAD request; if the
    request fails or no filename is advertised, falls back to the last path
    segment of the URL.
    """
    try:
        response = requests.head(url, timeout=5, allow_redirects=True)
        disposition = response.headers.get("Content-Disposition")
        if disposition:
            match = FILENAME.search(disposition)
            if match:
                return match.group(1)
    except requests.RequestException:
        # Best effort only: fall through to deriving the name from the URL.
        pass

    parsed_url = urllib.parse.urlparse(url)
    base_name = os.path.basename(parsed_url.path)

    # hardcoded fix to return the cid for pds blobs
    if base_name == "com.atproto.sync.getBlob":
        cids = urllib.parse.parse_qs(parsed_url.query).get("cid")
        if cids:
            return cids[0]

    return base_name


def _run_piped(cmd: list[str], data: bytes, failure: str) -> bytes:
    """Run *cmd* with *data* on stdin and return its stdout.

    Raises ``RuntimeError`` prefixed with *failure* when the exit code is
    non-zero.
    """
    proc = subprocess.run(
        cmd, input=data, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )
    if proc.returncode != 0:
        # Tool diagnostics may not be valid UTF-8; never let decoding mask the error.
        raise RuntimeError(f"{failure}: {proc.stderr.decode(errors='replace')}")
    return proc.stdout


def convert_to_mp4(video: Blob) -> Blob:
    """Re-encode *video* to H.264/AAC MP4 through ffmpeg and return a new Blob.

    Raises ``RuntimeError`` when ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-i", "pipe:0",
        "-c:v", "libx264",
        "-crf", "30",
        "-preset", "slow",
        "-c:a", "aac",
        "-b:a", "128k",
        # Fragmented MP4 so the muxer can write to a non-seekable pipe.
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        "-f", "mp4",
        "pipe:1",
    ]
    out_bytes = _run_piped(cmd, video.io, "ffmpeg compress failed")
    return Blob(
        video.url, mime_from_bytes(out_bytes), out_bytes, video.name, video.alt
    )


def compress_image(image: Blob, quality: int = 95) -> Blob:
    """Re-encode *image* as WebP at the given *quality* through ffmpeg.

    Raises ``RuntimeError`` when ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-f", "image2pipe",
        "-i", "pipe:0",
        "-c:v", "webp",
        "-q:v", str(quality),
        "-f", "image2pipe",
        "pipe:1",
    ]
    out_bytes = _run_piped(cmd, image.io, "ffmpeg compress failed")
    return Blob(image.url, "image/webp", out_bytes, image.name, image.alt)


def probe_bytes(bytes: bytes) -> dict[str, Any]:
    """Run ffprobe on the given payload and return its parsed JSON report.

    Raises ``RuntimeError`` when ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_format",
        "-show_streams",
        "-print_format", "json",
        "pipe:0",
    ]
    return json.loads(_run_piped(cmd, bytes, "ffprobe failed"))


def _to_float(value: Any) -> float | None:
    """Convert *value* to ``float``; return ``None`` if absent or unparsable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def get_media_meta(bytes: bytes) -> MediaInfo:
    """Return width, height and duration of the first video stream.

    The stream-level duration is preferred; the container-level duration is
    used when the stream does not report one.

    Raises ``ValueError`` when the probe reports no video stream.
    """
    probe = probe_bytes(bytes)
    streams = [
        s for s in probe.get("streams", []) if s.get("codec_type") == "video"
    ]
    if not streams:
        raise ValueError("No video stream found")

    media: dict[str, Any] = cast(dict[str, Any], streams[0])
    container_duration = probe.get("format", {}).get("duration")
    return MediaInfo(
        width=media["width"],
        height=media["height"],
        # ffprobe's JSON writer emits durations as strings; normalise to float.
        duration=_to_float(media.get("duration", container_duration)),
    )