social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at next 2.4 kB view raw
"""Entry point: wire one input service to its output services and run it."""

import asyncio
import json
import queue
import threading
from pathlib import Path
from typing import Callable

from database.connection import DatabasePool
import env
from database.migrations import DatabaseMigrator
from registry import create_input_service, create_output_service
from registry_bootstrap import bootstrap
from util.util import LOGGER, read_env, shutdown_hook

# A queued unit of work; ``None`` is the sentinel that tells the worker to stop.
Task = Callable[[], None]
TaskQueue = queue.Queue[Task | None]


def _worker(task_queue: TaskQueue) -> None:
    """Run queued tasks one at a time until the ``None`` sentinel arrives.

    An exception raised by a task is logged and does not stop the worker.
    """
    while True:
        task = task_queue.get()
        if task is None:
            # The sentinel is intentionally not marked done; the caller joins
            # the queue *before* enqueueing it.
            break

        try:
            task()
        except Exception:
            LOGGER.exception("Exception in worker thread!")
        finally:
            # Always acknowledge, otherwise ``task_queue.join()`` would hang.
            task_queue.task_done()


def _load_settings() -> dict:
    """Read the JSON settings file, apply ``read_env`` and validate it.

    Raises:
        KeyError: if the ``input`` or ``outputs`` section is missing.
    """
    # JSON text is UTF-8; do not depend on the platform's default encoding.
    with open(env.SETTINGS_DIR, encoding="utf-8") as f:
        settings = json.load(f)
    read_env(settings)

    for section in ("input", "outputs"):
        if section not in settings:
            raise KeyError(f"No `{section}` specified in settings!")
    return settings


def _run(db_pool: DatabasePool) -> None:
    """Build the services, start the task worker and listen until stopped.

    The worker thread is always drained and stopped before this returns,
    including when setup or listening fails after the thread was started.
    """
    LOGGER.info("Bootstrapping registries...")
    bootstrap()

    LOGGER.info("Loading settings...")
    settings = _load_settings()

    input_service = create_input_service(db_pool, settings["input"])
    outputs = [create_output_service(db_pool, data) for data in settings["outputs"]]

    LOGGER.info("Starting task worker...")
    task_queue: TaskQueue = queue.Queue()
    thread = threading.Thread(target=_worker, args=(task_queue,), daemon=True)
    thread.start()

    try:
        LOGGER.info("Connecting to %s...", input_service.url)
        input_service.outputs = outputs
        # The input service hands work to the worker thread through the queue.
        input_service.submitter = task_queue.put
        try:
            asyncio.run(input_service.listen())
        except KeyboardInterrupt:
            LOGGER.info("Stopping...")
    finally:
        # Let already-queued tasks finish, then ask the worker to exit.
        task_queue.join()
        task_queue.put(None)
        thread.join()


def main() -> None:
    """Prepare the data directory, migrate the database and run the service.

    Returns early, without starting anything, when the settings file is
    missing or when the database migration fails. Once the database pool has
    been created it is closed and the registered shutdown hooks are run on
    every exit path, including errors.
    """
    env.DATA_DIR.mkdir(parents=True, exist_ok=True)

    if not env.SETTINGS_DIR.exists():
        # NOTE(review): the message says the settings path is being created,
        # but nothing here creates it -- confirm whether a default settings
        # file is supposed to be written before exiting.
        LOGGER.info("First launch detected! Creating %s and exiting!", env.SETTINGS_DIR)
        return

    migrator = DatabaseMigrator(env.DATABASE_DIR, env.MIGRATIONS_DIR)
    try:
        migrator.migrate()
    except Exception:
        LOGGER.exception("Failed to migrate database!")
        return
    finally:
        migrator.close()

    db_pool = DatabasePool(env.DATABASE_DIR)
    try:
        _run(db_pool)
    finally:
        db_pool.close()

        for hook in shutdown_hook:
            hook()


if __name__ == "__main__":
    main()