social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

more work

zenfyr.dev dba88a76 b7230ce7

verified
Changed files
+108 -9
cross
database
mastodon
migrations
+4
cross/attachments.py
···
@dataclass
class RemoteUrlAttachment(Attachment):
url: str
···
@dataclass
class RemoteUrlAttachment(Attachment):
url: str
+
+
@dataclass
+
class QuoteAttachment(Attachment):
+
quoted_id: str
+1 -1
cross/post.py
···
class Post:
id: str
parent_id: str | None
-
text: bytes # utf-8 text bytes
attachments: AttachmentKeeper
fragments: list[Fragment] = field(default_factory=list)
···
class Post:
id: str
parent_id: str | None
+
text: str # utf-8 text
attachments: AttachmentKeeper
fragments: list[Fragment] = field(default_factory=list)
+29 -1
cross/service.py
···
from pathlib import Path
import sqlite3
-
from typing import cast
from database.connection import get_conn
class Service:
def __init__(self, url: str, db: Path) -> None:
···
cursor = self.conn.cursor()
_ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
return cast(sqlite3.Row, cursor.fetchone())
···
from pathlib import Path
import sqlite3
+
from typing import Callable, cast
from database.connection import get_conn
+
from util.util import LOGGER
+
from cross.post import Post
class Service:
def __init__(self, url: str, db: Path) -> None:
···
cursor = self.conn.cursor()
_ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
return cast(sqlite3.Row, cursor.fetchone())
+
+
def close(self):
+
self.conn.close()
+
+
class OutputService(Service):
+
def __init__(self, url: str, db: Path) -> None:
+
super().__init__(url, db)
+
+
def accept_post(self, post: Post):
+
LOGGER.warning("NOT IMPLEMENTED, accept_post %s", post.id)
+
+
def delete_post(self, post_id: str):
+
LOGGER.warning("NOT IMPLEMENTED, delete_post %s", post_id)
+
+
def accept_repost(self, repost_id: str, reposted_id: str):
+
LOGGER.warning("NOT IMPLEMENTED, accept_repost %s", repost_id)
+
+
def delete_repost(self, repost_id: str):
+
LOGGER.warning("NOT IMPLEMENTED, delete_repost %s", repost_id)
+
+
class InputService(Service):
+
def __init__(self, url: str, db: Path) -> None:
+
super().__init__(url, db)
+
+
async def listen(self, outputs: list[OutputService], submitter: Callable[[Callable[[], None]], None]):
+
pass
+1 -1
database/connection.py
···
import sqlite3
def get_conn(db: Path) -> sqlite3.Connection:
-
conn = sqlite3.connect(db, autocommit=True)
conn.row_factory = sqlite3.Row
_ = conn.executescript("""
PRAGMA journal_mode = WAL;
···
import sqlite3
def get_conn(db: Path) -> sqlite3.Connection:
+
conn = sqlite3.connect(db, autocommit=True, check_same_thread=False)
conn.row_factory = sqlite3.Row
_ = conn.executescript("""
PRAGMA journal_mode = WAL;
+40
mastodon/input.py
···
···
+
import re
+
from dataclasses import dataclass, field
+
from pathlib import Path
+
from typing import Any, Callable, override
+
+
from cross.service import InputService, OutputService
+
from util.util import LOGGER
+
+
ALLOWED_VISIBILITY: list[str] = ["public", "unlisted"]
+
+
@dataclass(kw_only=True)
+
class MastodonInputOptions:
+
token: str
+
instance: str
+
allowed_visibility: list[str] = field(default_factory=lambda: ALLOWED_VISIBILITY.copy())
+
regex_filters: list[re.Pattern[str]] = field(default_factory=lambda: [])
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> 'MastodonInputOptions':
+
data['instance'] = data['instance'][:-1] if data['instance'].endswith("/") else data['instance']
+
+
if 'allowed_visibility' in data:
+
for vis in data.get('allowed_visibility', []):
+
if vis not in ALLOWED_VISIBILITY:
+
raise ValueError(f"Invalid visibility option {vis}!")
+
+
if 'regex_filters' in data:
+
data['regex_filters'] = [re.compile(r) for r in data['regex_filters']]
+
+
return MastodonInputOptions(**data)
+
+
class MastodonInputService(InputService):
+
def __init__(self, db: Path, options: MastodonInputOptions) -> None:
+
super().__init__(options.instance, db)
+
+
LOGGER.info("Verifying %s credentails...", self.url)
+
+
@override
+
async def listen(self, outputs: list[OutputService], submitter: Callable[[Callable[[], None]], None]):
+
return await super().listen(outputs, submitter) # TODO
+27
mastodon/output.py
···
···
+
from dataclasses import dataclass
+
from pathlib import Path
+
from typing import Any
+
from cross.service import OutputService
+
+
ALLOWED_POSTING_VISIBILITY: list[str] = ["public", "unlisted", "private"]
+
+
@dataclass(kw_only=True)
+
class MastodonOutputOptions:
+
token: str
+
instance: str
+
visibility: str = "public"
+
+
@classmethod
+
def from_dict(cls, data: dict[str, Any]) -> 'MastodonOutputOptions':
+
data['instance'] = data['instance'][:-1] if data['instance'].endswith("/") else data['instance']
+
+
if 'visibility' in data:
+
if data['visibility'] not in ALLOWED_POSTING_VISIBILITY:
+
raise ValueError(f"Invalid visibility option {data['visibility']}!")
+
+
return MastodonOutputOptions(**data)
+
+
# TODO
+
class MastodonOutputService(OutputService):
+
def __init__(self, db: Path, options: MastodonOutputOptions) -> None:
+
super().__init__(options.instance, db)
+6 -6
migrations/001_initdb.sql
···
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
-
user_id TEXT NOT NULL,
service TEXT NOT NULL,
identifier TEXT NOT NULL,
-
parent_id INTEGER NULL REFERENCES posts(id),
-
root_id INTEGER NULL REFERENCES posts(id),
-
reposted_id INTEGER NULL REFERENCES posts(id),
extra_data TEXT NULL
);
CREATE TABLE IF NOT EXISTS mappings (
-
original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
-
mapped_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE
);
···
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
+
user TEXT NOT NULL,
service TEXT NOT NULL,
identifier TEXT NOT NULL,
+
parent INTEGER NULL REFERENCES posts(id),
+
root INTEGER NULL REFERENCES posts(id),
+
reposted INTEGER NULL REFERENCES posts(id),
extra_data TEXT NULL
);
CREATE TABLE IF NOT EXISTS mappings (
+
original INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
+
mapped INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE
);