"""Application entry point.

Runs database migrations, loads the JSON settings, wires the input service to
its output services, then drives the input service's asyncio loop while a
daemon worker thread executes submitted tasks sequentially. Blocks until the
loop exits (or is interrupted), then shuts everything down in order.
"""

import asyncio
import json
import queue
import threading
from pathlib import Path
from typing import Callable

import env
from database.connection import DatabasePool
from database.migrations import DatabaseMigrator
from registry import create_input_service, create_output_service
from registry_bootstrap import bootstrap
from util.util import LOGGER, read_env, shutdown_hook

# A queue item is a zero-argument callable to execute, or None — the worker
# thread's stop sentinel.
Task = Callable[[], None] | None


def _run_migrations() -> bool:
    """Apply pending database migrations.

    Returns True on success, False (after logging) on failure. The migrator
    is always closed, even when migration raises.
    """
    migrator = DatabaseMigrator(env.DATABASE_DIR, env.MIGRATIONS_DIR)
    try:
        migrator.migrate()
    except Exception:
        LOGGER.exception("Failed to migrate database!")
        return False
    finally:
        migrator.close()
    return True


def _load_settings() -> dict:
    """Read, post-process, and validate the settings file.

    Raises KeyError when the mandatory `input` or `outputs` keys are missing.
    """
    # NOTE(review): SETTINGS_DIR is opened as a *file* despite the _DIR
    # suffix — presumably it points at the settings file itself; confirm
    # against the `env` module.
    with open(env.SETTINGS_DIR, encoding="utf-8") as f:
        settings = json.load(f)
    read_env(settings)
    if "input" not in settings:
        raise KeyError("No `input` specified in settings!")
    if "outputs" not in settings:
        raise KeyError("No `outputs` specified in settings!")
    return settings


def _worker(task_queue: queue.Queue[Task]) -> None:
    """Run queued tasks one at a time until the None sentinel arrives.

    No task_done() is issued for the sentinel: the shutdown sequence join()s
    the queue *before* putting None, so the sentinel is never awaited.
    """
    while True:
        task = task_queue.get()
        if task is None:
            break
        try:
            task()
        except Exception:
            # A failing task must not kill the worker thread.
            LOGGER.exception("Exception in worker thread!")
        finally:
            task_queue.task_done()


def main() -> None:
    """Start the service; returns only after a clean (or interrupted) shutdown."""
    if not env.DATA_DIR.exists():
        env.DATA_DIR.mkdir(parents=True)
    if not env.SETTINGS_DIR.exists():
        # NOTE(review): the message says "Creating" but nothing is created
        # here — presumably a default settings file should be written, or the
        # message reworded; confirm the intended first-launch behavior.
        LOGGER.info("First launch detected! Creating %s and exiting!", env.SETTINGS_DIR)
        return

    if not _run_migrations():
        return

    db_pool = DatabasePool(env.DATABASE_DIR)

    LOGGER.info("Bootstrapping registries...")
    bootstrap()

    LOGGER.info("Loading settings...")
    settings = _load_settings()

    # Named `input_service` rather than `input` to avoid shadowing the builtin.
    input_service = create_input_service(db_pool, settings["input"])
    outputs = [create_output_service(db_pool, data) for data in settings["outputs"]]

    LOGGER.info("Starting task worker...")
    task_queue: queue.Queue[Task] = queue.Queue()
    thread = threading.Thread(target=_worker, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info("Connecting to %s...", input_service.url)
    input_service.outputs = outputs
    # The bound method is the callable; no lambda wrapper needed.
    input_service.submitter = task_queue.put

    try:
        asyncio.run(input_service.listen())
    except KeyboardInterrupt:
        LOGGER.info("Stopping...")

    # Orderly shutdown: wait for in-flight tasks, stop the worker thread,
    # close the pool, then run registered shutdown hooks.
    task_queue.join()
    task_queue.put(None)
    thread.join()
    db_pool.close()
    for hook in shutdown_hook:
        hook()


if __name__ == "__main__":
    main()