import asyncio
import json
import queue
import threading
from pathlib import Path
from typing import Callable

import env
from database.migrations import DatabaseMigrator
from registry import create_input_service, create_output_service
from registry_bootstrap import bootstrap
from util.util import LOGGER, read_env


def main() -> None:
    """Entry point: migrate the database, load settings, build the configured
    input/output services, and run the input listener until interrupted.

    On first launch (no settings file yet) a minimal settings template is
    written and the program exits so the user can fill it in.
    """
    data_dir = Path(env.DATA_DIR)
    # exist_ok avoids the racy exists()-then-mkdir pattern.
    data_dir.mkdir(parents=True, exist_ok=True)

    settings_path = data_dir / "settings.json"
    database_path = data_dir / "db.sqlite"

    if not settings_path.exists():
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
        # BUG FIX: the log message claimed the file was being created, but
        # nothing was ever written. Write an empty JSON object as a template
        # for the user to edit. (Schema is enforced below via the `input` /
        # `outputs` checks on the next run.)
        settings_path.write_text("{}", encoding="utf-8")
        return

    migrator = DatabaseMigrator(database_path, Path(env.MIGRATIONS_DIR))
    try:
        migrator.migrate()
    except Exception:
        # Top-level boundary: log with traceback and abort startup.
        LOGGER.exception("Failed to migrate database!")
        return
    finally:
        migrator.close()

    LOGGER.info("Bootstrapping registries...")
    bootstrap()

    LOGGER.info("Loading settings...")
    with open(settings_path, encoding="utf-8") as f:
        settings = json.load(f)
    read_env(settings)

    # Validate required top-level keys before constructing any services.
    if "input" not in settings:
        raise KeyError("No `input` specified in settings!")
    if "outputs" not in settings:
        raise KeyError("No `outputs` specified in settings!")

    # `input_service` (not `input`) so the builtin is not shadowed.
    input_service = create_input_service(database_path, settings["input"])
    outputs = [
        create_output_service(database_path, output_settings)
        for output_settings in settings["outputs"]
    ]

    LOGGER.info("Starting task worker...")

    def worker(task_queue: queue.Queue[Callable[[], None] | None]) -> None:
        """Run queued callables until a ``None`` sentinel is received."""
        while True:
            task = task_queue.get()
            if task is None:
                break
            try:
                task()
            except Exception:
                # Keep the worker alive: a failing task must not kill it.
                LOGGER.exception("Exception in worker thread!")
            finally:
                task_queue.task_done()

    task_queue: queue.Queue[Callable[[], None] | None] = queue.Queue()
    thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info("Connecting to %s...", "TODO")  # TODO
    try:
        asyncio.run(input_service.listen(outputs, lambda c: task_queue.put(c)))
    except KeyboardInterrupt:
        LOGGER.info("Stopping...")

    # Drain any queued tasks, then tell the worker to exit and wait for it.
    task_queue.join()
    task_queue.put(None)
    thread.join()


if __name__ == "__main__":
    main()