social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import asyncio
2import json
3import os
4import queue
5import threading
6import traceback
7
8import cross
9import util.database as database
10from bluesky.input import BlueskyJetstreamInput
11from bluesky.output import BlueskyOutput, BlueskyOutputOptions
12from mastodon.input import MastodonInput, MastodonInputOptions
13from mastodon.output import MastodonOutput
14from misskey.input import MisskeyInput
15from util.util import LOGGER, as_json
16
17DEFAULT_SETTINGS: dict = {
18 "input": {
19 "type": "mastodon-wss",
20 "instance": "env:MASTODON_INSTANCE",
21 "token": "env:MASTODON_TOKEN",
22 "options": MastodonInputOptions({}),
23 },
24 "outputs": [
25 {
26 "type": "bluesky",
27 "handle": "env:BLUESKY_HANDLE",
28 "app-password": "env:BLUESKY_APP_PASSWORD",
29 "options": BlueskyOutputOptions({}),
30 }
31 ],
32}
33
34INPUTS = {
35 "mastodon-wss": lambda settings, db: MastodonInput(settings, db),
36 "misskey-wss": lambda settigs, db: MisskeyInput(settigs, db),
37 "bluesky-jetstream-wss": lambda settings, db: BlueskyJetstreamInput(settings, db),
38}
39
40OUTPUTS = {
41 "bluesky": lambda input, settings, db: BlueskyOutput(input, settings, db),
42 "mastodon": lambda input, settings, db: MastodonOutput(input, settings, db),
43}
44
45
46def execute(data_dir):
47 if not os.path.exists(data_dir):
48 os.makedirs(data_dir)
49
50 settings_path = os.path.join(data_dir, "settings.json")
51 database_path = os.path.join(data_dir, "data.db")
52
53 if not os.path.exists(settings_path):
54 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
55
56 with open(settings_path, "w") as f:
57 f.write(as_json(DEFAULT_SETTINGS, indent=2))
58 return 0
59
60 LOGGER.info("Loading settings...")
61 with open(settings_path, "rb") as f:
62 settings = json.load(f)
63
64 LOGGER.info("Starting database worker...")
65 db_worker = database.DataBaseWorker(os.path.abspath(database_path))
66
67 db_worker.execute("PRAGMA foreign_keys = ON;")
68
69 # create the posts table
70 # id - internal id of the post
71 # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
72 # service - the service (e.g. https://shrimp.melontini.me)
73 # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
74 # parent_id - the internal id of the parent
75 db_worker.execute(
76 """
77 CREATE TABLE IF NOT EXISTS posts (
78 id INTEGER PRIMARY KEY AUTOINCREMENT,
79 user_id TEXT NOT NULL,
80 service TEXT NOT NULL,
81 identifier TEXT NOT NULL,
82 parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
83 root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
84 );
85 """
86 )
87
88 columns = db_worker.execute("PRAGMA table_info(posts)")
89 column_names = [col[1] for col in columns]
90 if "reposted_id" not in column_names:
91 db_worker.execute("""
92 ALTER TABLE posts
93 ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
94 """)
95 if "extra_data" not in column_names:
96 db_worker.execute("""
97 ALTER TABLE posts
98 ADD COLUMN extra_data TEXT NULL
99 """)
100
101 # create the mappings table
102 # original_post_id - the post this was mapped from
103 # mapped_post_id - the post this was mapped to
104 db_worker.execute(
105 """
106 CREATE TABLE IF NOT EXISTS mappings (
107 original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
108 mapped_post_id INTEGER NOT NULL
109 );
110 """
111 )
112
113 input_settings = settings.get("input")
114 if not input_settings:
115 raise Exception("No input specified!")
116 outputs_settings = settings.get("outputs", [])
117
118 input = INPUTS[input_settings["type"]](input_settings, db_worker)
119
120 if not outputs_settings:
121 LOGGER.warning("No outputs specified! Check the config!")
122
123 outputs: list[cross.Output] = []
124 for output_settings in outputs_settings:
125 outputs.append(
126 OUTPUTS[output_settings["type"]](input, output_settings, db_worker)
127 )
128
129 LOGGER.info("Starting task worker...")
130
131 def worker(queue: queue.Queue):
132 while True:
133 task = queue.get()
134 if task is None:
135 break
136
137 try:
138 task()
139 except Exception as e:
140 LOGGER.error(f"Exception in worker thread!\n{e}")
141 traceback.print_exc()
142 finally:
143 queue.task_done()
144
145 task_queue = queue.Queue()
146 thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
147 thread.start()
148
149 LOGGER.info("Connecting to %s...", input.service)
150 try:
151 asyncio.run(input.listen(outputs, lambda x: task_queue.put(x)))
152 except KeyboardInterrupt:
153 LOGGER.info("Stopping...")
154
155 task_queue.join()
156 task_queue.put(None)
157 thread.join()
158
159
160if __name__ == "__main__":
161 execute("./data")