social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

more work

zenfyr.dev a32d63d2 f7b87959

verified
+22
cross/attachments.py
from dataclasses import dataclass


@dataclass
class Attachment():
    pass

@dataclass
class SpoilerAttachment(Attachment):
    spoiler: str

@dataclass
class LanguagesAttachment(Attachment):
    langs: list[str]

@dataclass
class SensitiveAttachment(Attachment):
    sensitive: bool

@dataclass
class RemoteUrlAttachment(Attachment):
    url: str
+18
cross/fragments.py
from dataclasses import dataclass

@dataclass
class Fragment:
    start: int
    end: int

@dataclass
class LinkFragment(Fragment):
    url: str

@dataclass
class TagFragment(Fragment):
    tag: str

@dataclass
class MentionFragment(Fragment):
    uri: str
+29
cross/post.py
from dataclasses import dataclass, field
from cross.attachments import Attachment
from cross.fragments import Fragment
from typing import TypeVar

T = TypeVar('T', bound=Attachment)

class AttachmentKeeper:
    def __init__(self) -> None:
        self._map: dict[type, Attachment] = {}

    def put(self, cls: type[T], attachment: T) -> None:
        self._map[cls] = attachment

    def get(self, cls: type[T]) -> T | None:
        instance = self._map.get(cls)
        if instance is None:
            return None
        if not isinstance(instance, cls):
            raise TypeError(f"Expected {cls.__name__}, got {type(instance).__name__}")
        return instance

@dataclass
class Post:
    id: str
    parent_id: str | None
    text: bytes  # utf-8 text bytes
    attachments: AttachmentKeeper
    fragments: list[Fragment] = field(default_factory=list)
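For illustration, a rough usage sketch (not part of this commit) of how Post, AttachmentKeeper and the fragment types might fit together, assuming Fragment.start/end are offsets into the utf-8 bytes of Post.text; all values below are made up:

from cross.attachments import SensitiveAttachment, SpoilerAttachment
from cross.fragments import LinkFragment
from cross.post import AttachmentKeeper, Post

# Hypothetical post content; offsets index into the utf-8 bytes.
text = "check out https://zenfyr.dev".encode("utf-8")

keeper = AttachmentKeeper()
keeper.put(SpoilerAttachment, SpoilerAttachment(spoiler="shameless plug"))
keeper.put(SensitiveAttachment, SensitiveAttachment(sensitive=False))

post = Post(
    id="113abc",       # made-up identifiers
    parent_id=None,
    text=text,
    attachments=keeper,
    fragments=[LinkFragment(start=10, end=28, url="https://zenfyr.dev")],
)

spoiler = post.attachments.get(SpoilerAttachment)  # typed as SpoilerAttachment | None
if spoiler is not None:
    print(spoiler.spoiler)

link = post.fragments[0]
print(post.text[link.start:link.end].decode("utf-8"))  # "https://zenfyr.dev"

The type[T] key with the TypeVar bound keeps put() and get() type-safe per attachment class, and the isinstance check in get() guards against a mismatched entry at runtime.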
+29
cross/service.py
from pathlib import Path
import sqlite3
from typing import cast

from database.connection import get_conn


class Service:
    def __init__(self, url: str, db: Path) -> None:
        self.url: str = url
        self.conn: sqlite3.Connection = get_conn(db)

    def get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
        cursor = self.conn.cursor()
        _ = cursor.execute(
            """
            SELECT * FROM posts
            WHERE service = ?
              AND user_id = ?
              AND identifier = ?
            """,
            (url, user, identifier),
        )
        return cast(sqlite3.Row, cursor.fetchone())

    def get_post_by_id(self, id: int) -> sqlite3.Row | None:
        cursor = self.conn.cursor()
        _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
        return cast(sqlite3.Row, cursor.fetchone())
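A hedged sketch of calling Service (again, not from this commit), assuming the migrations have already been applied to the database file; the URL, user id and identifier are made up, and the column names come from the query above and the posts table in migrations/001_initdb.sql:

from pathlib import Path
from cross.service import Service

# Hypothetical service URL and database path.
svc = Service("https://example.social", Path("./data/crosspost.db"))

row = svc.get_post("https://example.social", "alice", "113abc")
if row is not None:
    # sqlite3.Row allows access by column name thanks to the row_factory
    # configured in database.connection.get_conn.
    print(row["id"], row["user_id"])

svc.conn.close()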
+14
database/connection.py
from pathlib import Path
import sqlite3

def get_conn(db: Path) -> sqlite3.Connection:
    conn = sqlite3.connect(db, autocommit=True)
    conn.row_factory = sqlite3.Row
    _ = conn.executescript("""
        PRAGMA journal_mode = WAL;
        PRAGMA mmap_size = 134217728;
        PRAGMA cache_size = 4000;
        PRAGMA synchronous = NORMAL;
        PRAGMA foreign_keys = ON;
    """)
    return conn
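Worth noting: the autocommit keyword of sqlite3.connect requires Python 3.12+, and apart from journal_mode = WAL (which is persisted in the database file) these PRAGMAs apply per connection, so every caller going through get_conn gets them re-applied. A minimal usage sketch with a made-up path:

from pathlib import Path
from database.connection import get_conn

conn = get_conn(Path("./data/crosspost.db"))   # illustrative path
row = conn.execute("SELECT sqlite_version() AS v").fetchone()
print(row["v"])                                # name access via sqlite3.Row
conn.close()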
+4 -4
database/migrations.py
···
from pathlib import Path
from util.util import LOGGER
+from database.connection import get_conn
class DatabaseMigrator:
    def __init__(self, db_path: Path, migrations_folder: Path) -> None:
        self.db_path: Path = db_path
        self.migrations_folder: Path = migrations_folder
-        self.conn: sqlite3.Connection = sqlite3.connect(db_path, autocommit=False)
-        self.conn.row_factory = sqlite3.Row
+        self.conn: sqlite3.Connection = get_conn(db_path)
    def close(self):
        self.conn.close()
···
                version = int(f.stem.split("_")[0])
                files.append((version, f))
            except (ValueError, IndexError):
-                LOGGER.warning(f"Warning: Skipping invalid migration file: {f.name}")
+                LOGGER.warning("Warning: Skipping invalid migration file: %s", f.name)
        return sorted(files, key=lambda x: x[0])
···
        try:
            _ = cursor.executescript(sql)
            self.set_version(version)
-            LOGGER.info(f"Applied migration: {path.name}")
+            LOGGER.info("Applied migration: %s", path.name)
        except sqlite3.Error as e:
            self.conn.rollback()
            raise Exception(f"Error applying migration {version}: {e}")
+4
env.py
import os

DATA_DIR = os.environ.get('DATA_DIR') or "./data"
MIGRATIONS_DIR = os.environ.get('MIGRATIONS_DIR') or "./migrations"
+40 -4
main.py
···
+import queue
+import threading
from pathlib import Path
+from time import sleep
+from typing import Callable
+import env
from database.migrations import DatabaseMigrator
from util.util import LOGGER
-def main(data: Path):
+def main() -> None:
+    data = Path(env.DATA_DIR)
+
    if not data.exists():
        data.mkdir(parents=True)
···
    if not settings.exists():
        LOGGER.info("First launch detected! Creating %s and exiting!", settings)
-        return 0
+        return
    LOGGER.info("Loading settings...")
    # TODO
-    migrator = DatabaseMigrator(database, Path("./migrations"))
+    migrator = DatabaseMigrator(database, Path(env.MIGRATIONS_DIR))
    try:
        migrator.migrate()
    except Exception:
···
    finally:
        migrator.close()
+    LOGGER.info("Starting task worker...")
+
+    def worker(task_queue: queue.Queue[Callable[[], None] | None]):
+        while True:
+            task = task_queue.get()
+            if task is None:
+                break
+
+            try:
+                task()
+            except Exception:
+                LOGGER.exception("Exception in worker thread!")
+            finally:
+                task_queue.task_done()
+
+    task_queue: queue.Queue[Callable[[], None] | None] = queue.Queue()
+    thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
+    thread.start()
+
+    LOGGER.info("Connecting to %s...", 'TODO')  # TODO
+    try:
+        task_queue.put(lambda: print("hi"))
+        sleep(10)  # TODO
+    except KeyboardInterrupt:
+        LOGGER.info("Stopping...")
+
+    task_queue.join()
+    task_queue.put(None)
+    thread.join()
if __name__ == "__main__":
-    main(Path("./data"))
+    main()
-2
migrations/001_initdb.sql
···
-PRAGMA foreign_keys = ON;
-
CREATE TABLE IF NOT EXISTS posts (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    user_id TEXT NOT NULL,