"""Cross-posting bridge entry point.

Connects one configured input service (Mastodon/Misskey/Bluesky streams) to
one or more output services, persisting post relationships in a local SQLite
database so replies and mappings survive restarts.
"""

from util import LOGGER
import os
import json
import database
import mastodon, misskey, bluesky, cross
import asyncio, threading, queue, traceback
import util

# Default configuration written verbatim on first launch. Values prefixed
# with 'env:' are presumably resolved from environment variables by the
# option/config handling code — TODO confirm against util/options.
DEFAULT_SETTINGS: dict = {
    'input': {
        'type': 'mastodon-wss',
        'instance': 'env:MASTODON_INSTANCE',
        'token': 'env:MASTODON_TOKEN',
        "options": mastodon.MastodonInputOptions({})
    },
    'outputs': [
        {
            'type': 'bluesky',
            'handle': 'env:BLUESKY_HANDLE',
            'app-password': 'env:BLUESKY_APP_PASSWORD',
            'options': bluesky.BlueskyOutputOptions({})
        }
    ]
}

# Factory registries keyed by the 'type' field of a settings block.
# (Fixed the misspelled 'settigs' parameter in the misskey factory.)
INPUTS = {
    "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db),
    "misskey-wss": lambda settings, db: misskey.MisskeyInput(settings, db),
    "bluesky-pds-wss": lambda settings, db: bluesky.BlueskyPdsInput(settings, db),
}

# 'source' (not 'input') to avoid shadowing the builtin; called positionally.
OUTPUTS = {
    "bluesky": lambda source, settings, db: bluesky.BlueskyOutput(source, settings, db),
    "mastodon": lambda source, settings, db: mastodon.MastodonOutput(source, settings, db),
}


def execute(data_dir):
    """Run the cross-posting bridge using configuration under *data_dir*.

    On first launch (no ``settings.json`` in *data_dir*) a default settings
    file is written and the function returns so the user can edit it.
    Otherwise it loads the settings, prepares the SQLite schema, constructs
    the configured input and outputs, and listens until interrupted.

    :param data_dir: directory holding ``settings.json`` and ``data.db``;
        created if missing.
    :returns: 0 on normal completion.
    :raises Exception: if no input is configured or a configured
        input/output ``type`` is unknown.
    """
    # Race-safe "create if missing" — no exists() pre-check needed.
    os.makedirs(data_dir, exist_ok=True)

    settings_path = os.path.join(data_dir, 'settings.json')
    database_path = os.path.join(data_dir, 'data.db')

    if not os.path.exists(settings_path):
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
        with open(settings_path, 'w') as f:
            f.write(util.as_json(DEFAULT_SETTINGS, indent=2))
        return 0

    LOGGER.info('Loading settings...')
    with open(settings_path, 'rb') as f:
        settings = json.load(f)

    LOGGER.info('Starting database worker...')
    db_worker = database.DataBaseWorker(os.path.abspath(database_path))
    # SQLite ships with foreign keys OFF; required for the REFERENCES ...
    # ON DELETE clauses in the schema below to have any effect.
    db_worker.execute('PRAGMA foreign_keys = ON;')

    # posts table:
    #   id         - internal id of the post
    #   user_id    - user id on the service (e.g. a724sknj5y9ydk0w)
    #   service    - the service (e.g. https://shrimp.melontini.me)
    #   identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
    #   parent_id  - the internal id of the parent post, if any
    #   root_id    - the internal id of the thread root, if any
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            service TEXT NOT NULL,
            identifier TEXT NOT NULL UNIQUE,
            parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
            root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
        );
        """
    )

    # mappings table:
    #   original_post_id - the post this was mapped from
    #   mapped_post_id   - the post this was mapped to
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS mappings (
            original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
            mapped_post_id INTEGER NOT NULL
        );
        """
    )

    input_settings = settings.get('input')
    if not input_settings:
        raise Exception("No input specified!")
    outputs_settings = settings.get('outputs', [])

    # Explicit check instead of an opaque KeyError on a bad config value.
    input_type = input_settings['type']
    if input_type not in INPUTS:
        raise Exception(f"Unknown input type {input_type!r}! Expected one of: {sorted(INPUTS)}")
    source = INPUTS[input_type](input_settings, db_worker)

    if not outputs_settings:
        LOGGER.warning("No outputs specified! Check the config!")

    outputs: list[cross.Output] = []
    for output_settings in outputs_settings:
        output_type = output_settings['type']
        if output_type not in OUTPUTS:
            raise Exception(f"Unknown output type {output_type!r}! Expected one of: {sorted(OUTPUTS)}")
        outputs.append(OUTPUTS[output_type](source, output_settings, db_worker))

    LOGGER.info('Starting task worker...')

    def worker(tasks: queue.Queue):
        # Drain tasks until the None sentinel arrives. Exceptions are logged
        # so one failing task cannot kill the worker thread.
        while True:
            task = tasks.get()
            if task is None:
                break
            try:
                task()
            except Exception as e:
                # Lazy %-formatting; full traceback still goes to stderr.
                LOGGER.error("Exception in worker thread!\n%s", e)
                traceback.print_exc()
            finally:
                tasks.task_done()

    task_queue = queue.Queue()
    thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info('Connecting to %s...', source.service)
    try:
        asyncio.run(source.listen(outputs, task_queue.put))
    except KeyboardInterrupt:
        LOGGER.info("Stopping...")

    # Wait for queued tasks to finish, then stop the worker with the sentinel.
    task_queue.join()
    task_queue.put(None)
    thread.join()
    return 0


if __name__ == "__main__":
    execute('./data')