social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

Compare changes

Choose any two refs to compare.

Changed files
+237 -25
cross
mastodon
misskey
+1
cross/post.py
···
id: str
parent_id: str | None
tokens: list[Token]
+
text_type: str = "text/plain"
attachments: AttachmentKeeper = field(default_factory=AttachmentKeeper)
+57 -5
cross/service.py
···
+
import logging
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Callable, cast
-
import logging
from cross.post import Post
from database.connection import DatabasePool
···
_ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
return cast(sqlite3.Row, cursor.fetchone())
+
def _get_mappings(
+
self, original: int, service: str, user: str
+
) -> list[sqlite3.Row]:
+
cursor = self.db.get_conn().cursor()
+
_ = cursor.execute(
+
"""
+
SELECT *
+
FROM posts AS p
+
JOIN mappings AS m
+
ON p.id = m.mapped
+
WHERE m.original = ?
+
AND p.service = ?
+
AND p.user = ?
+
ORDER BY p.id;
+
""",
+
(original, service, user),
+
)
+
return cursor.fetchall()
+
+
def _find_mapped_thread(
+
self, parent: str, iservice: str, iuser: str, oservice: str, ouser: str
+
):
+
reply_data = self._get_post(iservice, iuser, parent)
+
if not reply_data:
+
return None
+
+
reply_mappings: list[sqlite3.Row] | None = self._get_mappings(
+
reply_data["id"], oservice, ouser
+
)
+
if not reply_mappings:
+
return None
+
+
reply_identifier: sqlite3.Row = reply_mappings[-1]
+
root_identifier: sqlite3.Row = reply_mappings[0]
+
+
if reply_data["root_id"]:
+
root_data = self._get_post_by_id(reply_data["root_id"])
+
if not root_data:
+
return None
+
+
root_mappings = self._get_mappings(reply_data["root_id"], oservice, ouser)
+
if not root_mappings:
+
return None
+
root_identifier = root_mappings[0]
+
+
return (
+
root_identifier[0], # real ids
+
reply_identifier[0],
+
reply_data["root_id"], # db ids
+
reply_data["id"],
+
)
+
def _insert_post(self, post_data: dict[str, Any]):
values = [post_data.get(col) for col in columns]
cursor = self.db.get_conn().cursor()
···
class OutputService(Service):
-
def accept_post(self, post: Post):
+
def accept_post(self, service: str, user: str, post: Post):
self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)
-
def delete_post(self, post_id: str):
+
def delete_post(self, service: str, user: str, post_id: str):
self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)
-
def accept_repost(self, repost_id: str, reposted_id: str):
+
def accept_repost(self, service: str, user: str, repost_id: str, reposted_id: str):
self.log.warning(
"NOT IMPLEMENTED (%s), accept_repost %s of %s",
self.url,
···
reposted_id,
)
-
def delete_repost(self, repost_id: str):
+
def delete_repost(self, service: str, user: str, repost_id: str):
self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)
+31 -6
mastodon/info.py
···
from cross.service import Service
from util.util import normalize_service_url
+
def validate_and_transform(data: dict[str, Any]):
-
if 'token' not in data or 'instance' not in data:
+
if "token" not in data or "instance" not in data:
raise KeyError("Missing required values 'token' or 'instance'")
data["instance"] = normalize_service_url(data["instance"])
+
@dataclass(kw_only=True)
class InstanceInfo:
···
image_size_limit: int = 16777216
video_size_limit: int = 103809024
+
text_format: str = "text/plain"
+
@classmethod
def from_api(cls, data: dict[str, Any]) -> "InstanceInfo":
config: dict[str, Any] = {}
···
"characters_reserved_per_url"
]
+
# glitch content type
+
if "supported_mime_types" in statuses_config:
+
text_mimes: list[str] = statuses_config["supported_mime_types"]
+
+
if "text/x.misskeymarkdown" in text_mimes:
+
config["text_format"] = "text/x.misskeymarkdown"
+
elif "text/markdown" in text_mimes:
+
config["text_format"] = "text/markdown"
+
if "media_attachments" in data:
-
media_config: dict[str, Any] = data.get("media_attachments", {})
+
media_config: dict[str, Any] = data["media_attachments"]
if "image_size_limit" in media_config:
config["image_size_limit"] = media_config["image_size_limit"]
if "video_size_limit" in media_config:
···
config["image_size_limit"] = data["upload_limit"]
config["video_size_limit"] = data["upload_limit"]
+
if "pleroma" in data:
+
pleroma: dict[str, Any] = data["pleroma"]
+
if "metadata" in pleroma:
+
metadata: dict[str, Any] = pleroma["metadata"]
+
if "post_formats" in metadata:
+
post_formats: list[str] = metadata["post_formats"]
+
+
if "text/x.misskeymarkdown" in post_formats:
+
config["text_format"] = "text/x.misskeymarkdown"
+
elif "text/markdown" in post_formats:
+
config["text_format"] = "text/markdown"
+
return InstanceInfo(**config)
class MastodonService(ABC, Service):
def verify_credentials(self):
token = self._get_token()
-
responce = requests.get(
+
response = requests.get(
f"{self.url}/api/v1/accounts/verify_credentials",
headers={"Authorization": f"Bearer {token}"},
)
-
if responce.status_code != 200:
+
if response.status_code != 200:
self.log.error("Failed to validate user credentials!")
-
responce.raise_for_status()
-
return dict(responce.json())
+
response.raise_for_status()
+
return dict(response.json())
def fetch_instance_info(self):
token = self._get_token()
+4 -4
mastodon/input.py
···
self.options: MastodonInputOptions = options
self.log.info("Verifying %s credentails...", self.url)
-
responce = self.verify_credentials()
-
self.user_id: str = responce["id"]
+
response = self.verify_credentials()
+
self.user_id: str = response["id"]
self.log.info("Getting %s configuration...", self.url)
-
responce = self.fetch_instance_info()
-
self.streaming_url: str = responce["urls"]["streaming_api"]
+
response = self.fetch_instance_info()
+
self.streaming_url: str = response["urls"]["streaming_api"]
@override
def _get_token(self) -> str:
+138 -4
mastodon/output.py
···
from dataclasses import dataclass
from typing import Any, override
+
import requests
+
+
from cross.attachments import (
+
LanguagesAttachment,
+
QuoteAttachment,
+
RemoteUrlAttachment,
+
SensitiveAttachment,
+
)
+
from cross.post import Post
from cross.service import OutputService
from database.connection import DatabasePool
from mastodon.info import InstanceInfo, MastodonService, validate_and_transform
···
self.options: MastodonOutputOptions = options
self.log.info("Verifying %s credentails...", self.url)
-
responce = self.verify_credentials()
-
self.user_id: str = responce["id"]
+
response = self.verify_credentials()
+
self.user_id: str = response["id"]
self.log.info("Getting %s configuration...", self.url)
-
responce = self.fetch_instance_info()
-
self.instance_info: InstanceInfo = InstanceInfo.from_api(responce)
+
response = self.fetch_instance_info()
+
self.instance_info: InstanceInfo = InstanceInfo.from_api(response)
+
+
def accept_post(self, service: str, user: str, post: Post):
+
new_root_id: int | None = None
+
new_parent_id: int | None = None
+
+
reply_ref: str | None = None
+
if post.parent_id:
+
thread = self._find_mapped_thread(
+
post.parent_id, service, user, self.url, self.user_id
+
)
+
+
if not thread:
+
self.log.error("Failed to find thread tuple in the database!")
+
return
+
_, reply_ref, new_root_id, new_parent_id = thread
+
+
quote = post.attachments.get(QuoteAttachment)
+
if quote:
+
if quote.quoted_user != user:
+
self.log.info("Quoted other user, skipping!")
+
return
+
+
quoted_post = self._get_post(service, user, quote.quoted_id)
+
if not quoted_post:
+
self.log.error("Failed to find quoted post in the database!")
+
return
+
+
quoted_mappings = self._get_mappings(quoted_post["id"], self.url, self.user_id)
+
if not quoted_mappings:
+
self.log.error("Failed to find mappings for quoted post!")
+
return
+
+
quoted_local_id = quoted_mappings[-1][0]
+
# TODO resolve service identifier
+
+
post_tokens = post.tokens.copy()
+
+
remote_url = post.attachments.get(RemoteUrlAttachment)
+
if remote_url and remote_url.url and post.text_type == "text/x.misskeymarkdown":
+
# TODO stip mfm
+
pass
+
+
raw_statuses = [] # TODO split tokens and media across posts
+
if not raw_statuses:
+
self.log.error("Failed to split post into statuses!")
+
return
+
+
langs = post.attachments.get(LanguagesAttachment)
+
sensitive = post.attachments.get(SensitiveAttachment)
+
+
if langs and langs.langs:
+
pass # TODO
+
+
if sensitive and sensitive.sensitive:
+
pass # TODO
+
+
def delete_post(self, service: str, user: str, post_id: str):
+
post = self._get_post(service, user, post_id)
+
if not post:
+
self.log.info("Post not found in db, skipping delete..")
+
return
+
+
mappings = self._get_mappings(post["id"], self.url, self.user_id)
+
for mapping in mappings[::-1]:
+
self.log.info("Deleting '%s'...", mapping["identifier"])
+
requests.delete(
+
f"{self.url}/api/v1/statuses/{mapping['identifier']}",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
)
+
self._delete_post_by_id(mapping["id"])
+
+
def accept_repost(self, service: str, user: str, repost_id: str, reposted_id: str):
+
reposted = self._get_post(service, user, reposted_id)
+
if not reposted:
+
self.log.info("Post not found in db, skipping repost..")
+
return
+
+
mappings = self._get_mappings(reposted["id"], self.url, self.user_id)
+
if mappings:
+
rsp = requests.post(
+
f"{self.url}/api/v1/statuses/{mappings[0]['identifier']}/reblog",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
)
+
+
if rsp.status_code != 200:
+
self.log.error(
+
"Failed to boost status! status_code: %s, msg: %s",
+
rsp.status_code,
+
rsp.content,
+
)
+
return
+
+
self._insert_post(
+
{
+
"user": self.user_id,
+
"service": self.url,
+
"identifier": rsp.json()["id"],
+
"reposted": mappings[0]["id"],
+
}
+
)
+
inserted = self._get_post(self.url, self.user_id, rsp.json()["id"])
+
if not inserted:
+
raise ValueError("Inserted post not found!")
+
self._insert_post_mapping(reposted["id"], inserted["id"])
+
+
def delete_repost(self, service: str, user: str, repost_id: str):
+
repost = self._get_post(service, user, repost_id)
+
if not repost:
+
self.log.info("Repost not found in db, skipping delete..")
+
return
+
+
mappings = self._get_mappings(repost["id"], self.url, self.user_id)
+
rmappings = self._get_mappings(repost["reposted"], self.url, self.user_id)
+
+
if mappings and rmappings:
+
self.log.info(
+
"Removing '%s' Repost of '%s'...",
+
mappings[0]["identifier"],
+
rmappings[0]["identifier"],
+
)
+
requests.post(
+
f"{self.url}/api/v1/statuses/{rmappings[0]['identifier']}/unreblog",
+
headers={"Authorization": f"Bearer {self._get_token()}"},
+
)
+
self._delete_post_by_id(mappings[0]["id"])
@override
def _get_token(self) -> str:
+4 -4
misskey/info.py
···
class MisskeyService(ABC, Service):
def verify_credentials(self):
-
responce = requests.post(
+
response = requests.post(
f"{self.url}/api/i",
json={"i": self._get_token()},
headers={"Content-Type": "application/json"},
)
-
if responce.status_code != 200:
+
if response.status_code != 200:
self.log.error("Failed to validate user credentials!")
-
responce.raise_for_status()
-
return dict(responce.json())
+
response.raise_for_status()
+
return dict(response.json())
@abstractmethod
def _get_token(self) -> str:
+2 -2
misskey/input.py
···
self.options: MisskeyInputOptions = options
self.log.info("Verifying %s credentails...", self.url)
-
responce = self.verify_credentials()
-
self.user_id: str = responce["id"]
+
response = self.verify_credentials()
+
self.user_id: str = response["id"]
@override
def _get_token(self) -> str: