social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

handle media

zenfyr.dev d1c38c76 bde55533

verified
Changed files
+251 -32
bluesky
cross
+76 -32
bluesky/input.py
···
from atproto.util import AtUri
from bluesky.info import SERVICE, BlueskyService, validate_and_transform
-
from cross.attachments import LabelsAttachment, LanguagesAttachment, RemoteUrlAttachment
+
from cross.attachments import (
+
LabelsAttachment,
+
LanguagesAttachment,
+
MediaAttachment,
+
RemoteUrlAttachment,
+
)
+
from cross.media import Blob, download_blob
from cross.post import Post
from cross.service import InputService
from database.connection import DatabasePool
···
post_cid = cast(str, record["$xpost.strongRef"]["cid"])
parent_uri = cast(
-
str,
-
None if not record.get("reply") else record["reply"]["parent"]["uri"]
+
str, None if not record.get("reply") else record["reply"]["parent"]["uri"]
)
parent = None
if parent_uri:
parent = self._get_post(self.url, self.did, parent_uri)
if not parent:
-
self.log.info("Skipping %s, parent %s not found in db", post_uri, parent_uri)
+
self.log.info(
+
"Skipping %s, parent %s not found in db", post_uri, parent_uri
+
)
return
+
# TODO FRAGMENTS
post = Post(id=post_uri, parent_id=parent_uri, text=record["text"])
did, _, rid = AtUri.record_uri(post_uri)
-
post.attachments.put(RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}"))
+
post.attachments.put(
+
RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}")
+
)
-
# TODO Media Attachments
embed = record.get("embed", {})
if embed:
match cast(str, embed["$type"]):
···
if collection == "app.bsky.feed.post":
self.log.info("Skipping '%s'! Quote..", post_uri)
return
+
case "app.bsky.embed.images":
+
blobs: list[Blob] = []
+
for image in embed["images"]:
+
blob_cid = image["image"]["ref"]["$link"]
+
url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}"
+
self.log.info("Downloading %s...", blob_cid)
+
blob: Blob | None = download_blob(url, image.get("alt"))
+
if not blob:
+
self.log.error(
+
"Skipping %s! Failed to download blob %s.",
+
post_uri,
+
blob_cid,
+
)
+
return
+
blobs.append(blob)
+
post.attachments.put(MediaAttachment(blobs=blobs))
+
case "app.bsky.embed.video":
+
blob_cid = embed["video"]["ref"]["$link"]
+
url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}"
+
self.log.info("Downloading %s...", blob_cid)
+
blob: Blob | None = download_blob(url, embed.get("alt"))
+
if not blob:
+
self.log.error(
+
"Skipping %s! Failed to download blob %s.",
+
post_uri,
+
blob_cid,
+
)
+
return
+
post.attachments.put(MediaAttachment(blobs=[blob]))
case _:
self.log.warning(f"Unhandled embedd type {embed['$type']}")
pass
if "langs" in record:
-
post.attachments.put(
-
LanguagesAttachment(langs=record["langs"])
-
)
+
post.attachments.put(LanguagesAttachment(langs=record["langs"]))
if "labels" in record:
post.attachments.put(
LabelsAttachment(
···
)
if parent:
-
self._insert_post({
-
"user": self.did,
-
"service": self.url,
-
"identifier": post_uri,
-
"parent": parent['id'],
-
"root": parent['id'] if not parent['root'] else parent['root'],
-
"extra_data": json.dumps({'cid': post_cid})
-
})
+
self._insert_post(
+
{
+
"user": self.did,
+
"service": self.url,
+
"identifier": post_uri,
+
"parent": parent["id"],
+
"root": parent["id"] if not parent["root"] else parent["root"],
+
"extra_data": json.dumps({"cid": post_cid}),
+
}
+
)
else:
-
self._insert_post({
-
"user": self.did,
-
"service": self.url,
-
"identifier": post_uri,
-
"extra_data": json.dumps({'cid': post_cid})
-
})
+
self._insert_post(
+
{
+
"user": self.did,
+
"service": self.url,
+
"identifier": post_uri,
+
"extra_data": json.dumps({"cid": post_cid}),
+
}
+
)
for out in self.outputs:
self.submitter(lambda: out.accept_post(post))
···
reposted_uri = cast(str, record["subject"]["uri"])
reposted = self._get_post(self.url, self.did, reposted_uri)
if not reposted:
-
self.log.info("Skipping repost '%s' as reposted post '%s' was not found in the db.")
+
self.log.info(
+
"Skipping repost '%s' as reposted post '%s' was not found in the db."
+
)
return
-
self._insert_post({
-
"user": self.did,
-
"service": self.url,
-
"identifier": post_uri,
-
"reposted": reposted['id'],
-
"extra_data": json.dumps({'cid': post_cid})
-
})
+
self._insert_post(
+
{
+
"user": self.did,
+
"service": self.url,
+
"identifier": post_uri,
+
"reposted": reposted["id"],
+
"extra_data": json.dumps({"cid": post_cid}),
+
}
+
)
for out in self.outputs:
self.submitter(lambda: out.accept_repost(post_uri, reposted_uri))
···
else:
for output in self.outputs:
self.submitter(lambda: output.delete_post(post_id))
-
self._delete_post_by_id(post['id'])
+
self._delete_post_by_id(post["id"])
class BlueskyJetstreamInputService(BlueskyBaseInputService):
+5
cross/attachments.py
···
from dataclasses import dataclass
+
from cross.media import Blob
+
@dataclass(kw_only=True)
class Attachment:
···
class RemoteUrlAttachment(Attachment):
url: str
+
@dataclass(kw_only=True)
class MediaAttachment(Attachment):
    """Attachment that carries the media blobs attached to a post."""

    # Media files for the post, in the order they were collected.
    blobs: list[Blob]
@dataclass(kw_only=True)
class QuoteAttachment(Attachment):
+170
cross/media.py
···
+
from dataclasses import dataclass, field
+
+
import json
+
import re
+
import os
+
from typing import Any, cast
+
import magic
+
import subprocess
+
import urllib.parse
+
+
import requests
+
+
FILENAME = re.compile(r'filename="?([^\";]*)"?')
+
MAGIC = magic.Magic(mime=True)
+
+
+
@dataclass
+
class Blob:
+
url: str
+
mime: str
+
io: bytes = field(repr=False)
+
name: str | None = None
+
alt: str | None = None
+
+
+
@dataclass
+
class MediaInfo:
+
width: int
+
height: int
+
duration: float | None = None
+
+
+
def mime_from_bytes(io: bytes) -> str:
    """Return the MIME type detected for *io*.

    Falls back to ``application/octet-stream`` when the detector yields an
    empty result.
    """
    detected = MAGIC.from_buffer(io)
    return detected if detected else "application/octet-stream"
+
+
def download_blob(url: str, alt: str | None = None, max_bytes: int = 100_000_000) -> Blob | None:
    """Download *url* and wrap the payload in a :class:`Blob`.

    Args:
        url: Address to download.
        alt: Alternative text stored on the resulting blob.
        max_bytes: Size limit forwarded to ``download_chuncked``.

    Returns:
        The populated blob, or ``None`` when the download produced nothing
        (``download_chuncked`` returned ``None`` or an empty body).
    """
    # The file name is resolved before the payload is fetched.
    filename = get_filename_from_url(url)
    payload = download_chuncked(url, max_bytes)
    if payload:
        return Blob(url, mime_from_bytes(payload), payload, filename, alt)
    return None
+
+
def download_chuncked(url: str, max_bytes: int = 100_000_000) -> bytes | None:
    """Stream *url* into memory, refusing bodies larger than *max_bytes*.

    Args:
        url: Address to download.
        max_bytes: Maximum number of body bytes accepted.

    Returns:
        The body bytes, or ``None`` when the status is not 200, the body
        exceeds *max_bytes*, or the transfer fails with a
        ``requests.RequestException``.
    """
    try:
        # The context manager releases the streamed connection on every exit
        # path (non-200 status, size limit hit, error, normal completion).
        with requests.get(url, stream=True, timeout=20) as response:
            if response.status_code != 200:
                return None

            # bytearray grows in place; repeated ``bytes +=`` copies the
            # whole buffer on each chunk.
            buffer = bytearray()
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                if len(buffer) + len(chunk) > max_bytes:
                    return None
                buffer += chunk
    except requests.RequestException:
        # Callers treat None as "download failed"; report transport errors
        # the same way instead of letting them escape.
        return None

    return bytes(buffer)
+
+
+
def get_filename_from_url(url: str) -> str:
    """Work out a file name for the resource at *url*.

    The ``Content-Disposition`` header of a HEAD request is consulted first;
    if it yields nothing (or the request fails), the last path component of
    the URL is used instead.
    """
    try:
        head = requests.head(url, timeout=5, allow_redirects=True)
        header = head.headers.get("Content-Disposition")
        found = FILENAME.search(header) if header else None
        if found is not None:
            return found.group(1)
    except requests.RequestException:
        # Best effort only: fall through to the URL-derived name.
        pass

    parts = urllib.parse.urlparse(url)
    tail = os.path.basename(parts.path)
    if tail != "com.atproto.sync.getBlob":
        return tail

    # hardcoded fix to return the cid for pds blobs
    cids = urllib.parse.parse_qs(parts.query).get("cid")
    return cids[0] if cids else tail
+
+
+
def convert_to_mp4(video: Blob) -> Blob:
    """Re-encode *video* to H.264/AAC MP4 by piping it through ffmpeg.

    Args:
        video: Blob whose ``io`` bytes are fed to ffmpeg on stdin.

    Returns:
        A new blob with the converted bytes and a freshly detected MIME type;
        ``url``, ``name`` and ``alt`` are carried over from *video*.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-i", "pipe:0",
        "-c:v", "libx264",
        "-crf", "30",
        "-preset", "slow",
        "-c:a", "aac",
        "-b:a", "128k",
        # Output goes to a pipe, so a fragmented MP4 layout is requested.
        "-movflags", "frag_keyframe+empty_moov+default_base_moof",
        "-f", "mp4",
        "pipe:1",
    ]

    proc = subprocess.run(
        cmd,
        input=video.io,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )

    if proc.returncode != 0:
        # Lossy decode: undecodable stderr must not hide the real failure.
        details = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffmpeg convert failed: {details}")

    converted = proc.stdout
    return Blob(video.url, mime_from_bytes(converted), converted, video.name, video.alt)
+
+
+
def compress_image(image: Blob, quality: int = 95) -> Blob:
    """Re-encode *image* as WebP by piping it through ffmpeg.

    Args:
        image: Blob whose ``io`` bytes are fed to ffmpeg on stdin.
        quality: Value passed to ffmpeg's ``-q:v`` option.

    Returns:
        A new blob with the encoded bytes and MIME type ``image/webp``;
        ``url``, ``name`` and ``alt`` are carried over from *image*.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg",
        "-f", "image2pipe",
        "-i", "pipe:0",
        "-c:v", "webp",
        "-q:v", str(quality),
        "-f", "image2pipe",
        "pipe:1",
    ]

    proc = subprocess.run(
        cmd,
        input=image.io,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )

    if proc.returncode != 0:
        # Lossy decode: undecodable stderr must not hide the real failure.
        details = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffmpeg compress failed: {details}")

    return Blob(image.url, "image/webp", proc.stdout, image.name, image.alt)
+
+
+
def probe_bytes(bytes: bytes) -> dict[str, Any]:
    """Run ffprobe on in-memory media and return its parsed JSON report.

    Args:
        bytes: Media payload fed to ffprobe on stdin.

    Returns:
        The decoded JSON document produced by ``-show_format -show_streams``.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    cmd = [
        "ffprobe",
        "-v",
        "error",
        "-show_format",
        "-show_streams",
        "-print_format",
        "json",
        "pipe:0",
    ]
    proc = subprocess.run(
        cmd,
        input=bytes,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )

    if proc.returncode != 0:
        # Lossy decode: undecodable stderr must not hide the real failure.
        details = proc.stderr.decode(errors="replace")
        raise RuntimeError(f"ffprobe failed: {details}")

    return json.loads(proc.stdout)
+
+
+
def get_media_meta(bytes: bytes) -> MediaInfo:
    """Extract width, height and duration of the first video stream.

    Args:
        bytes: Media payload handed to ``probe_bytes``.

    Returns:
        A ``MediaInfo`` built from the first stream whose ``codec_type`` is
        ``"video"``. The duration is taken from that stream, else from the
        container format, and is ``None`` when absent or not numeric.

    Raises:
        ValueError: If the probe reports no video stream.
    """
    probe = probe_bytes(bytes)
    video_stream: dict[str, Any] | None = next(
        (s for s in probe.get("streams", []) if s.get("codec_type") == "video"),
        None,
    )
    if video_stream is None:
        raise ValueError("No video stream found")

    raw_duration = video_stream.get("duration")
    if raw_duration is None:
        raw_duration = probe.get("format", {}).get("duration")

    # ffprobe's JSON output carries durations as strings (e.g. "12.345000"),
    # while MediaInfo.duration is declared as a float.
    try:
        duration = None if raw_duration is None else float(raw_duration)
    except (TypeError, ValueError):
        duration = None

    return MediaInfo(
        width=video_stream["width"],
        height=video_stream["height"],
        duration=duration,
    )