social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import queue
4import threading
5from pathlib import Path
6from typing import Callable
7
8from database.connection import DatabasePool
9import env
10from database.migrations import DatabaseMigrator
11from registry import create_input_service, create_output_service
12from registry_bootstrap import bootstrap
13from util.util import LOGGER, read_env, shutdown_hook
14
15
16def main() -> None:
17 if not env.DATA_DIR.exists():
18 env.DATA_DIR.mkdir(parents=True)
19
20 if not env.SETTINGS_DIR.exists():
21 LOGGER.info("First launch detected! Creating %s and exiting!", env.SETTINGS_DIR)
22 return
23
24 migrator = DatabaseMigrator(env.DATABASE_DIR, env.MIGRATIONS_DIR)
25 try:
26 migrator.migrate()
27 except Exception:
28 LOGGER.exception("Failed to migrate database!")
29 return
30 finally:
31 migrator.close()
32
33 db_pool = DatabasePool(env.DATABASE_DIR)
34
35 LOGGER.info("Bootstrapping registries...")
36 bootstrap()
37
38 LOGGER.info("Loading settings...")
39
40 with open(env.SETTINGS_DIR) as f:
41 settings = json.load(f)
42 read_env(settings)
43
44 if "input" not in settings:
45 raise KeyError("No `input` sepcified in settings!")
46 if "outputs" not in settings:
47 raise KeyError("No `outputs` spicified in settings!")
48
49 input = create_input_service(db_pool, settings["input"])
50 outputs = [create_output_service(db_pool, data) for data in settings["outputs"]]
51
52 LOGGER.info("Starting task worker...")
53
54 def worker(task_queue: queue.Queue[Callable[[], None] | None]):
55 while True:
56 task = task_queue.get()
57 if task is None:
58 break
59
60 try:
61 task()
62 except Exception:
63 LOGGER.exception("Exception in worker thread!")
64 finally:
65 task_queue.task_done()
66
67 task_queue: queue.Queue[Callable[[], None] | None] = queue.Queue()
68 thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
69 thread.start()
70
71 LOGGER.info("Connecting to %s...", input.url)
72 input.outputs = outputs
73 input.submitter = lambda c: task_queue.put(c)
74 try:
75 asyncio.run(input.listen())
76 except KeyboardInterrupt:
77 LOGGER.info("Stopping...")
78
79 task_queue.join()
80 task_queue.put(None)
81 thread.join()
82 db_pool.close()
83
84 for shook in shutdown_hook:
85 shook()
86
87
88if __name__ == "__main__":
89 main()