social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from dataclasses import dataclass, field
2
3import json
4import re
5import os
6from typing import Any, cast
7import magic
8import subprocess
9import urllib.parse
10
11import requests
12
# Captures the value of a `filename=` parameter (optionally double-quoted),
# stopping at the next quote or semicolon. Used by get_filename_from_url on
# the Content-Disposition response header.
FILENAME = re.compile(r'filename="?([^\";]*)"?')
# Module-wide python-magic detector created with mime=True; mime_from_bytes
# queries it to sniff a content type from raw bytes.
MAGIC = magic.Magic(mime=True)
15
16
17@dataclass
18class Blob:
19 url: str
20 mime: str
21 io: bytes = field(repr=False)
22 name: str | None = None
23 alt: str | None = None
24
25
26@dataclass
27class MediaInfo:
28 width: int
29 height: int
30 duration: float | None = None
31
32
def mime_from_bytes(io: bytes) -> str:
    """Sniff the MIME type of *io* using the shared ``MAGIC`` detector.

    Returns:
        The detected MIME type, or ``"application/octet-stream"`` when the
        detector yields an empty result.
    """
    return MAGIC.from_buffer(io) or "application/octet-stream"
38
def download_blob(url: str, alt: str | None = None, max_bytes: int = 100_000_000) -> Blob | None:
    """Download *url* and wrap the payload in a ``Blob``.

    Args:
        url: Address of the resource to fetch.
        alt: Optional value stored unchanged in ``Blob.alt``.
        max_bytes: Size cap forwarded to ``download_chuncked``.

    Returns:
        A ``Blob`` whose MIME type is sniffed from the downloaded bytes, or
        ``None`` when the download produced nothing (``download_chuncked``
        returned ``None`` or an empty body).
    """
    payload = download_chuncked(url, max_bytes)
    if not payload:
        return None

    # Resolve the file name only once the payload is in hand: the lookup
    # issues its own HEAD request, which is wasted work (and up to a 5 s
    # timeout) when the download itself fails.
    name = get_filename_from_url(url)
    return Blob(url, mime_from_bytes(payload), payload, name, alt)
45
def download_chuncked(url: str, max_bytes: int = 100_000_000) -> bytes | None:
    """Stream *url* into memory, giving up once it exceeds *max_bytes*.

    Args:
        url: Address to GET.
        max_bytes: Maximum number of body bytes accepted.

    Returns:
        The complete body (possibly empty), or ``None`` when the response
        status is not 200 or the body is larger than *max_bytes*.

    Raises:
        requests.RequestException: Propagated unchanged from ``requests``
            (connection errors, timeouts, ...).
    """
    # The context manager closes the streamed response on every exit path,
    # including the early returns, so the connection is always released.
    with requests.get(url, stream=True, timeout=20) as response:
        if response.status_code != 200:
            return None

        # Collect chunks and join once at the end; repeated `bytes +=` copies
        # the whole buffer on every iteration.
        chunks: list[bytes] = []
        received = 0

        for chunk in response.iter_content(chunk_size=8192):
            if not chunk:
                continue

            received += len(chunk)
            if received > max_bytes:
                return None

            chunks.append(chunk)

    return b"".join(chunks)
66
67
def get_filename_from_url(url: str) -> str:
    """Work out a file name for the resource at *url*.

    A HEAD request is tried first and the ``filename`` parameter of the
    ``Content-Disposition`` header is used when present and non-empty.
    Otherwise the name is derived from the URL itself.

    Args:
        url: Address of the resource.

    Returns:
        The header-supplied name, else the ``cid`` query value for
        ``com.atproto.sync.getBlob`` URLs, else the last path segment of the
        URL (which may be an empty string).
    """
    try:
        response = requests.head(url, timeout=5, allow_redirects=True)
    except requests.RequestException:
        # Best effort only: fall back to the URL-derived name below.
        disposition = None
    else:
        disposition = response.headers.get("Content-Disposition")

    if disposition:
        # Skip empty captures (e.g. `filename=""`) so an empty header value
        # falls through to the URL-derived name instead of being returned.
        for candidate in FILENAME.findall(disposition):
            if candidate:
                return candidate

    parsed_url = urllib.parse.urlparse(url)
    base_name = os.path.basename(parsed_url.path)

    # hardcoded fix to return the cid for pds blobs
    if base_name == "com.atproto.sync.getBlob":
        cids = urllib.parse.parse_qs(parsed_url.query).get("cid")
        if cids:
            return cids[0]

    return base_name
89
90
def convert_to_mp4(video: Blob) -> Blob:
    """Re-encode *video* to MP4 by piping it through ``ffmpeg``.

    The input is fed on stdin and the result read from stdout. Video is
    encoded with libx264 (CRF 30, ``slow`` preset) and audio with AAC at
    128k.

    Args:
        video: Blob whose ``io`` holds the source media.

    Returns:
        A new ``Blob`` carrying the encoded bytes and a MIME type sniffed
        from them; ``url``, ``name`` and ``alt`` are copied from *video*.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-i", "pipe:0",
        "-c:v", "libx264",
        "-crf", "30",
        "-preset", "slow",
        "-c:a", "aac",
        "-b:a", "128k",
        # Fragmented-MP4 flags; presumably needed because the output goes to
        # a pipe, which the muxer cannot seek back into.
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        "-f", "mp4",
        "pipe:1",
    ]

    proc = subprocess.run(
        cmd, input=video.io, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if proc.returncode != 0:
        # ffmpeg's stderr can contain non-UTF-8 bytes; a strict decode would
        # raise UnicodeDecodeError and hide the real failure.
        detail = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffmpeg compress failed: {detail}")

    out_bytes = proc.stdout
    return Blob(video.url, mime_from_bytes(out_bytes), out_bytes, video.name, video.alt)
114
115
def compress_image(image: Blob, quality: int = 95) -> Blob:
    """Re-encode *image* as WebP by piping it through ``ffmpeg``.

    Args:
        image: Blob whose ``io`` holds the source image.
        quality: Value passed to ffmpeg's ``-q:v`` option.

    Returns:
        A new ``Blob`` with MIME type ``"image/webp"`` and the encoded
        bytes; ``url``, ``name`` and ``alt`` are copied from *image*.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-f", "image2pipe",
        "-i", "pipe:0",
        "-c:v", "webp",
        "-q:v", str(quality),
        "-f", "image2pipe",
        "pipe:1",
    ]

    proc = subprocess.run(
        cmd, input=image.io, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if proc.returncode != 0:
        # ffmpeg's stderr can contain non-UTF-8 bytes; a strict decode would
        # raise UnicodeDecodeError and hide the real failure.
        detail = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffmpeg compress failed: {detail}")

    return Blob(image.url, "image/webp", proc.stdout, image.name, image.alt)
136
137
def probe_bytes(bytes: bytes) -> dict[str, Any]:
    """Run ``ffprobe`` on in-memory media and return its parsed JSON report.

    The data is fed on stdin; format and stream sections are requested.

    Args:
        bytes: Raw media data. (The name shadows the builtin but is kept
            because it is part of the function's call signature.)

    Returns:
        The decoded JSON document printed by ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_format",
        "-show_streams",
        "-print_format",
        "json",
        "pipe:0",
    ]
    proc = subprocess.run(
        cmd, input=bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE
    )

    if proc.returncode != 0:
        # Decode leniently so odd bytes in stderr cannot mask the real error
        # with a UnicodeDecodeError.
        detail = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffprobe failed: {detail}")

    return json.loads(proc.stdout)
157
158
def get_media_meta(bytes: bytes) -> MediaInfo:
    """Probe in-memory media and summarise its first video stream.

    Args:
        bytes: Raw media data handed to ``probe_bytes``. (The name shadows
            the builtin but is kept because it is part of the signature.)

    Returns:
        ``MediaInfo`` with the width and height of the first video stream.
        ``duration`` is taken from that stream, falling back to the
        container-level value, and is ``None`` when neither is a usable
        number.

    Raises:
        ValueError: If the probe reports no video stream.
        RuntimeError: Propagated from ``probe_bytes`` when ffprobe fails.
    """
    probe = probe_bytes(bytes)
    # `.get` tolerates stream entries that carry no codec_type at all.
    video: dict[str, Any] | None = next(
        (s for s in probe["streams"] if s.get("codec_type") == "video"),
        None,
    )
    if video is None:
        raise ValueError("No video stream found")

    raw_duration = video.get("duration")
    if raw_duration is None:
        raw_duration = probe.get("format", {}).get("duration")

    # ffprobe's JSON output carries durations as strings (e.g. "12.345000"),
    # while MediaInfo.duration is declared as a float.
    try:
        duration = float(raw_duration) if raw_duration is not None else None
    except (TypeError, ValueError):
        duration = None

    return MediaInfo(
        width=video["width"],
        height=video["height"],
        duration=duration,
    )