Social media crossposting tool. 3rd time's the charm.
mastodon misskey crossposting bluesky
"""Entry point for the crossposting tool."""

import asyncio
import json
import queue
import threading
from pathlib import Path
from typing import Callable

from database.connection import DatabasePool
import env
from database.migrations import DatabaseMigrator
from registry import create_input_service, create_output_service
from registry_bootstrap import bootstrap
from util.util import LOGGER, read_env, shutdown_hook


def _task_worker(task_queue: queue.Queue[Callable[[], None] | None]) -> None:
    """Run queued tasks one at a time until a ``None`` sentinel is received.

    Exceptions raised by a task are logged and swallowed so that a single
    failing task does not stop the worker.
    """
    while True:
        task = task_queue.get()
        if task is None:
            # Sentinel: the caller asked the worker to stop.
            break

        try:
            task()
        except Exception:
            LOGGER.exception("Exception in worker thread!")
        finally:
            # Always mark the task as done so `task_queue.join()` can return.
            task_queue.task_done()


def _load_settings() -> dict:
    """Load the JSON settings file and check that the required keys exist.

    Returns:
        The parsed settings after they have been passed through ``read_env``.

    Raises:
        KeyError: If the ``input`` or ``outputs`` key is missing.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(env.SETTINGS_DIR, encoding="utf-8") as f:
        settings = json.load(f)
    # NOTE(review): presumably substitutes environment values into the
    # settings in place (its return value is unused) - confirm in util.util.
    read_env(settings)

    if "input" not in settings:
        raise KeyError("No `input` specified in settings!")
    if "outputs" not in settings:
        raise KeyError("No `outputs` specified in settings!")
    return settings


def main() -> None:
    """Migrate the database, load settings and run the input listener.

    Tasks submitted by the input service are executed sequentially on a
    background worker thread. The listener runs until it returns or the user
    interrupts it; pending tasks are then drained, the worker is stopped, the
    database pool is closed and the registered shutdown hooks are called.

    The database pool is closed (and the worker stopped, once started) even
    when start-up or the listener fails with an exception.
    """
    if not env.DATA_DIR.exists():
        env.DATA_DIR.mkdir(parents=True)

    if not env.SETTINGS_DIR.exists():
        # NOTE(review): the message announces that the settings path is being
        # created, but nothing here creates it - confirm whether a template
        # should be written before exiting.
        LOGGER.info("First launch detected! Creating %s and exiting!", env.SETTINGS_DIR)
        return

    migrator = DatabaseMigrator(env.DATABASE_DIR, env.MIGRATIONS_DIR)
    try:
        migrator.migrate()
    except Exception:
        LOGGER.exception("Failed to migrate database!")
        return
    finally:
        migrator.close()

    db_pool = DatabasePool(env.DATABASE_DIR)
    try:
        LOGGER.info("Bootstrapping registries...")
        bootstrap()

        LOGGER.info("Loading settings...")
        settings = _load_settings()

        input_service = create_input_service(db_pool, settings["input"])
        outputs = [create_output_service(db_pool, data) for data in settings["outputs"]]

        LOGGER.info("Starting task worker...")
        task_queue: queue.Queue[Callable[[], None] | None] = queue.Queue()
        thread = threading.Thread(target=_task_worker, args=(task_queue,), daemon=True)
        thread.start()

        try:
            LOGGER.info("Connecting to %s...", input_service.url)
            input_service.outputs = outputs
            input_service.submitter = task_queue.put
            try:
                asyncio.run(input_service.listen())
            except KeyboardInterrupt:
                LOGGER.info("Stopping...")
        finally:
            # Let already-submitted tasks finish, then stop the worker before
            # the database pool it may be using is closed.
            task_queue.join()
            task_queue.put(None)
            thread.join()
    finally:
        db_pool.close()

    for hook in shutdown_hook:
        hook()


if __name__ == "__main__":
    main()