import asyncio
import json
import os
import queue
import threading
import traceback

import cross
import util.database as database
from bluesky.input import BlueskyJetstreamInput
from bluesky.output import BlueskyOutput, BlueskyOutputOptions
from mastodon.input import MastodonInput, MastodonInputOptions
from mastodon.output import MastodonOutput
from misskey.input import MisskeyInput
from util.util import LOGGER, as_json

# Written to settings.json on first launch as a starting template.
# "env:" values are placeholders resolved from environment variables.
DEFAULT_SETTINGS: dict = {
    "input": {
        "type": "mastodon-wss",
        "instance": "env:MASTODON_INSTANCE",
        "token": "env:MASTODON_TOKEN",
        "options": MastodonInputOptions({}),
    },
    "outputs": [
        {
            "type": "bluesky",
            "handle": "env:BLUESKY_HANDLE",
            "app-password": "env:BLUESKY_APP_PASSWORD",
            "options": BlueskyOutputOptions({}),
        }
    ],
}

# Factories keyed by the "type" field of the input/output settings.
INPUTS = {
    "mastodon-wss": lambda settings, db: MastodonInput(settings, db),
    "misskey-wss": lambda settings, db: MisskeyInput(settings, db),
    "bluesky-jetstream-wss": lambda settings, db: BlueskyJetstreamInput(settings, db),
}

OUTPUTS = {
    "bluesky": lambda input, settings, db: BlueskyOutput(input, settings, db),
    "mastodon": lambda input, settings, db: MastodonOutput(input, settings, db),
}


def execute(data_dir):
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)

    settings_path = os.path.join(data_dir, "settings.json")
    database_path = os.path.join(data_dir, "data.db")

    # On first launch, write the default settings template and exit so the
    # user can fill it in before the bot actually connects anywhere.
    if not os.path.exists(settings_path):
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
        with open(settings_path, "w") as f:
            f.write(as_json(DEFAULT_SETTINGS, indent=2))
        return 0

    LOGGER.info("Loading settings...")
    with open(settings_path, "rb") as f:
        settings = json.load(f)

    LOGGER.info("Starting database worker...")
    db_worker = database.DataBaseWorker(os.path.abspath(database_path))
    db_worker.execute("PRAGMA foreign_keys = ON;")

    # create the posts table
    # id - internal id of the post
    # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
    # service - the service (e.g. https://shrimp.melontini.me)
    # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
    # parent_id - the internal id of the parent
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            service TEXT NOT NULL,
            identifier TEXT NOT NULL,
            parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
            root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
        );
        """
    )

    # lightweight in-place migration: add columns that databases created by
    # older versions may be missing
    columns = db_worker.execute("PRAGMA table_info(posts)")
    column_names = [col[1] for col in columns]
    if "reposted_id" not in column_names:
        db_worker.execute(
            """
            ALTER TABLE posts
            ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
            """
        )
    if "extra_data" not in column_names:
        db_worker.execute(
            """
            ALTER TABLE posts
            ADD COLUMN extra_data TEXT NULL
            """
        )

    # create the mappings table
    # original_post_id - the post this was mapped from
    # mapped_post_id - the post this was mapped to
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS mappings (
            original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
            mapped_post_id INTEGER NOT NULL
        );
        """
    )

    input_settings = settings.get("input")
    if not input_settings:
        raise Exception("No input specified!")
    outputs_settings = settings.get("outputs", [])

    input = INPUTS[input_settings["type"]](input_settings, db_worker)
    if not outputs_settings:
        LOGGER.warning("No outputs specified! Check the config!")

    outputs: list[cross.Output] = []
    for output_settings in outputs_settings:
        outputs.append(
            OUTPUTS[output_settings["type"]](input, output_settings, db_worker)
        )

    LOGGER.info("Starting task worker...")

    # Single background thread that runs queued tasks one at a time,
    # until it receives the None sentinel.
    def worker(q: queue.Queue):
        while True:
            task = q.get()
            if task is None:
                break
            try:
                task()
            except Exception as e:
                LOGGER.error(f"Exception in worker thread!\n{e}")
                traceback.print_exc()
            finally:
                q.task_done()

    task_queue = queue.Queue()
    thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info("Connecting to %s...", input.service)
    try:
        asyncio.run(input.listen(outputs, lambda x: task_queue.put(x)))
    except KeyboardInterrupt:
        LOGGER.info("Stopping...")

    # Drain any remaining tasks, then send the sentinel and wait for the
    # worker thread to exit.
    task_queue.join()
    task_queue.put(None)
    thread.join()


if __name__ == "__main__":
    execute("./data")