social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

+15
.tangled/workflows/run-tests.yml
···
+
when:
+
- event: ["push", "manual"]
+
branch: ["next"]
+
+
engine: nixery
+
+
dependencies:
+
nixpkgs:
+
- uv
+
- python312
+
+
steps:
+
- name: run tests
+
command: |
+
uv run --python python3.12 pytest -vv
+8 -3
README.md
···
-
# XPost
+
# xpost next
+
+
> [!NOTE]
+
> this is the dev branch for xpost next, a full rewrite of xpost. the older version is available on the master branch.
+
>
+
> planned work for this branch can be found and tracked here: https://tangled.org/@zenfyr.dev/xpost/issues/1
-
XPost is a social media cross-posting tool that differs from others by using streaming APIs to allow instant, zero-input cross-posting. this means you can continue posting on your preferred platform without using special apps.
+
xpost is a social media cross-posting tool that differs from others by using streaming APIs to allow instant, zero-input cross-posting. this means you can continue posting on your preferred platform without using special apps.
-
XPost tries to support as many features as possible. for example, when cross-posting from mastodon to bluesky, unsupported file types will be attached as links. posts with mixed media or too many files will be split and spread across text.
+
xpost tries to support as many features as possible. for example, when cross-posting from mastodon to bluesky, unsupported file types will be attached as links. posts with mixed media or too many files will be split and spread across text.
+164
atproto/identity.py
···
+
from pathlib import Path
+
from typing import Any, override
+
+
import dns.resolver
+
import requests
+
+
import env
+
from util.cache import Cacheable, TTLCache
+
from util.util import LOGGER, normalize_service_url, shutdown_hook
+
+
+
class DidDocument():
+
def __init__(self, raw_doc: dict[str, Any]) -> None:
+
self.raw: dict[str, Any] = raw_doc
+
self.atproto_pds: str | None = None
+
+
def get_atproto_pds(self) -> str | None:
+
if self.atproto_pds:
+
return self.atproto_pds
+
+
services = self.raw.get("service")
+
if not services:
+
return None
+
+
for service in services:
+
if (
+
service.get("id") == "#atproto_pds"
+
and service.get("type") == "AtprotoPersonalDataServer"
+
):
+
endpoint = service.get("serviceEndpoint")
+
if endpoint:
+
url = normalize_service_url(endpoint)
+
self.atproto_pds = url
+
return url
+
self.atproto_pds = ""
+
return None
+
+
+
class DidResolver(Cacheable):
+
def __init__(self, plc_host: str) -> None:
+
self.plc_host: str = plc_host
+
self.__cache: TTLCache[str, DidDocument] = TTLCache(ttl_seconds=12 * 60 * 60)
+
+
def try_resolve_plc(self, did: str) -> DidDocument | None:
+
url = f"{self.plc_host}/{did}"
+
response = requests.get(url, timeout=10, allow_redirects=True)
+
+
if response.status_code == 200:
+
return DidDocument(response.json())
+
elif response.status_code == 404 or response.status_code == 410:
+
return None # tombstone or not registered
+
else:
+
response.raise_for_status()
+
+
def try_resolve_web(self, did: str) -> DidDocument | None:
+
url = f"http://{did[len('did:web:') :]}/.well-known/did.json"
+
response = requests.get(url, timeout=10, allow_redirects=True)
+
+
if response.status_code == 200:
+
return DidDocument(response.json())
+
elif response.status_code == 404 or response.status_code == 410:
+
return None # tombstone or gone
+
else:
+
response.raise_for_status()
+
+
def resolve_did(self, did: str) -> DidDocument:
+
cached = self.__cache.get(did)
+
if cached:
+
return cached
+
+
if did.startswith("did:plc:"):
+
from_plc = self.try_resolve_plc(did)
+
if from_plc:
+
self.__cache.set(did, from_plc)
+
return from_plc
+
elif did.startswith("did:web:"):
+
from_web = self.try_resolve_web(did)
+
if from_web:
+
self.__cache.set(did, from_web)
+
return from_web
+
raise Exception(f"Failed to resolve {did}!")
+
+
@override
+
def dump_cache(self, path: Path):
+
self.__cache.dump_cache(path)
+
+
@override
+
def load_cache(self, path: Path):
+
self.__cache.load_cache(path)
+
+
class HandleResolver(Cacheable):
+
def __init__(self) -> None:
+
self.__cache: TTLCache[str, str] = TTLCache(ttl_seconds=12 * 60 * 60)
+
+
def try_resolve_dns(self, handle: str) -> str | None:
+
try:
+
dns_query = f"_atproto.{handle}"
+
answers = dns.resolver.resolve(dns_query, "TXT")
+
+
for rdata in answers:
+
for txt_data in rdata.strings:
+
did = txt_data.decode("utf-8").strip()
+
if did.startswith("did="):
+
return did[4:]
+
except dns.resolver.NXDOMAIN:
+
LOGGER.debug(f"DNS record not found for _atproto.{handle}")
+
return None
+
except dns.resolver.NoAnswer:
+
LOGGER.debug(f"No TXT records found for _atproto.{handle}")
+
return None
+
+
def try_resolve_http(self, handle: str) -> str | None:
+
url = f"http://{handle}/.well-known/atproto-did"
+
response = requests.get(url, timeout=10, allow_redirects=True)
+
+
if response.status_code == 200:
+
did = response.text.strip()
+
if did.startswith("did:"):
+
return did
+
else:
+
raise ValueError(f"Got invalid did: from {url} = {did}!")
+
else:
+
response.raise_for_status()
+
+
def resolve_handle(self, handle: str) -> str:
+
cached = self.__cache.get(handle)
+
if cached:
+
return cached
+
+
from_dns = self.try_resolve_dns(handle)
+
if from_dns:
+
self.__cache.set(handle, from_dns)
+
return from_dns
+
+
from_http = self.try_resolve_http(handle)
+
if from_http:
+
self.__cache.set(handle, from_http)
+
return from_http
+
+
raise Exception(f"Failed to resolve handle {handle}!")
+
+
@override
+
def dump_cache(self, path: Path):
+
self.__cache.dump_cache(path)
+
+
@override
+
def load_cache(self, path: Path):
+
self.__cache.load_cache(path)
+
+
+
handle_resolver = HandleResolver()
+
did_resolver = DidResolver(env.PLC_HOST)
+
+
did_cache = env.CACHE_DIR.joinpath('did.cache')
+
handle_cache = env.CACHE_DIR.joinpath('handle.cache')
+
+
did_resolver.load_cache(did_cache)
+
handle_resolver.load_cache(handle_cache)
+
+
def cache_dump():
+
did_resolver.dump_cache(did_cache)
+
handle_resolver.dump_cache(handle_cache)
+
+
shutdown_hook.append(cache_dump)
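
For illustration, a minimal sketch of how the module-level resolvers above fit together (the handle is hypothetical):

```python
from atproto.identity import did_resolver, handle_resolver

# handle -> DID via DNS TXT (_atproto.<handle>), falling back to the HTTP well-known route
did = handle_resolver.resolve_handle("alice.example.com")
# DID -> PDS endpoint taken from the did:plc / did:web document (results cached for 12h)
pds = did_resolver.resolve_did(did).get_atproto_pds()
print(did, pds)
```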
+11
atproto/util.py
···
+
URI = "at://"
+
URI_LEN = len(URI)
+
+
+
class AtUri:
+
@classmethod
+
def record_uri(cls, uri: str) -> tuple[str, str, str]:
+
did, collection, rid = uri[URI_LEN:].split("/")
+
if not (did and collection and rid):
+
raise ValueError(f"Ivalid record uri {uri}!")
+
return did, collection, rid
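
A quick usage sketch of AtUri.record_uri (the record URI below is made up):

```python
did, collection, rkey = AtUri.record_uri(
    "at://did:plc:abc123/app.bsky.feed.post/3k44deefg2a"
)
# did == "did:plc:abc123", collection == "app.bsky.feed.post", rkey == "3k44deefg2a"
```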
+50
bluesky/info.py
···
+
from abc import ABC, abstractmethod
+
from typing import Any, override
+
+
from atproto.identity import did_resolver, handle_resolver
+
from cross.service import Service
+
from util.util import normalize_service_url
+
+
SERVICE = "https://bsky.app"
+
+
+
def validate_and_transform(data: dict[str, Any]):
+
if not data["handle"] and not data["did"]:
+
raise KeyError("no 'handle' or 'did' specified for bluesky input!")
+
+
if "did" in data:
+
did = str(data["did"]) # only did:web and did:plc are supported
+
if not did.startswith("did:plc:") and not did.startswith("did:web:"):
+
raise ValueError(f"Invalid handle {did}!")
+
+
if "pds" in data:
+
data["pds"] = normalize_service_url(data["pds"])
+
+
+
class BlueskyService(ABC, Service):
+
pds: str
+
did: str
+
+
def _init_identity(self) -> None:
+
handle, did, pds = self.get_identity_options()
+
if did:
+
self.did = did
+
if pds:
+
self.pds = pds
+
+
if not did:
+
if not handle:
+
raise KeyError("No did: or atproto handle provided!")
+
self.log.info("Resolving ATP identity for %s...", handle)
+
self.did = handle_resolver.resolve_handle(handle)
+
+
if not pds:
+
self.log.info("Resolving PDS from %s DID document...", self.did)
+
atp_pds = did_resolver.resolve_did(self.did).get_atproto_pds()
+
if not atp_pds:
+
raise Exception("Failed to resolve atproto pds for %s")
+
self.pds = atp_pds
+
+
@abstractmethod
+
def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
+
pass
+283
bluesky/input.py
···
+
import asyncio
+
import json
+
import re
+
from abc import ABC
+
from dataclasses import dataclass, field
+
from typing import Any, cast, override
+
+
import websockets
+
+
from atproto.util import AtUri
+
from bluesky.tokens import tokenize_post
+
from bluesky.info import SERVICE, BlueskyService, validate_and_transform
+
from cross.attachments import (
+
LabelsAttachment,
+
LanguagesAttachment,
+
MediaAttachment,
+
QuoteAttachment,
+
RemoteUrlAttachment,
+
)
+
from cross.media import Blob, download_blob
+
from cross.post import Post
+
from cross.service import InputService
+
from database.connection import DatabasePool
+
from util.util import normalize_service_url
+
+
+
@dataclass(kw_only=True)
+
class BlueskyInputOptions:
+
handle: str | None = None
+
did: str | None = None
+
pds: str | None = None
+
filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "BlueskyInputOptions":
+
validate_and_transform(data)
+
+
if "filters" in data:
+
data["filters"] = [re.compile(r) for r in data["filters"]]
+
+
return BlueskyInputOptions(**data)
+
+
+
@dataclass(kw_only=True)
+
class BlueskyJetstreamInputOptions(BlueskyInputOptions):
+
jetstream: str = "wss://jetstream2.us-west.bsky.network/subscribe"
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "BlueskyJetstreamInputOptions":
+
jetstream = data.pop("jetstream", None)
+
+
base = BlueskyInputOptions.from_dict(data).__dict__.copy()
+
if jetstream:
+
base["jetstream"] = normalize_service_url(jetstream)
+
+
return BlueskyJetstreamInputOptions(**base)
+
+
+
class BlueskyBaseInputService(BlueskyService, InputService, ABC):
+
def __init__(self, db: DatabasePool) -> None:
+
super().__init__(SERVICE, db)
+
+
def _on_post(self, record: dict[str, Any]):
+
post_uri = cast(str, record["$xpost.strongRef"]["uri"])
+
post_cid = cast(str, record["$xpost.strongRef"]["cid"])
+
+
parent_uri = cast(
+
str, None if not record.get("reply") else record["reply"]["parent"]["uri"]
+
)
+
parent = None
+
if parent_uri:
+
parent = self._get_post(self.url, self.did, parent_uri)
+
if not parent:
+
self.log.info(
+
"Skipping %s, parent %s not found in db", post_uri, parent_uri
+
)
+
return
+
+
tokens = tokenize_post(record["text"], record.get("facets", []))
+
post = Post(id=post_uri, parent_id=parent_uri, tokens=tokens)
+
+
did, _, rid = AtUri.record_uri(post_uri)
+
post.attachments.put(
+
RemoteUrlAttachment(url=f"https://bsky.app/profile/{did}/post/{rid}")
+
)
+
+
embed: dict[str, Any] = record.get("embed", {})
+
blob_urls: list[tuple[str, str, str | None]] = []
+
def handle_embeds(embed: dict[str, Any]) -> str | None:
+
nonlocal blob_urls, post
+
match cast(str, embed["$type"]):
+
case "app.bsky.embed.record" | "app.bsky.embed.recordWithMedia":
+
rcrd = embed['record']['record'] if embed['record'].get('record') else embed['record']
+
did, collection, _ = AtUri.record_uri(rcrd["uri"])
+
if collection != "app.bsky.feed.post":
+
return f"Unhandled record collection {collection}"
+
if did != self.did:
+
return ""
+
+
rquote = self._get_post(self.url, did, rcrd["uri"])
+
if not rquote:
+
return f"Quote {rcrd["uri"]} not found in the db"
+
post.attachments.put(QuoteAttachment(quoted_id=rcrd["uri"], quoted_user=did))
+
+
if embed.get('media'):
+
return handle_embeds(embed["media"])
+
case "app.bsky.embed.images":
+
for image in embed["images"]:
+
blob_cid = image["image"]["ref"]["$link"]
+
url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}"
+
blob_urls.append((url, blob_cid, image.get("alt")))
+
case "app.bsky.embed.video":
+
blob_cid = embed["video"]["ref"]["$link"]
+
url = f"{self.pds}/xrpc/com.atproto.sync.getBlob?did={self.did}&cid={blob_cid}"
+
blob_urls.append((url, blob_cid, embed.get("alt")))
+
case _:
+
self.log.warning(f"Unhandled embed type {embed['$type']}")
+
+
if embed:
+
fexit = handle_embeds(embed)
+
if fexit is not None:
+
self.log.info("Skipping %s! %s", post_uri, fexit)
+
return
+
+
if blob_urls:
+
blobs: list[Blob] = []
+
for url, cid, alt in blob_urls:
+
self.log.info("Downloading %s...", cid)
+
blob: Blob | None = download_blob(url, alt)
+
if not blob:
+
self.log.error(
+
"Skipping %s! Failed to download blob %s.", post_uri, cid
+
)
+
return
+
blobs.append(blob)
+
post.attachments.put(MediaAttachment(blobs=blobs))
+
+
if "langs" in record:
+
post.attachments.put(LanguagesAttachment(langs=record["langs"]))
+
if "labels" in record:
+
post.attachments.put(
+
LabelsAttachment(
+
labels=[
+
label["val"].replace("-", " ") for label in record["values"]
+
]
+
),
+
)
+
+
if parent:
+
self._insert_post(
+
{
+
"user": self.did,
+
"service": self.url,
+
"identifier": post_uri,
+
"parent": parent["id"],
+
"root": parent["id"] if not parent["root"] else parent["root"],
+
"extra_data": json.dumps({"cid": post_cid}),
+
}
+
)
+
else:
+
self._insert_post(
+
{
+
"user": self.did,
+
"service": self.url,
+
"identifier": post_uri,
+
"extra_data": json.dumps({"cid": post_cid}),
+
}
+
)
+
+
for out in self.outputs:
+
self.submitter(lambda: out.accept_post(post))
+
+
def _on_repost(self, record: dict[str, Any]):
+
post_uri = cast(str, record["$xpost.strongRef"]["uri"])
+
post_cid = cast(str, record["$xpost.strongRef"]["cid"])
+
+
reposted_uri = cast(str, record["subject"]["uri"])
+
reposted = self._get_post(self.url, self.did, reposted_uri)
+
if not reposted:
+
self.log.info(
+
"Skipping repost '%s' as reposted post '%s' was not found in the db.",
+
post_uri,
+
reposted_uri,
+
)
+
return
+
+
self._insert_post(
+
{
+
"user": self.did,
+
"service": self.url,
+
"identifier": post_uri,
+
"reposted": reposted["id"],
+
"extra_data": json.dumps({"cid": post_cid}),
+
}
+
)
+
+
for out in self.outputs:
+
self.submitter(lambda: out.accept_repost(post_uri, reposted_uri))
+
+
def _on_delete_post(self, post_id: str, repost: bool):
+
post = self._get_post(self.url, self.did, post_id)
+
if not post:
+
return
+
+
if repost:
+
for output in self.outputs:
+
self.submitter(lambda: output.delete_repost(post_id))
+
else:
+
for output in self.outputs:
+
self.submitter(lambda: output.delete_post(post_id))
+
self._delete_post_by_id(post["id"])
+
+
+
class BlueskyJetstreamInputService(BlueskyBaseInputService):
+
def __init__(self, db: DatabasePool, options: BlueskyJetstreamInputOptions) -> None:
+
super().__init__(db)
+
self.options: BlueskyJetstreamInputOptions = options
+
self._init_identity()
+
+
@override
+
def get_identity_options(self) -> tuple[str | None, str | None, str | None]:
+
return (self.options.handle, self.options.did, self.options.pds)
+
+
def _accept_msg(self, msg: websockets.Data) -> None:
+
data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
+
if data.get("did") != self.did:
+
return
+
commit: dict[str, Any] | None = data.get("commit")
+
if not commit:
+
return
+
+
commit_type: str = cast(str, commit["operation"])
+
match commit_type:
+
case "create":
+
record: dict[str, Any] = cast(dict[str, Any], commit["record"])
+
record["$xpost.strongRef"] = {
+
"cid": commit["cid"],
+
"uri": f"at://{self.did}/{commit['collection']}/{commit['rkey']}",
+
}
+
+
match cast(str, commit["collection"]):
+
case "app.bsky.feed.post":
+
self._on_post(record)
+
case "app.bsky.feed.repost":
+
self._on_repost(record)
+
case _:
+
pass
+
case "delete":
+
post_id: str = (
+
f"at://{self.did}/{commit['collection']}/{commit['rkey']}"
+
)
+
match cast(str, commit["collection"]):
+
case "app.bsky.feed.post":
+
self._on_delete_post(post_id, False)
+
case "app.bsky.feed.repost":
+
self._on_delete_post(post_id, True)
+
case _:
+
pass
+
case _:
+
pass
+
+
@override
+
async def listen(self):
+
url = self.options.jetstream + "?"
+
url += "wantedCollections=app.bsky.feed.post"
+
url += "&wantedCollections=app.bsky.feed.repost"
+
url += f"&wantedDids={self.did}"
+
+
async for ws in websockets.connect(url):
+
try:
+
self.log.info("Listening to %s...", self.options.jetstream)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
self.submitter(lambda: self._accept_msg(msg))
+
+
listen = asyncio.create_task(listen_for_messages())
+
+
_ = await asyncio.gather(listen)
+
except websockets.ConnectionClosedError as e:
+
self.log.error(e, stack_info=True, exc_info=True)
+
self.log.info("Reconnecting to %s...", self.options.jetstream)
+
continue
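
For reference, a hypothetical options dict as BlueskyJetstreamInputOptions.from_dict would accept it (the handle and filter are made up; a did/pds pair can be supplied instead of a handle):

```python
opts = BlueskyJetstreamInputOptions.from_dict({
    "handle": "alice.example.com",   # hypothetical; resolved to a DID at startup
    "filters": [r"^#nocrosspost"],   # compiled to re.Pattern objects by from_dict
})
# jetstream defaults to wss://jetstream2.us-west.bsky.network/subscribe
```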
+95
bluesky/tokens.py
···
+
from cross.tokens import LinkToken, MentionToken, TagToken, TextToken, Token
+
+
+
def tokenize_post(text: str, facets: list[dict]) -> list[Token]:
+
def decode(ut8: bytes) -> str:
+
return ut8.decode(encoding="utf-8")
+
+
if not text:
+
return []
+
ut8_text = text.encode(encoding="utf-8")
+
if not facets:
+
return [TextToken(text=decode(ut8_text))]
+
+
slices: list[tuple[int, int, str, str]] = []
+
+
for facet in facets:
+
features: list[dict] = facet.get("features", [])
+
if not features:
+
continue
+
+
# we don't support overlapping facets/features
+
feature = features[0]
+
feature_type = feature["$type"]
+
index = facet["index"]
+
match feature_type:
+
case "app.bsky.richtext.facet#tag":
+
slices.append(
+
(index["byteStart"], index["byteEnd"], "tag", feature["tag"])
+
)
+
case "app.bsky.richtext.facet#link":
+
slices.append(
+
(index["byteStart"], index["byteEnd"], "link", feature["uri"])
+
)
+
case "app.bsky.richtext.facet#mention":
+
slices.append(
+
(index["byteStart"], index["byteEnd"], "mention", feature["did"])
+
)
+
+
if not slices:
+
return [TextToken(text=decode(ut8_text))]
+
+
slices.sort(key=lambda s: s[0])
+
unique: list[tuple[int, int, str, str]] = []
+
current_end = 0
+
for start, end, ttype, val in slices:
+
if start >= current_end:
+
unique.append((start, end, ttype, val))
+
current_end = end
+
+
if not unique:
+
return [TextToken(text=decode(ut8_text))]
+
+
tokens: list[Token] = []
+
prev = 0
+
+
for start, end, ttype, val in unique:
+
if start > prev:
+
# text between facets
+
tokens.append(TextToken(text=decode(ut8_text[prev:start])))
+
# facet token
+
match ttype:
+
case "link":
+
label = decode(ut8_text[start:end])
+
+
# try to unflatten links
+
split = val.split("://", 1)
+
if len(split) > 1:
+
if split[1].startswith(label):
+
tokens.append(LinkToken(href=val))
+
prev = end
+
continue
+
+
if label.endswith("...") and split[1].startswith(label[:-3]):
+
tokens.append(LinkToken(href=val))
+
prev = end
+
continue
+
+
tokens.append(LinkToken(href=val, label=label))
+
case "tag":
+
tag = decode(ut8_text[start:end])
+
tokens.append(TagToken(tag=tag[1:] if tag.startswith("#") else tag))
+
case "mention":
+
mention = decode(ut8_text[start:end])
+
tokens.append(
+
MentionToken(
+
username=mention[1:] if mention.startswith("@") else mention,
+
uri=val,
+
)
+
)
+
prev = end
+
+
if prev < len(ut8_text):
+
tokens.append(TextToken(text=decode(ut8_text[prev:])))
+
+
return tokens
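
A small worked example of how facets map to tokens (offsets are UTF-8 byte positions, as in the Bluesky richtext schema):

```python
text = "hello #world"
facets = [{
    "index": {"byteStart": 6, "byteEnd": 12},
    "features": [{"$type": "app.bsky.richtext.facet#tag", "tag": "world"}],
}]
tokenize_post(text, facets)
# -> [TextToken(text="hello "), TagToken(tag="world")]
```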
+23 -8
cross/attachments.py
···
from dataclasses import dataclass
+
from cross.media import Blob
-
@dataclass
-
class Attachment():
+
+
@dataclass(kw_only=True)
+
class Attachment:
pass
-
@dataclass
-
class SpoilerAttachment(Attachment):
-
spoiler: str
-
@dataclass
+
@dataclass(kw_only=True)
+
class LabelsAttachment(Attachment):
+
labels: list[str]
+
+
+
@dataclass(kw_only=True)
class LanguagesAttachment(Attachment):
langs: list[str]
-
@dataclass
+
+
@dataclass(kw_only=True)
class SensitiveAttachment(Attachment):
sensitive: bool
-
@dataclass
+
+
@dataclass(kw_only=True)
class RemoteUrlAttachment(Attachment):
url: str
+
+
@dataclass(kw_only=True)
+
class MediaAttachment(Attachment):
+
blobs: list[Blob]
+
+
@dataclass(kw_only=True)
+
class QuoteAttachment(Attachment):
+
quoted_id: str
+
quoted_user: str
-18
cross/fragments.py
···
-
from dataclasses import dataclass
-
-
@dataclass
-
class Fragment:
-
start: int
-
end: int
-
-
@dataclass
-
class LinkFragment(Fragment):
-
url: str
-
-
@dataclass
-
class TagFragment(Fragment):
-
tag: str
-
-
@dataclass
-
class MentionFragment(Fragment):
-
uri: str
+170
cross/media.py
···
+
from dataclasses import dataclass, field
+
+
import json
+
import re
+
import os
+
from typing import Any, cast
+
import magic
+
import subprocess
+
import urllib.parse
+
+
import requests
+
+
FILENAME = re.compile(r'filename="?([^\";]*)"?')
+
MAGIC = magic.Magic(mime=True)
+
+
+
@dataclass
+
class Blob:
+
url: str
+
mime: str
+
io: bytes = field(repr=False)
+
name: str | None = None
+
alt: str | None = None
+
+
+
@dataclass
+
class MediaInfo:
+
width: int
+
height: int
+
duration: float | None = None
+
+
+
def mime_from_bytes(io: bytes) -> str:
+
mime = MAGIC.from_buffer(io)
+
if not mime:
+
mime = "application/octet-stream"
+
return mime
+
+
def download_blob(url: str, alt: str | None = None, max_bytes: int = 100_000_000) -> Blob | None:
+
name = get_filename_from_url(url)
+
io = download_chunked(url, max_bytes)
+
if not io:
+
return None
+
return Blob(url, mime_from_bytes(io), io, name, alt)
+
+
def download_chunked(url: str, max_bytes: int = 100_000_000) -> bytes | None:
+
response = requests.get(url, stream=True, timeout=20)
+
if response.status_code != 200:
+
return None
+
+
downloaded_bytes = b""
+
current_size = 0
+
+
for chunk in response.iter_content(chunk_size=8192):
+
if not chunk:
+
continue
+
+
current_size += len(chunk)
+
if current_size > max_bytes:
+
response.close()
+
return None
+
+
downloaded_bytes += chunk
+
+
return downloaded_bytes
+
+
+
def get_filename_from_url(url: str) -> str:
+
try:
+
response = requests.head(url, timeout=5, allow_redirects=True)
+
disposition = response.headers.get("Content-Disposition")
+
if disposition:
+
filename = FILENAME.findall(disposition)
+
if filename:
+
return filename[0]
+
except requests.RequestException:
+
pass
+
+
parsed_url = urllib.parse.urlparse(url)
+
base_name = os.path.basename(parsed_url.path)
+
+
# hardcoded fix to return the cid for pds blobs
+
if base_name == "com.atproto.sync.getBlob":
+
qs = urllib.parse.parse_qs(parsed_url.query)
+
if qs and qs.get("cid"):
+
return qs["cid"][0]
+
+
return base_name
+
+
+
def convert_to_mp4(video: Blob) -> Blob:
+
cmd = [
+
"ffmpeg",
+
"-i", "pipe:0",
+
"-c:v", "libx264",
+
"-crf", "30",
+
"-preset", "slow",
+
"-c:a", "aac",
+
"-b:a", "128k",
+
"-movflags", "frag_keyframe+empty_moov+default_base_moof",
+
"-f", "mp4",
+
"pipe:1",
+
]
+
+
proc = subprocess.Popen(
+
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+
)
+
out_bytes, err = proc.communicate(input=video.io)
+
+
if proc.returncode != 0:
+
raise RuntimeError(f"ffmpeg compress failed: {err.decode()}")
+
+
return Blob(video.url, mime_from_bytes(out_bytes), out_bytes, video.name, video.alt)
+
+
+
def compress_image(image: Blob, quality: int = 95) -> Blob:
+
cmd = [
+
"ffmpeg",
+
"-f", "image2pipe",
+
"-i", "pipe:0",
+
"-c:v", "webp",
+
"-q:v", str(quality),
+
"-f", "image2pipe",
+
"pipe:1",
+
]
+
+
proc = subprocess.Popen(
+
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+
)
+
out_bytes, err = proc.communicate(input=image.io)
+
+
if proc.returncode != 0:
+
raise RuntimeError(f"ffmpeg compress failed: {err.decode()}")
+
+
return Blob(image.url, "image/webp", out_bytes, image.name, image.alt)
+
+
+
def probe_bytes(bytes: bytes) -> dict[str, Any]:
+
cmd = [
+
"ffprobe",
+
"-v",
+
"error",
+
"-show_format",
+
"-show_streams",
+
"-print_format",
+
"json",
+
"pipe:0",
+
]
+
proc = subprocess.run(
+
cmd, input=bytes, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+
)
+
+
if proc.returncode != 0:
+
raise RuntimeError(f"ffprobe failed: {proc.stderr.decode()}")
+
+
return json.loads(proc.stdout)
+
+
+
def get_media_meta(bytes: bytes) -> MediaInfo:
+
probe = probe_bytes(bytes)
+
streams = [s for s in probe["streams"] if s["codec_type"] == "video"]
+
if not streams:
+
raise ValueError("No video stream found")
+
+
media: dict[str, Any] = cast(dict[str, Any], streams[0])
+
return MediaInfo(
+
width=media["width"],
+
height=media["height"],
+
duration=media.get("duration", probe["format"].get("duration")),
+
)
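
A usage sketch (the URL is hypothetical); download_blob caps the transfer at max_bytes and get_media_meta shells out to ffprobe:

```python
blob = download_blob("https://example.com/cat.webm", alt="a cat", max_bytes=50_000_000)
if blob:
    info = get_media_meta(blob.io)
    print(blob.mime, info.width, info.height, info.duration)
```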
+14 -8
cross/post.py
···
from dataclasses import dataclass, field
+
from typing import TypeVar
+
from cross.attachments import Attachment
-
from cross.fragments import Fragment
-
from typing import TypeVar
+
from cross.tokens import Token
+
+
T = TypeVar("T", bound=Attachment)
-
T = TypeVar('T', bound=Attachment)
class AttachmentKeeper:
def __init__(self) -> None:
self._map: dict[type, Attachment] = {}
-
def put(self, cls: type[T], attachment: T) -> None:
-
self._map[cls] = attachment
+
def put(self, attachment: Attachment) -> None:
+
self._map[attachment.__class__] = attachment
def get(self, cls: type[T]) -> T | None:
instance = self._map.get(cls)
···
raise TypeError(f"Expected {cls.__name__}, got {type(instance).__name__}")
return instance
+
def __repr__(self) -> str:
+
return f"AttachmentKeeper(_map={self._map.values()})"
+
+
@dataclass
class Post:
id: str
parent_id: str | None
-
text: bytes # utf-8 text bytes
-
attachments: AttachmentKeeper
-
fragments: list[Fragment] = field(default_factory=list)
+
tokens: list[Token]
+
text_type: str = "text/plain"
+
attachments: AttachmentKeeper = field(default_factory=AttachmentKeeper)
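
A sketch of how a Post and its AttachmentKeeper are used by the input services in this change:

```python
from cross.post import Post
from cross.tokens import TextToken
from cross.attachments import LanguagesAttachment

post = Post(id="12345", parent_id=None, tokens=[TextToken(text="hello")])
post.attachments.put(LanguagesAttachment(langs=["en"]))   # keyed by attachment class
langs = post.attachments.get(LanguagesAttachment)         # typed lookup, None if absent
```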
+140 -10
cross/service.py
···
-
from pathlib import Path
+
import logging
import sqlite3
-
from typing import cast
+
from abc import ABC, abstractmethod
+
from typing import Any, Callable, cast
-
from database.connection import get_conn
+
from cross.post import Post
+
from database.connection import DatabasePool
+
+
columns: list[str] = [
+
"user",
+
"service",
+
"identifier",
+
"parent",
+
"root",
+
"reposted",
+
"extra_data",
+
]
+
placeholders: str = ", ".join(["?" for _ in columns])
+
column_names: str = ", ".join(columns)
class Service:
-
def __init__(self, url: str, db: Path) -> None:
+
def __init__(self, url: str, db: DatabasePool) -> None:
self.url: str = url
-
self.conn: sqlite3.Connection = get_conn(db)
+
self.db: DatabasePool = db
+
self.log: logging.Logger = logging.getLogger(self.__class__.__name__)
+
# self._lock: threading.Lock = threading.Lock()
-
def get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
-
cursor = self.conn.cursor()
+
def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
+
cursor = self.db.get_conn().cursor()
_ = cursor.execute(
"""
SELECT * FROM posts
WHERE service = ?
-
AND user_id = ?
+
AND user = ?
AND identifier = ?
""",
(url, user, identifier),
)
return cast(sqlite3.Row, cursor.fetchone())
-
def get_post_by_id(self, id: int) -> sqlite3.Row | None:
-
cursor = self.conn.cursor()
+
def _get_post_by_id(self, id: int) -> sqlite3.Row | None:
+
cursor = self.db.get_conn().cursor()
_ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
return cast(sqlite3.Row, cursor.fetchone())
+
+
def _get_mappings(
+
self, original: int, service: str, user: str
+
) -> list[sqlite3.Row]:
+
cursor = self.db.get_conn().cursor()
+
_ = cursor.execute(
+
"""
+
SELECT *
+
FROM posts AS p
+
JOIN mappings AS m
+
ON p.id = m.mapped
+
WHERE m.original = ?
+
AND p.service = ?
+
AND p.user = ?
+
ORDER BY p.id;
+
""",
+
(original, service, user),
+
)
+
return cursor.fetchall()
+
+
def _find_mapped_thread(
+
self, parent: str, iservice: str, iuser: str, oservice: str, ouser: str
+
):
+
reply_data = self._get_post(iservice, iuser, parent)
+
if not reply_data:
+
return None
+
+
reply_mappings: list[sqlite3.Row] | None = self._get_mappings(
+
reply_data["id"], oservice, ouser
+
)
+
if not reply_mappings:
+
return None
+
+
reply_identifier: sqlite3.Row = reply_mappings[-1]
+
root_identifier: sqlite3.Row = reply_mappings[0]
+
+
if reply_data["root_id"]:
+
root_data = self._get_post_by_id(reply_data["root"])
+
if not root_data:
+
return None
+
+
root_mappings = self._get_mappings(reply_data["root"], oservice, ouser)
+
if not root_mappings:
+
return None
+
root_identifier = root_mappings[0]
+
+
return (
+
root_identifier[0], # real ids
+
reply_identifier[0],
+
reply_data["root_id"], # db ids
+
reply_data["id"],
+
)
+
+
def _insert_post(self, post_data: dict[str, Any]):
+
values = [post_data.get(col) for col in columns]
+
cursor = self.db.get_conn().cursor()
+
_ = cursor.execute(
+
f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values
+
)
+
+
def _insert_post_mapping(self, original: int, mapped: int):
+
cursor = self.db.get_conn().cursor()
+
_ = cursor.execute(
+
"INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
+
(original, mapped),
+
)
+
_ = cursor.execute(
+
"INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
+
(mapped, original),
+
)
+
+
def _delete_post(self, url: str, user: str, identifier: str):
+
cursor = self.db.get_conn().cursor()
+
_ = cursor.execute(
+
"""
+
DELETE FROM posts
+
WHERE identifier = ?
+
AND service = ?
+
AND user = ?
+
""",
+
(identifier, url, user),
+
)
+
+
def _delete_post_by_id(self, id: int):
+
cursor = self.db.get_conn().cursor()
+
_ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,))
+
+
+
class OutputService(Service):
+
def accept_post(self, service: str, user: str, post: Post):
+
self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)
+
+
def delete_post(self, service: str, user: str, post_id: str):
+
self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)
+
+
def accept_repost(self, service: str, user: str, repost_id: str, reposted_id: str):
+
self.log.warning(
+
"NOT IMPLEMENTED (%s), accept_repost %s of %s",
+
self.url,
+
repost_id,
+
reposted_id,
+
)
+
+
def delete_repost(self, service: str, user: str, repost_id: str):
+
self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)
+
+
+
class InputService(ABC, Service):
+
outputs: list[OutputService]
+
submitter: Callable[[Callable[[], None]], None]
+
+
@abstractmethod
+
async def listen(self):
+
pass
+23
cross/tokens.py
···
+
from dataclasses import dataclass
+
+
@dataclass(kw_only=True)
+
class Token:
+
pass
+
+
@dataclass(kw_only=True)
+
class TextToken(Token):
+
text: str
+
+
@dataclass(kw_only=True)
+
class LinkToken(Token):
+
href: str
+
label: str | None = None
+
+
@dataclass(kw_only=True)
+
class TagToken(Token):
+
tag: str
+
+
@dataclass(kw_only=True)
+
class MentionToken(Token):
+
username: str
+
uri: str | None = None
+20 -2
database/connection.py
···
+
import sqlite3
+
import threading
from pathlib import Path
-
import sqlite3
+
+
+
class DatabasePool:
+
def __init__(self, db: Path) -> None:
+
self.db: Path = db
+
self._local: threading.local = threading.local()
+
self._conns: list[sqlite3.Connection] = []
+
+
def get_conn(self) -> sqlite3.Connection:
+
if getattr(self._local, 'conn', None) is None:
+
self._local.conn = get_conn(self.db)
+
self._conns.append(self._local.conn)
+
return self._local.conn
+
+
def close(self):
+
for c in self._conns:
+
c.close()
def get_conn(db: Path) -> sqlite3.Connection:
-
conn = sqlite3.connect(db, autocommit=True)
+
conn = sqlite3.connect(db, autocommit=True, check_same_thread=False)
conn.row_factory = sqlite3.Row
_ = conn.executescript("""
PRAGMA journal_mode = WAL;
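
A sketch of the intended pooling behaviour: each thread lazily gets its own connection, and close() tears all of them down:

```python
from pathlib import Path
from database.connection import DatabasePool

pool = DatabasePool(Path("./data/data.db"))
conn = pool.get_conn()            # one sqlite3.Connection per calling thread
row = conn.execute("SELECT 1").fetchone()
pool.close()                      # closes every connection handed out so far
```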
+13 -27
database/migrations.py
···
import sqlite3
from pathlib import Path
+
from typing import Callable
-
from util.util import LOGGER
from database.connection import get_conn
-
+
from util.util import LOGGER
class DatabaseMigrator:
def __init__(self, db_path: Path, migrations_folder: Path) -> None:
self.db_path: Path = db_path
self.migrations_folder: Path = migrations_folder
self.conn: sqlite3.Connection = get_conn(db_path)
+
_ = self.conn.execute("PRAGMA foreign_keys = OFF;")
+
self.conn.autocommit = False
def close(self):
self.conn.close()
···
_ = cursor.execute(f"PRAGMA user_version = {version}")
self.conn.commit()
-
def get_migrations(self) -> list[tuple[int, Path]]:
-
if not self.migrations_folder.exists():
-
return []
-
-
files: list[tuple[int, Path]] = []
-
for f in self.migrations_folder.glob("*.sql"):
-
try:
-
version = int(f.stem.split("_")[0])
-
files.append((version, f))
-
except (ValueError, IndexError):
-
LOGGER.warning("Warning: Skipping invalid migration file: %", f.name)
-
-
return sorted(files, key=lambda x: x[0])
-
-
def apply_migration(self, version: int, path: Path):
-
with open(path, "r") as f:
-
sql = f.read()
-
-
cursor = self.conn.cursor()
+
def apply_migration(self, version: int, filename: str, migration: Callable[[sqlite3.Connection], None]):
try:
-
_ = cursor.executescript(sql)
+
_ = migration(self.conn)
self.set_version(version)
-
LOGGER.info("Applied migration: %s", path.name)
+
self.conn.commit()
+
LOGGER.info("Applied migration: %s..", filename)
except sqlite3.Error as e:
self.conn.rollback()
-
raise Exception(f"Error applying migration {version}: {e}")
+
raise Exception(f"Error applying migration {filename}: {e}")
def migrate(self):
current_version = self.get_version()
-
migrations = self.get_migrations()
+
from migrations._registry import load_migrations
+
migrations = load_migrations(self.migrations_folder)
if not migrations:
LOGGER.warning("No migration files found.")
···
LOGGER.info("No pending migrations.")
return
-
for version, filepath in pending:
-
self.apply_migration(version, filepath)
+
for version, filename, migration in pending:
+
self.apply_migration(version, filename, migration)
+11 -2
env.py
···
import os
+
from pathlib import Path
-
DATA_DIR = os.environ.get('DATA_DIR') or "./data"
-
MIGRATIONS_DIR = os.environ.get('MIGRATIONS_DIR') or "./migrations"
+
DEV = bool(os.environ.get("DEV")) or False
+
+
DATA_DIR = Path(os.environ.get("DATA_DIR") or "./data")
+
CACHE_DIR = Path(os.environ.get("CACHE_DIR") or DATA_DIR.joinpath("cache"))
+
SETTINGS_DIR = Path(os.environ.get("SETTINGS_DIR") or DATA_DIR.joinpath("settings.json"))
+
DATABASE_DIR = Path(os.environ.get("DATABASE_DIR") or DATA_DIR.joinpath("data.db"))
+
+
MIGRATIONS_DIR = Path(os.environ.get("MIGRATIONS_DIR") or "./migrations")
+
+
PLC_HOST = os.environ.get("PLC_HOST") or "https://plc.directory"
+40 -18
main.py
···
+
import asyncio
+
import json
import queue
import threading
from pathlib import Path
-
from time import sleep
from typing import Callable
+
from database.connection import DatabasePool
import env
from database.migrations import DatabaseMigrator
-
from util.util import LOGGER
+
from registry import create_input_service, create_output_service
+
from registry_bootstrap import bootstrap
+
from util.util import LOGGER, read_env, shutdown_hook
def main() -> None:
-
data = Path(env.DATA_DIR)
+
if not env.DATA_DIR.exists():
+
env.DATA_DIR.mkdir(parents=True)
-
if not data.exists():
-
data.mkdir(parents=True)
-
-
settings = data.joinpath("settings.json")
-
database = data.joinpath("db.sqlite")
-
-
if not settings.exists():
-
LOGGER.info("First launch detected! Creating %s and exiting!", settings)
+
if not env.SETTINGS_DIR.exists():
+
LOGGER.info("First launch detected! Creating %s and exiting!", env.SETTINGS_DIR)
return
-
LOGGER.info("Loading settings...")
-
# TODO
-
-
migrator = DatabaseMigrator(database, Path(env.MIGRATIONS_DIR))
+
migrator = DatabaseMigrator(env.DATABASE_DIR, env.MIGRATIONS_DIR)
try:
migrator.migrate()
except Exception:
LOGGER.exception("Failed to migrate database!")
+
return
finally:
migrator.close()
+
db_pool = DatabasePool(env.DATABASE_DIR)
+
+
LOGGER.info("Bootstrapping registries...")
+
bootstrap()
+
+
LOGGER.info("Loading settings...")
+
+
with open(env.SETTINGS_DIR) as f:
+
settings = json.load(f)
+
read_env(settings)
+
+
if "input" not in settings:
+
raise KeyError("No `input` sepcified in settings!")
+
if "outputs" not in settings:
+
raise KeyError("No `outputs` spicified in settings!")
+
+
input = create_input_service(db_pool, settings["input"])
+
outputs = [create_output_service(db_pool, data) for data in settings["outputs"]]
+
LOGGER.info("Starting task worker...")
def worker(task_queue: queue.Queue[Callable[[], None] | None]):
···
thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
thread.start()
-
LOGGER.info("Connecting to %s...", 'TODO') # TODO
+
LOGGER.info("Connecting to %s...", input.url)
+
input.outputs = outputs
+
input.submitter = lambda c: task_queue.put(c)
try:
-
task_queue.put(lambda: print("hi"))
-
sleep(10) # TODO
+
asyncio.run(input.listen())
except KeyboardInterrupt:
LOGGER.info("Stopping...")
task_queue.join()
task_queue.put(None)
thread.join()
+
db_pool.close()
+
+
for shook in shutdown_hook:
+
shook()
+
if __name__ == "__main__":
main()
+109
mastodon/info.py
···
+
from abc import ABC, abstractmethod
+
from dataclasses import dataclass
+
from typing import Any
+
+
import requests
+
+
from cross.service import Service
+
from util.util import normalize_service_url
+
+
+
def validate_and_transform(data: dict[str, Any]):
+
if "token" not in data or "instance" not in data:
+
raise KeyError("Missing required values 'token' or 'instance'")
+
+
data["instance"] = normalize_service_url(data["instance"])
+
+
+
@dataclass(kw_only=True)
+
class InstanceInfo:
+
max_characters: int = 500
+
max_media_attachments: int = 4
+
characters_reserved_per_url: int = 23
+
+
image_size_limit: int = 16777216
+
video_size_limit: int = 103809024
+
+
text_format: str = "text/plain"
+
+
@classmethod
+
def from_api(cls, data: dict[str, Any]) -> "InstanceInfo":
+
config: dict[str, Any] = {}
+
+
if "statuses" in data:
+
statuses_config: dict[str, Any] = data.get("statuses", {})
+
if "max_characters" in statuses_config:
+
config["max_characters"] = statuses_config["max_characters"]
+
if "max_media_attachments" in statuses_config:
+
config["max_media_attachments"] = statuses_config[
+
"max_media_attachments"
+
]
+
if "characters_reserved_per_url" in statuses_config:
+
config["characters_reserved_per_url"] = statuses_config[
+
"characters_reserved_per_url"
+
]
+
+
# glitch content type
+
if "supported_mime_types" in statuses_config:
+
text_mimes: list[str] = statuses_config["supported_mime_types"]
+
+
if "text/x.misskeymarkdown" in text_mimes:
+
config["text_format"] = "text/x.misskeymarkdown"
+
elif "text/markdown" in text_mimes:
+
config["text_format"] = "text/markdown"
+
+
if "media_attachments" in data:
+
media_config: dict[str, Any] = data["media_attachments"]
+
if "image_size_limit" in media_config:
+
config["image_size_limit"] = media_config["image_size_limit"]
+
if "video_size_limit" in media_config:
+
config["video_size_limit"] = media_config["video_size_limit"]
+
+
# *oma extensions
+
if "max_toot_chars" in data:
+
config["max_characters"] = data["max_toot_chars"]
+
if "upload_limit" in data:
+
config["image_size_limit"] = data["upload_limit"]
+
config["video_size_limit"] = data["upload_limit"]
+
+
if "pleroma" in data:
+
pleroma: dict[str, Any] = data["pleroma"]
+
if "metadata" in pleroma:
+
metadata: dict[str, Any] = pleroma["metadata"]
+
if "post_formats" in metadata:
+
post_formats: list[str] = metadata["post_formats"]
+
+
if "text/x.misskeymarkdown" in post_formats:
+
config["text_format"] = "text/x.misskeymarkdown"
+
elif "text/markdown" in post_formats:
+
config["text_format"] = "text/markdown"
+
+
return InstanceInfo(**config)
+
+
+
class MastodonService(ABC, Service):
+
def verify_credentials(self):
+
token = self._get_token()
+
response = requests.get(
+
f"{self.url}/api/v1/accounts/verify_credentials",
+
headers={"Authorization": f"Bearer {token}"},
+
)
+
if response.status_code != 200:
+
self.log.error("Failed to validate user credentials!")
+
response.raise_for_status()
+
return dict(response.json())
+
+
def fetch_instance_info(self):
+
token = self._get_token()
+
response = requests.get(
+
f"{self.url}/api/v1/instance",
+
headers={"Authorization": f"Bearer {token}"},
+
)
+
if response.status_code != 200:
+
self.log.error("Failed to get instance info!")
+
response.raise_for_status()
+
return dict(response.json())
+
+
@abstractmethod
+
def _get_token(self) -> str:
+
pass
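
InstanceInfo.from_api tolerates the vanilla Mastodon shape as well as *oma extensions; a small hypothetical example:

```python
info = InstanceInfo.from_api({
    "statuses": {"max_characters": 500, "max_media_attachments": 4},
    "max_toot_chars": 5000,   # *oma-style override wins over the statuses block
})
assert info.max_characters == 5000
```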
+232
mastodon/input.py
···
+
import asyncio
+
import json
+
import re
+
from dataclasses import dataclass, field
+
from typing import Any, cast, override
+
+
import websockets
+
+
from cross.attachments import (
+
LabelsAttachment,
+
LanguagesAttachment,
+
MediaAttachment,
+
QuoteAttachment,
+
RemoteUrlAttachment,
+
SensitiveAttachment,
+
)
+
from cross.media import Blob, download_blob
+
from cross.post import Post
+
from cross.service import InputService
+
from database.connection import DatabasePool
+
from mastodon.info import MastodonService, validate_and_transform
+
from mastodon.parser import StatusParser
+
+
ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
+
+
+
@dataclass(kw_only=True)
+
class MastodonInputOptions:
+
token: str
+
instance: str
+
allowed_visibility: list[str] = field(
+
default_factory=lambda: ALLOWED_VISIBILITY.copy()
+
)
+
filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "MastodonInputOptions":
+
validate_and_transform(data)
+
+
if "allowed_visibility" in data:
+
for vis in data.get("allowed_visibility", []):
+
if vis not in ALLOWED_VISIBILITY:
+
raise ValueError(f"Invalid visibility option {vis}!")
+
+
if "filters" in data:
+
data["filters"] = [re.compile(r) for r in data["filters"]]
+
+
return MastodonInputOptions(**data)
+
+
+
class MastodonInputService(MastodonService, InputService):
+
def __init__(self, db: DatabasePool, options: MastodonInputOptions) -> None:
+
super().__init__(options.instance, db)
+
self.options: MastodonInputOptions = options
+
+
self.log.info("Verifying %s credentails...", self.url)
+
response = self.verify_credentials()
+
self.user_id: str = response["id"]
+
+
self.log.info("Getting %s configuration...", self.url)
+
response = self.fetch_instance_info()
+
self.streaming_url: str = response["urls"]["streaming_api"]
+
+
@override
+
def _get_token(self) -> str:
+
return self.options.token
+
+
def _on_create_post(self, status: dict[str, Any]):
+
if status["account"]["id"] != self.user_id:
+
return
+
+
if status["visibility"] not in self.options.allowed_visibility:
+
return
+
+
reblog: dict[str, Any] | None = status.get("reblog")
+
if reblog:
+
if reblog["account"]["id"] != self.user_id:
+
return
+
self._on_reblog(status, reblog)
+
return
+
+
if status.get("poll"):
+
self.log.info("Skipping '%s'! Contains a poll..", status["id"])
+
return
+
+
quote: dict[str, Any] | None = status.get("quote")
+
if quote:
+
quote = quote['quoted_status'] if quote.get('quoted_status') else quote
+
if not quote or quote["account"]["id"] != self.user_id:
+
return
+
+
rquote = self._get_post(self.url, self.user_id, quote['id'])
+
if not rquote:
+
self.log.info(
+
"Skipping %s, parent %s not found in db", status["id"], quote['id']
+
)
+
return
+
+
in_reply: str | None = status.get("in_reply_to_id")
+
in_reply_to: str | None = status.get("in_reply_to_account_id")
+
if in_reply_to and in_reply_to != self.user_id:
+
return
+
+
parent = None
+
if in_reply:
+
parent = self._get_post(self.url, self.user_id, in_reply)
+
if not parent:
+
self.log.info(
+
"Skipping %s, parent %s not found in db", status["id"], in_reply
+
)
+
return
+
parser = StatusParser(status)
+
parser.feed(status["content"])
+
tokens = parser.get_result()
+
+
post = Post(id=status["id"], parent_id=in_reply, tokens=tokens)
+
+
if quote:
+
post.attachments.put(QuoteAttachment(quoted_id=quote['id'], quoted_user=self.user_id))
+
if status.get("url"):
+
post.attachments.put(RemoteUrlAttachment(url=status["url"]))
+
if status.get("sensitive"):
+
post.attachments.put(SensitiveAttachment(sensitive=True))
+
if status.get("language"):
+
post.attachments.put(LanguagesAttachment(langs=[status["language"]]))
+
if status.get("spoiler"):
+
post.attachments.put(LabelsAttachment(labels=[status["spoiler"]]))
+
+
blobs: list[Blob] = []
+
for media in status.get("media_attachments", []):
+
self.log.info("Downloading %s...", media["url"])
+
blob: Blob | None = download_blob(media["url"], media.get("description"))
+
if not blob:
+
self.log.error(
+
"Skipping %s! Failed to download media %s.",
+
status["id"],
+
media["url"],
+
)
+
return
+
blobs.append(blob)
+
+
if blobs:
+
post.attachments.put(MediaAttachment(blobs=blobs))
+
+
if parent:
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": status["id"],
+
"parent": parent["id"],
+
"root": parent["id"] if not parent["root"] else parent["root"],
+
}
+
)
+
else:
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": status["id"],
+
}
+
)
+
+
for out in self.outputs:
+
self.submitter(lambda: out.accept_post(post))
+
+
def _on_reblog(self, status: dict[str, Any], reblog: dict[str, Any]):
+
reposted = self._get_post(self.url, self.user_id, reblog["id"])
+
if not reposted:
+
self.log.info(
+
"Skipping repost '%s' as reposted post '%s' was not found in the db.",
+
status["id"],
+
reblog["id"],
+
)
+
return
+
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": status["id"],
+
"reposted": reposted["id"],
+
}
+
)
+
+
for out in self.outputs:
+
self.submitter(lambda: out.accept_repost(status["id"], reblog["id"]))
+
+
def _on_delete_post(self, status_id: str):
+
post = self._get_post(self.url, self.user_id, status_id)
+
if not post:
+
return
+
+
if post["reposted_id"]:
+
for output in self.outputs:
+
self.submitter(lambda: output.delete_repost(status_id))
+
else:
+
for output in self.outputs:
+
self.submitter(lambda: output.delete_post(status_id))
+
self._delete_post_by_id(post["id"])
+
+
def _accept_msg(self, msg: websockets.Data) -> None:
+
data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
+
event: str = cast(str, data["event"])
+
payload: str = cast(str, data["payload"])
+
+
if event == "update":
+
self._on_create_post(json.loads(payload))
+
elif event == "delete":
+
self._on_delete_post(payload)
+
+
@override
+
async def listen(self):
+
url = f"{self.streaming_url}/api/v1/streaming?stream=user"
+
+
async for ws in websockets.connect(
+
url, additional_headers={"Authorization": f"Bearer {self.options.token}"}
+
):
+
try:
+
self.log.info("Listening to %s...", self.streaming_url)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
self.submitter(lambda: self._accept_msg(msg))
+
+
listen = asyncio.create_task(listen_for_messages())
+
+
_ = await asyncio.gather(listen)
+
except websockets.ConnectionClosedError as e:
+
self.log.error(e, stack_info=True, exc_info=True)
+
self.log.info("Reconnecting to %s...", self.streaming_url)
+
continue
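
And the equivalent hypothetical options dict for the Mastodon input (token and instance are placeholders):

```python
opts = MastodonInputOptions.from_dict({
    "token": "xxxxxxxx",                      # placeholder access token
    "instance": "https://mastodon.example",   # run through normalize_service_url
    "allowed_visibility": ["public"],         # must be a subset of ["public", "unlisted"]
})
```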
+178
mastodon/output.py
···
+
from dataclasses import dataclass
+
from typing import Any, override
+
+
import requests
+
+
from cross.attachments import (
+
LanguagesAttachment,
+
QuoteAttachment,
+
RemoteUrlAttachment,
+
SensitiveAttachment,
+
)
+
from cross.post import Post
+
from cross.service import OutputService
+
from database.connection import DatabasePool
+
from mastodon.info import InstanceInfo, MastodonService, validate_and_transform
+
+
ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]
+
+
+
@dataclass(kw_only=True)
+
class MastodonOutputOptions:
+
token: str
+
instance: str
+
visibility: str = "public"
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "MastodonOutputOptions":
+
validate_and_transform(data)
+
+
if "visibility" in data:
+
if data["visibility"] not in ALLOWED_POSTING_VISIBILITY:
+
raise ValueError(f"Invalid visibility option {data['visibility']}!")
+
+
return MastodonOutputOptions(**data)
+
+
+
# TODO
+
class MastodonOutputService(MastodonService, OutputService):
+
def __init__(self, db: DatabasePool, options: MastodonOutputOptions) -> None:
+
super().__init__(options.instance, db)
+
self.options: MastodonOutputOptions = options
+
+
self.log.info("Verifying %s credentails...", self.url)
+
response = self.verify_credentials()
+
self.user_id: str = response["id"]
+
+
self.log.info("Getting %s configuration...", self.url)
+
response = self.fetch_instance_info()
+
self.instance_info: InstanceInfo = InstanceInfo.from_api(response)
+
+
def accept_post(self, service: str, user: str, post: Post):
+
new_root_id: int | None = None
+
new_parent_id: int | None = None
+
+
reply_ref: str | None = None
+
if post.parent_id:
+
thread = self._find_mapped_thread(
+
post.parent_id, service, user, self.url, self.user_id
+
)
+
+
if not thread:
+
self.log.error("Failed to find thread tuple in the database!")
+
return
+
_, reply_ref, new_root_id, new_parent_id = thread
+
+
quote = post.attachments.get(QuoteAttachment)
+
if quote:
+
if quote.quoted_user != user:
+
self.log.info("Quoted other user, skipping!")
+
return
+
+
quoted_post = self._get_post(service, user, quote.quoted_id)
+
if not quoted_post:
+
self.log.error("Failed to find quoted post in the database!")
+
return
+
+
quoted_mappings = self._get_mappings(quoted_post["id"], self.url, self.user_id)
+
if not quoted_mappings:
+
self.log.error("Failed to find mappings for quoted post!")
+
return
+
+
quoted_local_id = quoted_mappings[-1][0]
+
# TODO resolve service identifier
+
+
post_tokens = post.tokens.copy()
+
+
remote_url = post.attachments.get(RemoteUrlAttachment)
+
if remote_url and remote_url.url and post.text_type == "text/x.misskeymarkdown":
+
# TODO strip mfm
+
pass
+
+
raw_statuses = [] # TODO split tokens and media across posts
+
if not raw_statuses:
+
self.log.error("Failed to split post into statuses!")
+
return
+
+
langs = post.attachments.get(LanguagesAttachment)
+
sensitive = post.attachments.get(SensitiveAttachment)
+
+
if langs and langs.langs:
+
pass # TODO
+
+
if sensitive and sensitive.sensitive:
+
pass # TODO
+
+
def delete_post(self, service: str, user: str, post_id: str):
+
post = self._get_post(service, user, post_id)
+
if not post:
+
self.log.info("Post not found in db, skipping delete..")
+
return
+
+
mappings = self._get_mappings(post["id"], self.url, self.user_id)
+
for mapping in mappings[::-1]:
+
self.log.info("Deleting '%s'...", mapping["identifier"])
+
requests.delete(
+
f"{self.url}/api/v1/statuses/{mapping['identifier']}",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
)
+
self._delete_post_by_id(mapping["id"])
+
+
def accept_repost(self, service: str, user: str, repost_id: str, reposted_id: str):
+
reposted = self._get_post(service, user, reposted_id)
+
if not reposted:
+
self.log.info("Post not found in db, skipping repost..")
+
return
+
+
mappings = self._get_mappings(reposted["id"], self.url, self.user_id)
+
if mappings:
+
rsp = requests.post(
+
f"{self.url}/api/v1/statuses/{mappings[0]['identifier']}/reblog",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
)
+
+
if rsp.status_code != 200:
+
self.log.error(
+
"Failed to boost status! status_code: %s, msg: %s",
+
rsp.status_code,
+
rsp.content,
+
)
+
return
+
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": rsp.json()["id"],
+
"reposted": mappings[0]["id"],
+
}
+
)
+
inserted = self._get_post(self.url, self.user_id, rsp.json()["id"])
+
if not inserted:
+
raise ValueError("Inserted post not found!")
+
self._insert_post_mapping(reposted["id"], inserted["id"])
+
+
def delete_repost(self, service: str, user: str, repost_id: str):
+
repost = self._get_post(service, user, repost_id)
+
if not repost:
+
self.log.info("Repost not found in db, skipping delete..")
+
return
+
+
mappings = self._get_mappings(repost["id"], self.url, self.user_id)
+
rmappings = self._get_mappings(repost["reposted"], self.url, self.user_id)
+
+
if mappings and rmappings:
+
self.log.info(
+
"Removing '%s' Repost of '%s'...",
+
mappings[0]["identifier"],
+
rmappings[0]["identifier"],
+
)
+
requests.post(
+
f"{self.url}/api/v1/statuses/{rmappings[0]['identifier']}/unreblog",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
)
+
self._delete_post_by_id(mappings[0]["id"])
+
+
@override
+
def _get_token(self) -> str:
+
return self.options.token
+31
mastodon/parser.py
···
+
from typing import Any, override
+
+
from cross.tokens import LinkToken, MentionToken, TagToken
+
from util.html import HTMLToTokensParser
+
+
+
class StatusParser(HTMLToTokensParser):
+
def __init__(self, status: dict[str, Any]) -> None:
+
super().__init__()
+
self.tags: set[str] = set(tag["url"] for tag in status.get("tags", []))
+
self.mentions: set[str] = set(m["url"] for m in status.get("mentions", []))
+
+
@override
+
def handle_a_endtag(self):
+
label, _attr = self._tag_stack.pop("a")
+
+
href = _attr.get("href")
+
if href:
+
cls = _attr.get("class", "")
+
if cls:
+
if "hashtag" in cls and href in self.tags:
+
tag = label[1:] if label.startswith("#") else label
+
+
self.tokens.append(TagToken(tag=tag))
+
return
+
if "mention" in cls and href in self.mentions:
+
username = label[1:] if label.startswith("@") else label
+
+
self.tokens.append(MentionToken(username=username, uri=href))
+
return
+
self.tokens.append(LinkToken(href=href, label=label))
-13
migrations/001_initdb.sql
···
-
CREATE TABLE IF NOT EXISTS posts (
-
id INTEGER PRIMARY KEY AUTOINCREMENT,
-
user_id TEXT NOT NULL,
-
service TEXT NOT NULL,
-
identifier TEXT NOT NULL,
-
parent_id INTEGER NULL REFERENCES posts(id),
-
root_id INTEGER NULL REFERENCES posts(id)
-
);
-
-
CREATE TABLE IF NOT EXISTS mappings (
-
original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
-
mapped_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE
-
);
+21
migrations/001_initdb_v1.py
···
+
import sqlite3
+
+
+
def migrate(conn: sqlite3.Connection):
+
_ = conn.execute("""
+
CREATE TABLE IF NOT EXISTS posts (
+
id INTEGER PRIMARY KEY AUTOINCREMENT,
+
user_id TEXT NOT NULL,
+
service TEXT NOT NULL,
+
identifier TEXT NOT NULL,
+
parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
+
root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
+
);
+
""")
+
_ = conn.execute("""
+
CREATE TABLE IF NOT EXISTS mappings (
+
original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
+
mapped_post_id INTEGER NOT NULL
+
);
+
""")
+
pass
-2
migrations/002_add_reposted_column.sql
···
-
ALTER TABLE posts
-
ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL;
+11
migrations/002_add_reposted_column_v1.py
···
+
import sqlite3
+
+
+
def migrate(conn: sqlite3.Connection):
+
columns = conn.execute("PRAGMA table_info(posts)")
+
column_names = [col[1] for col in columns]
+
if "reposted_id" not in column_names:
+
_ = conn.execute("""
+
ALTER TABLE posts
+
ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
+
""")
-2
migrations/003_add_extra_data.sql
···
-
ALTER TABLE posts
-
ADD COLUMN extra_data TEXT NULL;
+22
migrations/003_add_extra_data_column_v1.py
···
+
import json
+
import sqlite3
+
+
+
def migrate(conn: sqlite3.Connection):
+
columns = conn.execute("PRAGMA table_info(posts)")
+
column_names = [col[1] for col in columns]
+
if "extra_data" not in column_names:
+
_ = conn.execute("""
+
ALTER TABLE posts
+
ADD COLUMN extra_data TEXT NULL
+
""")
+
+
# migrate old bsky identifiers from json to uri as id and cid in extra_data
+
data = conn.execute("SELECT id, identifier FROM posts WHERE service = 'https://bsky.app';").fetchall()
+
rewrites: list[tuple[str, str, int]] = []
+
for row in data:
+
if row[1][0] == '{' and row[1][-1] == '}':
+
data = json.loads(row[1])
+
rewrites.append((data['uri'], json.dumps({'cid': data['cid']}), row[0]))
+
if rewrites:
+
_ = conn.executemany("UPDATE posts SET identifier = ?, extra_data = ? WHERE id = ?;", rewrites)
+52
migrations/004_initdb_next.py
···
+
import sqlite3
+
+
+
def migrate(conn: sqlite3.Connection):
+
cursor = conn.cursor()
+
+
old_posts = cursor.execute("SELECT * FROM posts;").fetchall()
+
old_mappings = cursor.execute("SELECT * FROM mappings;").fetchall()
+
+
_ = cursor.execute("DROP TABLE posts;")
+
_ = cursor.execute("DROP TABLE mappings;")
+
+
_ = cursor.execute("""
+
CREATE TABLE posts (
+
id INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
+
user TEXT NOT NULL,
+
service TEXT NOT NULL,
+
identifier TEXT NOT NULL,
+
parent INTEGER NULL REFERENCES posts(id),
+
root INTEGER NULL REFERENCES posts(id),
+
reposted INTEGER NULL REFERENCES posts(id),
+
extra_data TEXT NULL
+
);
+
""")
+
+
_ = cursor.execute("""
+
CREATE TABLE mappings (
+
original INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
+
mapped INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
+
UNIQUE(original, mapped)
+
);
+
""")
+
+
for old_post in old_posts:
+
_ = cursor.execute(
+
"""
+
INSERT INTO posts (id, user, service, identifier, parent, root, reposted, extra_data)
+
VALUES (:id, :user_id, :service, :identifier, :parent_id, :root_id, :reposted_id, :extra_data)
+
""",
+
dict(old_post),
+
)
+
+
for mapping in old_mappings:
+
original, mapped = mapping["original_post_id"], mapping["mapped_post_id"]
+
_ = cursor.execute(
+
"INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?)",
+
(original, mapped),
+
)
+
_ = cursor.execute(
+
"INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?)",
+
(mapped, original),
+
)
+12
migrations/005_add_indexes.py
···
+
import sqlite3
+
+
+
def migrate(conn: sqlite3.Connection):
+
_ = conn.execute("""
+
CREATE INDEX IF NOT EXISTS idx_posts_service_user_identifier
+
ON posts (service, user, identifier);
+
""")
+
_ = conn.execute("""
+
CREATE UNIQUE INDEX IF NOT EXISTS ux_mappings_original_mapped
+
ON mappings (original, mapped);
+
""")
+35
migrations/_registry.py
···
+
import importlib.util
+
from pathlib import Path
+
import sqlite3
+
from typing import Callable
+
+
+
def load_migrations(path: Path) -> list[tuple[int, str, Callable[[sqlite3.Connection], None]]]:
+
migrations: list[tuple[int, str, Callable[[sqlite3.Connection], None]]] = []
+
migration_files = sorted(
+
[f for f in path.glob("*.py") if not f.stem.startswith("_")]
+
)
+
+
for filepath in migration_files:
+
filename = filepath.stem
+
version_str = filename.split("_")[0]
+
+
try:
+
version = int(version_str)
+
except ValueError:
+
raise ValueError(f"Migration {filepath.name} must start with a numeric version!")
+
+
spec = importlib.util.spec_from_file_location(filepath.stem, filepath)
+
if not spec or not spec.loader:
+
raise Exception(f"Failed to load spec from file: {filepath}")
+
+
module = importlib.util.module_from_spec(spec)
+
spec.loader.exec_module(module)
+
+
if hasattr(module, "migrate"):
+
migrations.append((version, filename, module.migrate))
+
else:
+
raise ValueError(f"Migration {filepath.name} missing 'migrate' function")
+
+
migrations.sort(key=lambda x: x[0])
+
return migrations
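
`load_migrations` only discovers and orders the migration modules; applying them is left to the caller, which isn't part of this diff. a minimal sketch of such a runner, assuming (hypothetically) that the applied version is tracked with sqlite's `user_version` pragma:

```python
# hypothetical runner for load_migrations(); the actual caller is not shown in
# this diff, and PRAGMA user_version is only an assumption for the example.
import sqlite3
from pathlib import Path

from migrations._registry import load_migrations


def apply_migrations(conn: sqlite3.Connection, migrations_dir: Path) -> None:
    current: int = conn.execute("PRAGMA user_version").fetchone()[0]
    for version, _name, migrate in load_migrations(migrations_dir):
        if version <= current:
            continue  # already applied
        migrate(conn)
        _ = conn.execute(f"PRAGMA user_version = {version}")
        conn.commit()
```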
+22
misskey/info.py
···
+
from abc import ABC, abstractmethod
+
+
import requests
+
+
from cross.service import Service
+
+
+
class MisskeyService(ABC, Service):
+
def verify_credentials(self):
+
response = requests.post(
+
f"{self.url}/api/i",
+
json={"i": self._get_token()},
+
headers={"Content-Type": "application/json"},
+
)
+
if response.status_code != 200:
+
self.log.error("Failed to validate user credentials!")
+
response.raise_for_status()
+
return dict(response.json())
+
+
@abstractmethod
+
def _get_token(self) -> str:
+
pass
+227
misskey/input.py
···
+
import asyncio
+
import json
+
import re
+
import uuid
+
from dataclasses import dataclass, field
+
from typing import Any, cast, override
+
+
import websockets
+
+
from cross.attachments import (
+
LabelsAttachment,
+
MediaAttachment,
+
QuoteAttachment,
+
RemoteUrlAttachment,
+
SensitiveAttachment,
+
)
+
from cross.media import Blob, download_blob
+
from cross.post import Post
+
from cross.service import InputService
+
from database.connection import DatabasePool
+
from misskey.info import MisskeyService
+
from util.markdown import MarkdownParser
+
from util.util import normalize_service_url
+
+
ALLOWED_VISIBILITY = ["public", "home"]
+
+
+
@dataclass
+
class MisskeyInputOptions:
+
token: str
+
instance: str
+
allowed_visibility: list[str] = field(
+
default_factory=lambda: ALLOWED_VISIBILITY.copy()
+
)
+
filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> "MisskeyInputOptions":
+
data["instance"] = normalize_service_url(data["instance"])
+
+
if "allowed_visibility" in data:
+
for vis in data.get("allowed_visibility", []):
+
if vis not in ALLOWED_VISIBILITY:
+
raise ValueError(f"Invalid visibility option {vis}!")
+
+
if "filters" in data:
+
data["filters"] = [re.compile(r) for r in data["filters"]]
+
+
return MisskeyInputOptions(**data)
+
+
+
class MisskeyInputService(MisskeyService, InputService):
+
def __init__(self, db: DatabasePool, options: MisskeyInputOptions) -> None:
+
super().__init__(options.instance, db)
+
self.options: MisskeyInputOptions = options
+
+
self.log.info("Verifying %s credentails...", self.url)
+
response = self.verify_credentials()
+
self.user_id: str = response["id"]
+
+
@override
+
def _get_token(self) -> str:
+
return self.options.token
+
+
def _on_note(self, note: dict[str, Any]):
+
if note["userId"] != self.user_id:
+
return
+
+
if note["visibility"] not in self.options.allowed_visibility:
+
return
+
+
if note.get("poll"):
+
self.log.info("Skipping '%s'! Contains a poll..", note["id"])
+
return
+
+
renote: dict[str, Any] | None = note.get("renote")
+
if renote:
+
if note.get("text") is None:
+
self._on_renote(note, renote)
+
return
+
+
if renote["userId"] != self.user_id:
+
return
+
+
rrenote = self._get_post(self.url, self.user_id, renote["id"])
+
if not rrenote:
+
self.log.info(
+
"Skipping %s, quote %s not found in db", note["id"], renote["id"]
+
)
+
return
+
+
reply: dict[str, Any] | None = note.get("reply")
+
if reply:
+
if reply.get("userId") != self.user_id:
+
self.log.info("Skipping '%s'! Reply to other user..", note["id"])
+
return
+
+
parent = None
+
if reply:
+
parent = self._get_post(self.url, self.user_id, reply["id"])
+
if not parent:
+
self.log.info(
+
"Skipping %s, parent %s not found in db", note["id"], reply["id"]
+
)
+
return
+
+
mention_handles: dict[str, str] = note.get("mentionHandles") or {}
+
tags: list[str] = note.get("tags") or []
+
+
handles: list[tuple[str, str]] = []
+
for value in mention_handles.values():
+
handles.append((value, value))
+
+
parser = MarkdownParser() # TODO MFM parser
+
tokens = parser.parse(note.get("text", ""), tags, handles)
+
post = Post(id=note["id"], parent_id=reply["id"] if reply else None, tokens=tokens)
+
+
post.attachments.put(RemoteUrlAttachment(url=self.url + "/notes/" + note["id"]))
+
if renote:
+
post.attachments.put(QuoteAttachment(quoted_id=renote['id'], quoted_user=self.user_id))
+
if any([a.get("isSensitive", False) for a in note.get("files", [])]):
+
post.attachments.put(SensitiveAttachment(sensitive=True))
+
if note.get("cw"):
+
post.attachments.put(LabelsAttachment(labels=[note["cw"]]))
+
+
blobs: list[Blob] = []
+
for media in note.get("files", []):
+
self.log.info("Downloading %s...", media["url"])
+
blob: Blob | None = download_blob(media["url"], media.get("comment", ""))
+
if not blob:
+
self.log.error(
+
"Skipping %s! Failed to download media %s.",
+
note["id"],
+
media["url"],
+
)
+
return
+
blobs.append(blob)
+
+
if blobs:
+
post.attachments.put(MediaAttachment(blobs=blobs))
+
+
if parent:
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": note["id"],
+
"parent": parent["id"],
+
"root": parent["id"] if not parent["root"] else parent["root"],
+
}
+
)
+
else:
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": note["id"],
+
}
+
)
+
+
for out in self.outputs:
+
self.submitter(lambda out=out: out.accept_post(post))  # bind out per iteration; a plain lambda would capture the last output
+
+
def _on_renote(self, note: dict[str, Any], renote: dict[str, Any]):
+
reposted = self._get_post(self.url, self.user_id, renote["id"])
+
if not reposted:
+
self.log.info(
+
"Skipping repost '%s' as reposted post '%s' was not found in the db.",
+
note["id"],
+
renote["id"],
+
)
+
return
+
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": note["id"],
+
"reposted": reposted["id"],
+
}
+
)
+
+
for out in self.outputs:
+
self.submitter(lambda out=out: out.accept_repost(note["id"], renote["id"]))  # bind out per iteration
+
+
def _accept_msg(self, msg: websockets.Data) -> None:
+
data: dict[str, Any] = cast(dict[str, Any], json.loads(msg))
+
+
if data["type"] == "channel":
+
type: str = cast(str, data["body"]["type"])
+
if type == "note" or type == "reply":
+
note_body = data["body"]["body"]
+
self._on_note(note_body)
+
return
+
+
async def _subscribe_to_home(self, ws: websockets.ClientConnection) -> None:
+
await ws.send(
+
json.dumps(
+
{
+
"type": "connect",
+
"body": {"channel": "homeTimeline", "id": str(uuid.uuid4())},
+
}
+
)
+
)
+
self.log.info("Subscribed to 'homeTimeline' channel...")
+
+
@override
+
async def listen(self):
+
streaming: str = f"{'wss' if self.url.startswith('https') else 'ws'}://{self.url.split('://', 1)[1]}"
+
url: str = f"{streaming}/streaming?i={self.options.token}"
+
+
async for ws in websockets.connect(url):
+
try:
+
self.log.info("Listening to %s...", streaming)
+
await self._subscribe_to_home(ws)
+
+
async def listen_for_messages():
+
async for msg in ws:
+
self.submitter(lambda msg=msg: self._accept_msg(msg))  # bind msg so a deferred call doesn't see a later message
+
+
listen = asyncio.create_task(listen_for_messages())
+
+
_ = await asyncio.gather(listen)
+
except websockets.ConnectionClosedError as e:
+
self.log.error(e, stack_info=True, exc_info=True)
+
self.log.info("Reconnecting to %s...", streaming)
+
continue
+10
pyproject.toml
···
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
+
"dnspython>=2.8.0",
+
"grapheme>=0.6.0",
"python-magic>=0.4.27",
"requests>=2.32.5",
"websockets>=15.0.1",
]
+
+
[dependency-groups]
+
dev = [
+
"pytest>=8.4.2",
+
]
+
+
[tool.pytest.ini_options]
+
pythonpath = ["."]
+32
registry.py
···
+
from pathlib import Path
+
from typing import Any, Callable
+
+
from cross.service import InputService, OutputService
+
from database.connection import DatabasePool
+
+
input_factories: dict[str, Callable[[DatabasePool, dict[str, Any]], InputService]] = {}
+
output_factories: dict[str, Callable[[DatabasePool, dict[str, Any]], OutputService]] = {}
+
+
+
def create_input_service(db: DatabasePool, data: dict[str, Any]) -> InputService:
+
if "type" not in data:
+
raise ValueError("No `type` field in input data!")
+
type: str = str(data["type"])
+
del data["type"]
+
+
factory = input_factories.get(type)
+
if not factory:
+
raise KeyError(f"No such input service {type}!")
+
return factory(db, data)
+
+
+
def create_output_service(db: DatabasePool, data: dict[str, Any]) -> OutputService:
+
if "type" not in data:
+
raise ValueError("No `type` field in input data!")
+
type: str = str(data["type"])
+
del data["type"]
+
+
factory = output_factories.get(type)
+
if not factory:
+
raise KeyError(f"No such output service {type}!")
+
return factory(db, data)
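
for context, the factories are only useful once `registry_bootstrap.bootstrap()` has populated them. a small illustrative sketch; the config shape shown here is an assumption based on the available options classes:

```python
# illustrative glue code, not the real entry point of xpost
from typing import Any

from database.connection import DatabasePool
from registry import create_input_service
from registry_bootstrap import bootstrap


def build_input(db: DatabasePool, entry: dict[str, Any]):
    bootstrap()  # register the lazy factories before looking one up
    # entry carries a "type" key plus the service's own options, e.g.
    # {"type": "misskey-wss", "instance": "https://example.tld", "token": "..."}
    return create_input_service(db, entry)
```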
+33
registry_bootstrap.py
···
+
from typing import Any
+
+
from database.connection import DatabasePool
+
from registry import input_factories, output_factories
+
+
+
class LazyFactory:
+
def __init__(self, module_path: str, class_name: str, options_class_name: str):
+
self.module_path: str = module_path
+
self.class_name: str = class_name
+
self.options_class_name: str = options_class_name
+
+
def __call__(self, db: DatabasePool, d: dict[str, Any]):
+
module = __import__(
+
self.module_path, fromlist=[self.class_name, self.options_class_name]
+
)
+
service_class = getattr(module, self.class_name)
+
options_class = getattr(module, self.options_class_name)
+
return service_class(db, options_class.from_dict(d))
+
+
def bootstrap():
+
input_factories["mastodon-wss"] = LazyFactory(
+
"mastodon.input", "MastodonInputService", "MastodonInputOptions"
+
)
+
input_factories["misskey-wss"] = LazyFactory(
+
"misskey.input", "MisskeyInputService", "MisskeyInputOptions"
+
)
+
input_factories["bluesky-jetstream"] = LazyFactory(
+
"bluesky.input", "BlueskyJetstreamInputService", "BlueskyJetstreamInputOptions"
+
)
+
output_factories['stderr'] = LazyFactory(
+
"util.dummy", "StderrOutputService", "DummyOptions"
+
)
+61
tests/util/util_test.py
···
+
import util.util as u
+
from unittest.mock import patch
+
import pytest
+
+
+
def test_normalize_service_url_http():
+
assert u.normalize_service_url("http://example.com") == "http://example.com"
+
assert u.normalize_service_url("http://example.com/") == "http://example.com"
+
+
+
def test_normalize_service_url_invalid_schemes():
+
with pytest.raises(ValueError, match="Invalid service url"):
+
_ = u.normalize_service_url("ftp://example.com")
+
with pytest.raises(ValueError, match="Invalid service url"):
+
_ = u.normalize_service_url("example.com")
+
with pytest.raises(ValueError, match="Invalid service url"):
+
_ = u.normalize_service_url("//example.com")
+
+
+
def test_read_env_missing_env_var():
+
data = {"token": "env:MISSING_VAR", "keep": "value"}
+
with patch.dict("os.environ", {}, clear=True):
+
u.read_env(data)
+
assert data == {"keep": "value"}
+
assert "token" not in data
+
+
+
def test_read_env_no_env_prefix():
+
data = {"token": "literal_value", "number": 123}
+
u.read_env(data)
+
assert data == {"token": "literal_value", "number": 123}
+
+
+
def test_read_env_deeply_nested():
+
data = {"level1": {"level2": {"token": "env:DEEP_TOKEN"}}}
+
with patch.dict("os.environ", {"DEEP_TOKEN": "deep_secret"}):
+
u.read_env(data)
+
assert data["level1"]["level2"]["token"] == "deep_secret"
+
+
+
def test_read_env_mixed_types():
+
data = {
+
"string": "env:TOKEN",
+
"number": 42,
+
"list": [1, 2, 3],
+
"none": None,
+
"bool": True,
+
}
+
with patch.dict("os.environ", {"TOKEN": "secret"}):
+
u.read_env(data)
+
assert data["string"] == "secret"
+
assert data["number"] == 42
+
assert data["list"] == [1, 2, 3]
+
assert data["none"] is None
+
assert data["bool"] is True
+
+
+
def test_read_env_empty_dict():
+
data = {}
+
u.read_env(data)
+
assert data == {}
+49
util/cache.py
···
+
from abc import ABC, abstractmethod
+
from pathlib import Path
+
import time
+
from typing import Generic, TypeVar, override
+
import pickle
+
+
K = TypeVar("K")
+
V = TypeVar("V")
+
+
class Cacheable(ABC):
+
@abstractmethod
+
def dump_cache(self, path: Path):
+
pass
+
+
@abstractmethod
+
def load_cache(self, path: Path):
+
pass
+
+
class TTLCache(Generic[K, V], Cacheable):
+
def __init__(self, ttl_seconds: int = 3600) -> None:
+
self.ttl: int = ttl_seconds
+
self.__cache: dict[K, tuple[V, float]] = {}
+
+
def get(self, key: K) -> V | None:
+
if key in self.__cache:
+
value, timestamp = self.__cache[key]
+
if time.time() - timestamp < self.ttl:
+
return value
+
else:
+
del self.__cache[key]
+
return None
+
+
def set(self, key: K, value: V) -> None:
+
self.__cache[key] = (value, time.time())
+
+
def clear(self) -> None:
+
self.__cache.clear()
+
+
@override
+
def dump_cache(self, path: Path) -> None:
+
path.parent.mkdir(parents=True, exist_ok=True)
+
with open(path, 'wb') as f:
+
pickle.dump(self.__cache, f)
+
+
@override
+
def load_cache(self, path: Path):
+
if path.exists():
+
with open(path, 'rb') as f:
+
self.__cache = pickle.load(f)
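
a quick usage sketch for `TTLCache`; the keys, values and cache path below are made up:

```python
from pathlib import Path

from util.cache import TTLCache

cache: TTLCache[str, str] = TTLCache(ttl_seconds=60)
cache.set("did:plc:example", "https://pds.example.tld")
assert cache.get("did:plc:example") == "https://pds.example.tld"
assert cache.get("unknown") is None  # misses and expired entries return None

# entries can be pickled to disk and restored on the next run
cache.dump_cache(Path("cache/dids.pickle"))
cache.load_cache(Path("cache/dids.pickle"))
```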
+29
util/dummy.py
···
+
from typing import override
+
from cross.post import Post
+
from cross.service import OutputService
+
from database.connection import DatabasePool
+
+
class DummyOptions:
+
@classmethod
+
def from_dict(cls, obj) -> 'DummyOptions':
+
return DummyOptions()
+
+
class StderrOutputService(OutputService):
+
def __init__(self, db: DatabasePool, options: DummyOptions) -> None:
+
super().__init__("http://localhost", db)
+
+
@override
+
def accept_post(self, post: Post):
+
self.log.info("%s", post)
+
+
@override
+
def accept_repost(self, repost_id: str, reposted_id: str):
+
self.log.info("%s, %s", repost_id, reposted_id)
+
+
@override
+
def delete_post(self, post_id: str):
+
self.log.info("%s", post_id)
+
+
@override
+
def delete_repost(self, repost_id: str):
+
self.log.info("%s", repost_id)
+150
util/html.py
···
+
from html.parser import HTMLParser
+
from typing import override
+
+
from cross.tokens import LinkToken, TextToken, Token
+
from util.splitter import canonical_label
+
+
+
class HTMLToTokensParser(HTMLParser):
+
def __init__(self) -> None:
+
super().__init__()
+
self.tokens: list[Token] = []
+
+
self._tag_stack: dict[str, tuple[str, dict[str, str | None]]] = {}
+
self.in_pre: bool = False
+
self.in_code: bool = False
+
self.invisible: bool = False
+
+
def handle_a_endtag(self):
+
label, _attr = self._tag_stack.pop("a")
+
+
href = _attr.get("href")
+
if href:
+
if canonical_label(label, href):
+
self.tokens.append(LinkToken(href=href))
+
else:
+
self.tokens.append(LinkToken(href=href, label=label))
+
+
def append_text(self, text: str):
+
self.tokens.append(TextToken(text=text))
+
+
def append_newline(self):
+
if self.tokens:
+
last_token = self.tokens[-1]
+
if isinstance(last_token, TextToken) and not last_token.text.endswith("\n"):
+
self.tokens.append(TextToken(text="\n"))
+
+
@override
+
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
+
_attr = dict(attrs)
+
+
if self.invisible:
+
return
+
+
match tag:
+
case "p":
+
cls = _attr.get("class", "")
+
if cls and "quote-inline" in cls:
+
self.invisible = True
+
case "a":
+
self._tag_stack["a"] = ("", _attr)
+
case "code":
+
if not self.in_pre:
+
self.append_text("`")
+
self.in_code = True
+
case "pre":
+
self.append_newline()
+
self.append_text("```\n")
+
self.in_pre = True
+
case "blockquote":
+
self.append_newline()
+
self.append_text("> ")
+
case "strong" | "b":
+
self.append_text("**")
+
case "em" | "i":
+
self.append_text("*")
+
case "del" | "s":
+
self.append_text("~~")
+
case "br":
+
self.append_text("\n")
+
case "h1" | "h2" | "h3" | "h4" | "h5" | "h6":
+
level = int(tag[1])
+
self.append_text("\n" + "#" * level + " ")
+
case _:
+
# self.builder.extend(f"<{tag}>".encode("utf-8"))
+
pass
+
+
@override
+
def handle_endtag(self, tag: str) -> None:
+
if self.invisible:
+
if tag == "p":
+
self.invisible = False
+
return
+
+
match tag:
+
case "a":
+
if "a" in self._tag_stack:
+
self.handle_a_endtag()
+
case "code":
+
if not self.in_pre and self.in_code:
+
self.append_text("`")
+
self.in_code = False
+
case "pre":
+
self.append_newline()
+
self.append_text("```\n")
+
self.in_pre = False
+
case "blockquote":
+
self.append_text("\n")
+
case "strong" | "b":
+
self.append_text("**")
+
case "em" | "i":
+
self.append_text("*")
+
case "del" | "s":
+
self.append_text("~~")
+
case "p":
+
self.append_text("\n\n")
+
case "h1" | "h2" | "h3" | "h4" | "h5" | "h6":
+
self.append_text("\n")
+
case _:
+
# self.builder.extend(f"</{tag}>".encode("utf-8"))
+
pass
+
+
@override
+
def handle_data(self, data: str) -> None:
+
if self.invisible:
+
return
+
+
if self._tag_stack.get('a'):
+
label, _attr = self._tag_stack.pop("a")
+
self._tag_stack["a"] = (label + data, _attr)
+
return
+
# plain text outside of a link is emitted directly as a text token
+
self.append_text(data)
+
+
def get_result(self) -> list[Token]:
+
if not self.tokens:
+
return []
+
+
combined: list[Token] = []
+
buffer: list[str] = []
+
+
def flush_buffer():
+
if buffer:
+
merged = "".join(buffer)
+
combined.append(TextToken(text=merged))
+
buffer.clear()
+
+
for token in self.tokens:
+
if isinstance(token, TextToken):
+
buffer.append(token.text)
+
else:
+
flush_buffer()
+
combined.append(token)
+
+
flush_buffer()
+
+
if combined and isinstance(combined[-1], TextToken):
+
if combined[-1].text.endswith("\n\n"):
+
combined[-1] = TextToken(text=combined[-1].text[:-2])
+
+
if combined[-1].text.endswith("\n"):
+
combined[-1] = TextToken(text=combined[-1].text[:-1])
+
return combined
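
a quick usage sketch for the HTML tokenizer; the sample markup is made up but mimics fediverse post HTML:

```python
from util.html import HTMLToTokensParser

parser = HTMLToTokensParser()
parser.feed(
    '<p>release notes: <a href="https://example.tld/notes/1">'
    "https://example.tld/notes/1</a></p>"
)
for token in parser.get_result():
    # expected shape: a TextToken for the prose, then a LinkToken whose label
    # was recognised as canonical (label == href) and therefore dropped
    print(token)
```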
+126
util/markdown.py
···
+
import re
+
+
from cross.tokens import LinkToken, MentionToken, TagToken, TextToken, Token
+
from util.html import HTMLToTokensParser
+
from util.splitter import canonical_label
+
+
URL = re.compile(r"(?:(?:[A-Za-z][A-Za-z0-9+.-]*://)|mailto:)[^\s]+", re.IGNORECASE)
+
MD_INLINE_LINK = re.compile(
+
r"\[([^\]]+)\]\(\s*((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s\)]+)\s*\)",
+
re.IGNORECASE,
+
)
+
MD_AUTOLINK = re.compile(
+
r"<((?:(?:[A-Za-z][A-Za-z0-9+.\-]*://)|mailto:)[^\s>]+)>", re.IGNORECASE
+
)
+
HASHTAG = re.compile(r"(?<!\w)\#([\w]+)")
+
FEDIVERSE_HANDLE = re.compile(r"(?<![\w@])@([\w\.-]+)(?:@([\w\.-]+\.[\w\.-]+))?")
+
+
REGEXES = [URL, MD_INLINE_LINK, MD_AUTOLINK, HASHTAG, FEDIVERSE_HANDLE]
+
+
+
# TODO autolinks are broken by the html parser
+
class MarkdownParser:
+
def parse(
+
self, text: str, tags: list[str], handles: list[tuple[str, str]]
+
) -> list[Token]:
+
if not text:
+
return []
+
+
tokenizer = HTMLToTokensParser()
+
tokenizer.feed(text)
+
html_tokens = tokenizer.get_result()
+
+
tokens: list[Token] = []
+
+
for tk in html_tokens:
+
if isinstance(tk, TextToken):
+
tokens.extend(self.__tokenize_md(tk.text, tags, handles))
+
elif isinstance(tk, LinkToken):
+
if not tk.label or canonical_label(tk.label, tk.href):
+
tokens.append(tk)
+
continue
+
+
tokens.extend(
+
self.__tokenize_md(f"[{tk.label}]({tk.href})", tags, handles)
+
)
+
else:
+
tokens.append(tk)
+
+
return tokens
+
+
def __tokenize_md(
+
self, text: str, tags: list[str], handles: list[tuple[str, str]]
+
) -> list[Token]:
+
index: int = 0
+
total: int = len(text)
+
buffer: list[str] = []
+
+
tokens: list[Token] = []
+
+
def flush():
+
nonlocal buffer
+
if buffer:
+
tokens.append(TextToken(text="".join(buffer)))
+
buffer = []
+
+
while index < total:
+
if text[index] == "[":
+
md_inline = MD_INLINE_LINK.match(text, index)
+
if md_inline:
+
flush()
+
label = md_inline.group(1)
+
href = md_inline.group(2)
+
tokens.append(LinkToken(href=href, label=label))
+
index = md_inline.end()
+
continue
+
+
if text[index] == "<":
+
md_auto = MD_AUTOLINK.match(text, index)
+
if md_auto:
+
flush()
+
href = md_auto.group(1)
+
tokens.append(LinkToken(href=href, label=None))
+
index = md_auto.end()
+
continue
+
+
if text[index] == "#":
+
tag = HASHTAG.match(text, index)
+
if tag:
+
tag_text = tag.group(1)
+
if tag_text.lower() in tags:
+
flush()
+
tokens.append(TagToken(tag=tag_text))
+
index = tag.end()
+
continue
+
+
if text[index] == "@":
+
handle = FEDIVERSE_HANDLE.match(text, index)
+
if handle:
+
handle_text = handle.group(0)
+
stripped_handle = handle_text.strip()
+
+
match = next(
+
(pair for pair in handles if stripped_handle in pair), None
+
)
+
+
if match:
+
flush()
+
tokens.append(
+
MentionToken(username=match[1], uri=None)
+
) # TODO: misskey doesn't provide a uri
+
index = handle.end()
+
continue
+
+
url = URL.match(text, index)
+
if url:
+
flush()
+
href = url.group(0)
+
tokens.append(LinkToken(href=href, label=None))
+
index = url.end()
+
continue
+
+
buffer.append(text[index])
+
index += 1
+
+
flush()
+
return tokens
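
an illustrative call; in misskey/input.py the tag and handle lists come from the note payload (note["tags"] and note["mentionHandles"]), the values below are made up:

```python
from util.markdown import MarkdownParser

parser = MarkdownParser()
tokens = parser.parse(
    "new build is up #release, thanks @alice@example.tld "
    "[changelog](https://example.tld/changelog)",
    tags=["release"],
    handles=[("@alice@example.tld", "@alice@example.tld")],
)
# expect roughly: TextToken, TagToken, TextToken, MentionToken, TextToken, LinkToken
for token in tokens:
    print(token)
```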
+120
util/splitter.py
···
+
import re
+
from dataclasses import replace
+
+
import grapheme
+
+
from cross.tokens import LinkToken, TagToken, TextToken, Token
+
+
+
def canonical_label(label: str | None, href: str):
+
if not label or label == href:
+
return True
+
+
split = href.split("://", 1)
+
if len(split) > 1:
+
if split[1] == label:
+
return True
+
+
return False
+
+
+
ALTERNATE = re.compile(r"\S+|\s+")
+
+
+
def split_tokens(
+
tokens: list[Token],
+
max_chars: int,
+
max_link_len: int = 35,
+
) -> list[list[Token]]:
+
def new_block() -> None:
+
nonlocal blocks, block, length
+
if block:
+
blocks.append(block)
+
block, length = [], 0
+
+
def append_text(text: str) -> None:
+
nonlocal block
+
if block and isinstance(block[-1], TextToken):
+
block[-1] = replace(block[-1], text=block[-1].text + text)
+
else:
+
block.append(TextToken(text=text))
+
+
blocks: list[list[Token]] = []
+
block: list[Token] = []
+
length: int = 0
+
+
for tk in tokens:
+
if isinstance(tk, TagToken):
+
tag_len = 1 + grapheme.length(tk.tag)
+
if length + tag_len > max_chars:
+
new_block()
+
block.append(tk)
+
length += tag_len
+
continue
+
if isinstance(tk, LinkToken):
+
label_text = tk.label or ""
+
link_len = grapheme.length(label_text)
+
+
if canonical_label(tk.label, tk.href):
+
link_len = min(link_len, max_link_len)
+
+
if length + link_len <= max_chars:
+
block.append(tk)
+
length += link_len
+
continue
+
+
if length:
+
new_block()
+
+
remaining = label_text
+
while remaining:
+
room = (
+
max_chars
+
- length
+
- (0 if grapheme.length(remaining) <= max_chars else 1)
+
)
+
chunk = grapheme.slice(remaining, 0, room)
+
if grapheme.length(remaining) > room:
+
chunk += "-"
+
+
block.append(replace(tk, label=chunk))
+
length += grapheme.length(chunk)
+
+
remaining = grapheme.slice(remaining, room, grapheme.length(remaining))
+
if remaining:
+
new_block()
+
continue
+
if isinstance(tk, TextToken):
+
for seg in ALTERNATE.findall(tk.text):
+
seg_len = grapheme.length(seg)
+
+
if length + seg_len <= max_chars - (0 if seg.isspace() else 1):
+
append_text(seg)
+
length += seg_len
+
continue
+
+
if length:
+
new_block()
+
+
if not seg.isspace():
+
while grapheme.length(seg) > max_chars - 1:
+
chunk = grapheme.slice(seg, 0, max_chars - 1) + "-"
+
append_text(chunk)
+
new_block()
+
seg = grapheme.slice(seg, max_chars - 1, grapheme.length(seg))
+
else:
+
while grapheme.length(seg) > max_chars:
+
chunk = grapheme.slice(seg, 0, max_chars)
+
append_text(chunk)
+
new_block()
+
seg = grapheme.slice(seg, max_chars, grapheme.length(seg))
+
+
if seg:
+
append_text(seg)
+
length = grapheme.length(seg)
+
continue
+
block.append(tk)
+
if block:
+
blocks.append(block)
+
+
return blocks
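
a usage sketch for `split_tokens`: split an over-long post into blocks of at most 100 graphemes. the tokens themselves are made up:

```python
from cross.tokens import LinkToken, TextToken
from util.splitter import split_tokens

tokens = [
    TextToken(text="word " * 40),  # roughly 200 characters of prose
    LinkToken(
        href="https://example.tld/a/fairly/long/path",
        label="https://example.tld/a/fairly/long/path",
    ),  # canonical label, so it counts as at most max_link_len (35) characters
]
for i, block in enumerate(split_tokens(tokens, max_chars=100)):
    print(i, block)
```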
+30 -1
util/util.py
···
import logging
import sys
+
import os
+
from typing import Any, Callable
-
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
+
import env
+
+
shutdown_hook: list[Callable[[], None]] = []
+
+
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG if env.DEV else logging.INFO)
LOGGER = logging.getLogger("XPost")
+
+
def normalize_service_url(url: str) -> str:
+
if not url.startswith("https://") and not url.startswith("http://"):
+
raise ValueError(f"Invalid service url {url}! Only http/https are supported.")
+
+
return url[:-1] if url.endswith('/') else url
+
+
def read_env(data: dict[str, Any]) -> None:
+
keys = list(data.keys())
+
for key in keys:
+
val = data[key]
+
match val:
+
case str():
+
if val.startswith('env:'):
+
envval = os.environ.get(val[4:])
+
if envval is None:
+
del data[key]
+
else:
+
data[key] = envval
+
case dict():
+
read_env(val)
+
case _:
+
pass
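
for reference, this mirrors the behaviour covered in tests/util/util_test.py; the variable names below are made up:

```python
import os

from util.util import read_env

os.environ["MISSKEY_TOKEN"] = "secret"
config = {
    "token": "env:MISSKEY_TOKEN",      # replaced with the variable's value
    "missing": "env:XPOST_UNSET_VAR",  # dropped because the variable is unset
    "nested": {"instance": "https://example.tld"},  # dicts are handled recursively
}
read_env(config)
assert config["token"] == "secret"
assert "missing" not in config
```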
+88
uv.lock
···
]
[[package]]
+
name = "colorama"
+
version = "0.4.6"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/d8/53/6f443c9a4a8358a93a6792e2acffb9d9d5cb0a5cfd8802644b7b1c9a02e4/colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44", size = 27697, upload-time = "2022-10-25T02:36:22.414Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
+
]
+
+
[[package]]
+
name = "dnspython"
+
version = "2.8.0"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/8c/8b/57666417c0f90f08bcafa776861060426765fdb422eb10212086fb811d26/dnspython-2.8.0.tar.gz", hash = "sha256:181d3c6996452cb1189c4046c61599b84a5a86e099562ffde77d26984ff26d0f", size = 368251, upload-time = "2025-09-07T18:58:00.022Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/ba/5a/18ad964b0086c6e62e2e7500f7edc89e3faa45033c71c1893d34eed2b2de/dnspython-2.8.0-py3-none-any.whl", hash = "sha256:01d9bbc4a2d76bf0db7c1f729812ded6d912bd318d3b1cf81d30c0f845dbf3af", size = 331094, upload-time = "2025-09-07T18:57:58.071Z" },
+
]
+
+
[[package]]
+
name = "grapheme"
+
version = "0.6.0"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/ce/e7/bbaab0d2a33e07c8278910c1d0d8d4f3781293dfbc70b5c38197159046bf/grapheme-0.6.0.tar.gz", hash = "sha256:44c2b9f21bbe77cfb05835fec230bd435954275267fea1858013b102f8603cca", size = 207306, upload-time = "2020-03-07T17:13:55.492Z" }
+
+
[[package]]
name = "idna"
version = "3.11"
source = { registry = "https://pypi.org/simple" }
···
]
[[package]]
+
name = "iniconfig"
+
version = "2.3.0"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" },
+
]
+
+
[[package]]
+
name = "packaging"
+
version = "25.0"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/a1/d4/1fc4078c65507b51b96ca8f8c3ba19e6a61c8253c72794544580a7b6c24d/packaging-25.0.tar.gz", hash = "sha256:d443872c98d677bf60f6a1f2f8c1cb748e8fe762d2bf9d3148b5599295b0fc4f", size = 165727, upload-time = "2025-04-19T11:48:59.673Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/20/12/38679034af332785aac8774540895e234f4d07f7545804097de4b666afd8/packaging-25.0-py3-none-any.whl", hash = "sha256:29572ef2b1f17581046b3a2227d5c611fb25ec70ca1ba8554b24b0e69331a484", size = 66469, upload-time = "2025-04-19T11:48:57.875Z" },
+
]
+
+
[[package]]
+
name = "pluggy"
+
version = "1.6.0"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
+
]
+
+
[[package]]
+
name = "pygments"
+
version = "2.19.2"
+
source = { registry = "https://pypi.org/simple" }
+
sdist = { url = "https://files.pythonhosted.org/packages/b0/77/a5b8c569bf593b0140bde72ea885a803b82086995367bf2037de0159d924/pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887", size = 4968631, upload-time = "2025-06-21T13:39:12.283Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/c7/21/705964c7812476f378728bdf590ca4b771ec72385c533964653c68e86bdc/pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b", size = 1225217, upload-time = "2025-06-21T13:39:07.939Z" },
+
]
+
+
[[package]]
+
name = "pytest"
+
version = "8.4.2"
+
source = { registry = "https://pypi.org/simple" }
+
dependencies = [
+
{ name = "colorama", marker = "sys_platform == 'win32'" },
+
{ name = "iniconfig" },
+
{ name = "packaging" },
+
{ name = "pluggy" },
+
{ name = "pygments" },
+
]
+
sdist = { url = "https://files.pythonhosted.org/packages/a3/5c/00a0e072241553e1a7496d638deababa67c5058571567b92a7eaa258397c/pytest-8.4.2.tar.gz", hash = "sha256:86c0d0b93306b961d58d62a4db4879f27fe25513d4b969df351abdddb3c30e01", size = 1519618, upload-time = "2025-09-04T14:34:22.711Z" }
+
wheels = [
+
{ url = "https://files.pythonhosted.org/packages/a8/a4/20da314d277121d6534b3a980b29035dcd51e6744bd79075a6ce8fa4eb8d/pytest-8.4.2-py3-none-any.whl", hash = "sha256:872f880de3fc3a5bdc88a11b39c9710c3497a547cfa9320bc3c5e62fbf272e79", size = 365750, upload-time = "2025-09-04T14:34:20.226Z" },
+
]
+
+
[[package]]
name = "python-magic"
version = "0.4.27"
source = { registry = "https://pypi.org/simple" }
···
version = "0.1.0"
source = { virtual = "." }
dependencies = [
+
{ name = "dnspython" },
+
{ name = "grapheme" },
{ name = "python-magic" },
{ name = "requests" },
{ name = "websockets" },
]
+
[package.dev-dependencies]
+
dev = [
+
{ name = "pytest" },
+
]
+
[package.metadata]
requires-dist = [
+
{ name = "dnspython", specifier = ">=2.8.0" },
+
{ name = "grapheme", specifier = ">=0.6.0" },
{ name = "python-magic", specifier = ">=0.4.27" },
{ name = "requests", specifier = ">=2.32.5" },
{ name = "websockets", specifier = ">=15.0.1" },
]
+
+
[package.metadata.requires-dev]
+
dev = [{ name = "pytest", specifier = ">=8.4.2" }]