social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import asyncio 2import json 3import queue 4import threading 5from pathlib import Path 6from typing import Callable 7 8from database.connection import DatabasePool 9import env 10from database.migrations import DatabaseMigrator 11from registry import create_input_service, create_output_service 12from registry_bootstrap import bootstrap 13from util.util import LOGGER, read_env 14 15 16def main() -> None: 17 data = Path(env.DATA_DIR) 18 19 if not data.exists(): 20 data.mkdir(parents=True) 21 22 settings_path = data.joinpath("settings.json") 23 database_path = data.joinpath("db.sqlite") 24 25 if not settings_path.exists(): 26 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path) 27 return 28 29 migrator = DatabaseMigrator(database_path, Path(env.MIGRATIONS_DIR)) 30 try: 31 migrator.migrate() 32 except Exception: 33 LOGGER.exception("Failed to migrate database!") 34 return 35 finally: 36 migrator.close() 37 38 db_pool = DatabasePool(database_path) 39 40 LOGGER.info("Bootstrapping registries...") 41 bootstrap() 42 43 LOGGER.info("Loading settings...") 44 45 with open(settings_path) as f: 46 settings = json.load(f) 47 read_env(settings) 48 49 if "input" not in settings: 50 raise KeyError("No `input` sepcified in settings!") 51 if "outputs" not in settings: 52 raise KeyError("No `outputs` spicified in settings!") 53 54 input = create_input_service(db_pool, settings["input"]) 55 outputs = [ 56 create_output_service(db_pool, data) for data in settings["outputs"] 57 ] 58 59 LOGGER.info("Starting task worker...") 60 61 def worker(task_queue: queue.Queue[Callable[[], None] | None]): 62 while True: 63 task = task_queue.get() 64 if task is None: 65 break 66 67 try: 68 task() 69 except Exception: 70 LOGGER.exception("Exception in worker thread!") 71 finally: 72 task_queue.task_done() 73 74 task_queue: queue.Queue[Callable[[], None] | None] = queue.Queue() 75 thread = threading.Thread(target=worker, args=(task_queue,), daemon=True) 76 thread.start() 77 78 LOGGER.info("Connecting to %s...", input.url) 79 try: 80 asyncio.run(input.listen(outputs, lambda c: task_queue.put(c))) 81 except KeyboardInterrupt: 82 LOGGER.info("Stopping...") 83 84 task_queue.join() 85 task_queue.put(None) 86 thread.join() 87 db_pool.close() 88 89 90if __name__ == "__main__": 91 main()