social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import queue
4import threading
5from pathlib import Path
6from typing import Callable
7
8from database.connection import DatabasePool
9import env
10from database.migrations import DatabaseMigrator
11from registry import create_input_service, create_output_service
12from registry_bootstrap import bootstrap
13from util.util import LOGGER, read_env, shutdown_hook
14
15
def main() -> None:
    """Run the crossposter: migrate the database, load settings, and listen.

    Steps, in order:

    1. Ensure the data directory (``env.DATA_DIR``) exists.
    2. Return early if ``settings.json`` is missing from it.
    3. Apply database migrations; log and return if they fail.
    4. Bootstrap the registries, load ``settings.json`` and build the
       input service and the output services from it.
    5. Start a worker thread that executes submitted tasks one at a time.
    6. Run ``input.listen()`` until it returns or the user interrupts it.
    7. Drain the task queue, stop the worker, close the database pool and
       run every registered shutdown hook.

    Step 7 runs even when an earlier step raises, so the pool and the
    worker thread are not leaked on failure.

    Raises:
        KeyError: If the settings lack an ``input`` or ``outputs`` entry.
    """
    data_dir = Path(env.DATA_DIR)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    data_dir.mkdir(parents=True, exist_ok=True)

    settings_path = data_dir / "settings.json"
    database_path = data_dir / "db.sqlite"

    if not settings_path.exists():
        # NOTE(review): the message announces that the file is being created,
        # but nothing is written here. Confirm whether a default settings
        # file should be generated or the message reworded.
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
        return

    migrator = DatabaseMigrator(database_path, Path(env.MIGRATIONS_DIR))
    try:
        migrator.migrate()
    except Exception:
        LOGGER.exception("Failed to migrate database!")
        return
    finally:
        migrator.close()

    def worker(task_queue: queue.Queue[Callable[[], None] | None]):
        """Execute queued tasks sequentially until a ``None`` sentinel arrives."""
        while True:
            task = task_queue.get()
            if task is None:
                # The sentinel is only enqueued after join() has returned,
                # so it does not need a matching task_done().
                break

            try:
                task()
            except Exception:
                # A failing task must not kill the worker thread.
                LOGGER.exception("Exception in worker thread!")
            finally:
                task_queue.task_done()

    db_pool = DatabasePool(database_path)
    task_queue: queue.Queue[Callable[[], None] | None] = queue.Queue()
    worker_thread: threading.Thread | None = None

    try:
        LOGGER.info("Bootstrapping registries...")
        bootstrap()

        LOGGER.info("Loading settings...")

        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open(settings_path, encoding="utf-8") as f:
            settings = json.load(f)
        read_env(settings)

        if "input" not in settings:
            raise KeyError("No `input` specified in settings!")
        if "outputs" not in settings:
            raise KeyError("No `outputs` specified in settings!")

        source = create_input_service(db_pool, settings["input"])
        outputs = [
            create_output_service(db_pool, output_settings)
            for output_settings in settings["outputs"]
        ]

        LOGGER.info("Starting task worker...")
        worker_thread = threading.Thread(
            target=worker, args=(task_queue,), daemon=True
        )
        worker_thread.start()

        LOGGER.info("Connecting to %s...", source.url)
        source.outputs = outputs
        source.submitter = task_queue.put
        try:
            asyncio.run(source.listen())
        except KeyboardInterrupt:
            LOGGER.info("Stopping...")
    finally:
        if worker_thread is not None:
            # Let pending tasks finish, then ask the worker to exit.
            task_queue.join()
            task_queue.put(None)
            worker_thread.join()
        db_pool.close()

        for hook in shutdown_hook:
            hook()
94
# Script entry point: start the crossposter only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()