social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import json
2import os
3import re
4import subprocess
5import urllib.parse
6
7import magic
8import requests
9
10from util.util import LOGGER
11
# Captures the (optionally quoted) value that follows `filename=`; the capture
# stops at the first `"` or `;`. Used on Content-Disposition header values.
FILENAME = re.compile(r'filename="?([^\";]*)"?')
# Shared python-magic detector, created once at import time. It is used to
# sniff a MIME type from downloaded bytes via `MAGIC.from_buffer(...)`.
MAGIC = magic.Magic(mime=True)
14
15
class MediaInfo:
    """A media attachment: its raw bytes plus the metadata describing it.

    Every constructor argument is stored unchanged on the instance under the
    same name: ``url``, ``name``, ``mime``, ``alt`` and ``io`` (the bytes).
    """

    def __init__(self, url: str, name: str, mime: str, alt: str, io: bytes) -> None:
        # Payload first, then the descriptive fields in pairs.
        self.io = io
        self.url, self.name = url, name
        self.mime, self.alt = mime, alt
23
24
def download_media(url: str, alt: str) -> MediaInfo | None:
    """Fetch the attachment at ``url`` and wrap it in a ``MediaInfo``.

    The file name is resolved first with ``get_filename_from_url``; the body
    is then fetched with ``download_blob`` under a 100,000,000-byte cap.

    Args:
        url: Location of the media file.
        alt: Alt text to attach to the resulting ``MediaInfo``.

    Returns:
        The populated ``MediaInfo``, or ``None`` (after logging an error) when
        the download produced no data.
    """
    filename = get_filename_from_url(url)
    payload = download_blob(url, max_bytes=100_000_000)

    if payload:
        # Fall back to a generic binary type when sniffing yields nothing.
        detected = MAGIC.from_buffer(payload) or "application/octet-stream"
        return MediaInfo(url, filename, detected, alt, payload)

    LOGGER.error("Failed to download media attachment! %s", url)
    return None
35
36
def get_filename_from_url(url: str, timeout: float = 20) -> str:
    """Work out a file name for the resource at ``url``.

    Resolution order:

    1. The ``filename`` parameter of the ``Content-Disposition`` header
       returned by a HEAD request (redirects are followed).
    2. For ``com.atproto.sync.getBlob`` URLs, the ``cid`` query parameter.
    3. The basename of the URL path (may be an empty string).

    Args:
        url: Location of the resource.
        timeout: Seconds to wait for the HEAD request before giving up and
            falling back to the URL-derived name.

    Returns:
        The best file name that could be determined.
    """
    try:
        # Without a timeout an unresponsive host would block forever.
        response = requests.head(url, allow_redirects=True, timeout=timeout)
    except requests.RequestException:
        # The header lookup is best effort; the URL itself is the fallback.
        disposition = None
    else:
        disposition = response.headers.get("Content-Disposition")

    if disposition:
        match = FILENAME.search(disposition)
        # An empty `filename=""` carries no information, so keep looking.
        if match and match.group(1):
            return match.group(1)

    parsed_url = urllib.parse.urlparse(url)
    base_name = os.path.basename(parsed_url.path)

    # hardcoded fix to return the cid for pds
    if base_name == "com.atproto.sync.getBlob":
        cids = urllib.parse.parse_qs(parsed_url.query).get("cid")
        if cids:
            return cids[0]

    return base_name
58
59
def probe_bytes(bytes: bytes) -> dict:
    """Run ``ffprobe`` on in-memory media and return its parsed JSON report.

    The data is piped to ffprobe's stdin; format and stream information are
    requested in JSON form.

    Args:
        bytes: Raw media data to inspect.

    Returns:
        The decoded JSON document printed by ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    result = subprocess.run(
        [
            "ffprobe",
            "-v", "error",
            "-show_format",
            "-show_streams",
            "-print_format", "json",
            "pipe:0",
        ],
        input=bytes,
        capture_output=True,
    )

    if result.returncode == 0:
        return json.loads(result.stdout)

    raise RuntimeError(f"ffprobe failed: {result.stderr.decode()}")
77
78
def convert_to_mp4(video_bytes: bytes) -> bytes:
    """Re-encode a video to MP4 (H.264 video, AAC audio) through ``ffmpeg``.

    Input is piped to ffmpeg's stdin and the result is read from its stdout.
    Video is encoded with libx264 at CRF 30 using the ``slow`` preset; audio
    is encoded as AAC at 128k.

    Args:
        video_bytes: Raw bytes of the source video.

    Returns:
        The encoded MP4 data.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    ffmpeg_args = [
        "ffmpeg",
        "-i", "pipe:0",
        "-c:v", "libx264",
        "-crf", "30",
        "-preset", "slow",
        "-c:a", "aac",
        "-b:a", "128k",
        # NOTE(review): fragmented-MP4 flags, presumably needed because the
        # output goes to a non-seekable pipe -- confirm before changing.
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        "-f", "mp4",
        "pipe:1",
    ]

    result = subprocess.run(ffmpeg_args, input=video_bytes, capture_output=True)

    if result.returncode == 0:
        return result.stdout

    raise RuntimeError(f"ffmpeg compress failed: {result.stderr.decode()}")
102
103
def compress_image(image_bytes: bytes, quality: int = 90):
    """Re-encode an image as WebP through ``ffmpeg``.

    Input is piped to ffmpeg's stdin and the result is read from its stdout.

    Args:
        image_bytes: Raw bytes of the source image.
        quality: Value passed to ffmpeg's ``-q:v`` option.

    Returns:
        The encoded image data as bytes.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    ffmpeg_args = [
        "ffmpeg",
        "-f", "image2pipe",
        "-i", "pipe:0",
        "-c:v", "webp",
        "-q:v", str(quality),
        "-f", "image2pipe",
        "pipe:1",
    ]

    result = subprocess.run(ffmpeg_args, input=image_bytes, capture_output=True)

    if result.returncode == 0:
        return result.stdout

    raise RuntimeError(f"ffmpeg compress failed: {result.stderr.decode()}")
124
125
def download_blob(url: str, max_bytes: int = 5_000_000) -> bytes | None:
    """Download ``url`` into memory, refusing bodies larger than ``max_bytes``.

    The body is streamed in 8 KiB chunks so that an oversized response is
    abandoned as soon as the limit is crossed instead of being read in full.

    Args:
        url: Location of the resource.
        max_bytes: Maximum number of body bytes accepted.

    Returns:
        The downloaded bytes, or ``None`` when the server does not answer with
        status 200 or the body exceeds ``max_bytes``.

    Raises:
        requests.RequestException: Connection errors and timeouts from the
            underlying request are not caught here.
    """
    # The context manager releases the connection on every exit path.
    with requests.get(url, stream=True, timeout=20) as response:
        if response.status_code != 200:
            LOGGER.info("Failed to download %s! %s", url, response.text)
            return None

        # Collect chunks and join once: `bytes += chunk` recopies the whole
        # buffer on every iteration, which is quadratic for large downloads.
        chunks: list[bytes] = []
        received = 0

        for chunk in response.iter_content(chunk_size=8192):
            if not chunk:
                continue

            received += len(chunk)
            if received > max_bytes:
                LOGGER.info(
                    "Aborted download of %s! Body exceeds %d bytes", url, max_bytes
                )
                return None

            chunks.append(chunk)

    return b"".join(chunks)
147
148
def get_media_meta(bytes: bytes) -> dict[str, int | float]:
    """Return width, height and duration of the first video stream.

    The data is inspected with ``probe_bytes``. The duration is taken from the
    stream when present, otherwise from the container's format section, and
    is ``-1`` when neither reports one.

    Args:
        bytes: Raw media data to inspect.

    Returns:
        A dict with integer ``width`` and ``height`` and float ``duration``.

    Raises:
        ValueError: If the probe result contains no video stream.
        RuntimeError: Propagated from ``probe_bytes`` when probing fails.
    """
    probe = probe_bytes(bytes)

    # `.get` keeps a sparse probe result on the ValueError path below rather
    # than failing with an unrelated KeyError.
    video = next(
        (s for s in probe.get("streams", []) if s.get("codec_type") == "video"),
        None,
    )
    if video is None:
        raise ValueError("No video stream found")

    # Only consult the format section when the stream has no duration, so a
    # missing "format" key cannot break an otherwise complete result.
    duration = video.get("duration")
    if duration is None:
        duration = probe.get("format", {}).get("duration", -1)

    return {
        "width": int(video["width"]),
        "height": int(video["height"]),
        "duration": float(duration),
    }