social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import requests
2import subprocess
3import json
4import re, urllib.parse, os
5from util import LOGGER
6import magic
7
8FILENAME = re.compile(r'filename="?([^\";]*)"?')
9MAGIC = magic.Magic(mime=True)
10
11class MediaInfo():
12 def __init__(self, url: str, name: str, mime: str, alt: str, io: bytes) -> None:
13 self.url = url
14 self.name = name
15 self.mime = mime
16 self.alt = alt
17 self.io = io
18
19def download_media(url: str, alt: str) -> MediaInfo | None:
20 name = get_filename_from_url(url)
21 io = download_blob(url, max_bytes=100_000_000)
22 if not io:
23 LOGGER.error("Failed to download media attachment! %s", url)
24 return None
25 mime = MAGIC.from_buffer(io)
26 if not mime:
27 mime = 'application/octet-stream'
28 return MediaInfo(url, name, mime, alt, io)
29
30def get_filename_from_url(url):
31 try:
32 response = requests.head(url, allow_redirects=True)
33 disposition = response.headers.get('Content-Disposition')
34 if disposition:
35 filename = FILENAME.findall(disposition)
36 if filename:
37 return filename[0]
38 except requests.RequestException:
39 pass
40
41 parsed_url = urllib.parse.urlparse(url)
42 base_name = os.path.basename(parsed_url.path)
43
44 # hardcoded fix to return the cid for pds
45 if base_name == 'com.atproto.sync.getBlob':
46 qs = urllib.parse.parse_qs(parsed_url.query)
47 if qs and qs.get('cid'):
48 return qs['cid'][0]
49
50 return base_name
51
52def probe_bytes(bytes: bytes) -> dict:
53 cmd = [
54 'ffprobe',
55 '-v', 'error',
56 '-show_format',
57 '-show_streams',
58 '-print_format', 'json',
59 'pipe:0'
60 ]
61 proc = subprocess.run(cmd, input=bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
62
63 if proc.returncode != 0:
64 raise RuntimeError(f"ffprobe failed: {proc.stderr.decode()}")
65
66 return json.loads(proc.stdout)
67
68def convert_to_mp4(video_bytes: bytes) -> bytes:
69 cmd = [
70 'ffmpeg',
71 '-i', 'pipe:0',
72 '-c:v', 'libx264',
73 '-crf', '30',
74 '-preset', 'slow',
75 '-c:a', 'aac',
76 '-b:a', '128k',
77 '-movflags', 'frag_keyframe+empty_moov+default_base_moof',
78 '-f', 'mp4',
79 'pipe:1'
80 ]
81
82 proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
83 out_bytes, err = proc.communicate(input=video_bytes)
84
85 if proc.returncode != 0:
86 raise RuntimeError(f"ffmpeg compress failed: {err.decode()}")
87
88 return out_bytes
89
90def compress_image(image_bytes: bytes, quality: int = 90):
91 cmd = [
92 'ffmpeg',
93 '-f', 'image2pipe',
94 '-i', 'pipe:0',
95 '-c:v', 'webp',
96 '-q:v', str(quality),
97 '-f', 'image2pipe',
98 'pipe:1'
99 ]
100
101 proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
102 out_bytes, err = proc.communicate(input=image_bytes)
103
104 if proc.returncode != 0:
105 raise RuntimeError(f"ffmpeg compress failed: {err.decode()}")
106
107 return out_bytes
108
109def download_blob(url: str, max_bytes: int = 5_000_000) -> bytes | None:
110 response = requests.get(url, stream=True, timeout=20)
111 if response.status_code != 200:
112 LOGGER.info("Failed to download %s! %s", url, response.text)
113 return None
114
115 downloaded_bytes = b""
116 current_size = 0
117
118 for chunk in response.iter_content(chunk_size=8192):
119 if not chunk:
120 continue
121
122 current_size += len(chunk)
123 if current_size > max_bytes:
124 response.close()
125 return None
126
127 downloaded_bytes += chunk
128
129 return downloaded_bytes
130
131
132def get_media_meta(bytes: bytes):
133 probe = probe_bytes(bytes)
134 streams = [s for s in probe['streams'] if s['codec_type'] == 'video']
135 if not streams:
136 raise ValueError("No video stream found")
137
138 media = streams[0]
139 return {
140 'width': int(media['width']),
141 'height': int(media['height']),
142 'duration': float(media.get('duration', probe['format'].get('duration', -1)))
143 }