social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import os
2import json
3import asyncio, threading, queue, traceback
4
5from util.util import LOGGER, as_json
6import cross, util.database as database
7
8from bluesky.input import BlueskyJetstreamInput
9from bluesky.output import BlueskyOutputOptions, BlueskyOutput
10
11from mastodon.input import MastodonInputOptions, MastodonInput
12from mastodon.output import MastodonOutput
13
14from misskey.input import MisskeyInput
15
# Settings template that execute() serialises to settings.json on first launch.
# NOTE(review): values starting with "env:" presumably name an environment
# variable that the input/output classes resolve at runtime - confirm.
DEFAULT_SETTINGS: dict = {
    "input": {
        "type": "mastodon-wss",
        "instance": "env:MASTODON_INSTANCE",
        "token": "env:MASTODON_TOKEN",
        "options": MastodonInputOptions({}),
    },
    "outputs": [
        {
            "type": "bluesky",
            "handle": "env:BLUESKY_HANDLE",
            "app-password": "env:BLUESKY_APP_PASSWORD",
            "options": BlueskyOutputOptions({}),
        },
    ],
}
32
# Registry of input factories, keyed by the "type" value of the input settings.
# Every factory is called as factory(settings, db) and returns the input object.
INPUTS = {
    "mastodon-wss": lambda settings, db: MastodonInput(settings, db),
    "misskey-wss": lambda settings, db: MisskeyInput(settings, db),
    "bluesky-jetstream-wss": lambda settings, db: BlueskyJetstreamInput(settings, db),
}
38
# Registry of output factories, keyed by the "type" value of each entry in the
# "outputs" settings list. Every factory is called as
# factory(input, settings, db) and returns the output object.
OUTPUTS = {
    "bluesky": (
        lambda input, settings, db: BlueskyOutput(input, settings, db)
    ),
    "mastodon": (
        lambda input, settings, db: MastodonOutput(input, settings, db)
    ),
}
43
def _prepare_database(db_worker):
    """Enable foreign keys and make sure the posts/mappings schema exists."""
    db_worker.execute('PRAGMA foreign_keys = ON;')

    # create the posts table
    # id - internal id of the post
    # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
    # service - the service (e.g. https://shrimp.melontini.me)
    # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
    # parent_id - the internal id of the parent
    # root_id - NOTE(review): presumably the internal id of the thread root - confirm
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            service TEXT NOT NULL,
            identifier TEXT NOT NULL,
            parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
            root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
        );
        """
    )

    # Columns added after the first schema version: databases created before
    # them get the column via ALTER TABLE. Index 1 of each table_info row is
    # the column name.
    existing_columns = {
        row[1] for row in db_worker.execute("PRAGMA table_info(posts)")
    }
    if "reposted_id" not in existing_columns:
        db_worker.execute(
            """
            ALTER TABLE posts
            ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
            """
        )
    if "extra_data" not in existing_columns:
        db_worker.execute(
            """
            ALTER TABLE posts
            ADD COLUMN extra_data TEXT NULL
            """
        )

    # create the mappings table
    # original_post_id - the post this was mapped from
    # mapped_post_id - the post this was mapped to
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS mappings (
            original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
            mapped_post_id INTEGER NOT NULL
        );
        """
    )


def _resolve_factory(registry, kind, settings):
    """Return the factory registered under settings['type'].

    Raises KeyError naming the supported types when the 'type' key is missing
    or not present in the registry.
    """
    try:
        return registry[settings['type']]
    except KeyError as err:
        supported = ', '.join(sorted(registry))
        raise KeyError(
            f"Missing or unknown {kind} type {settings.get('type')!r}; "
            f"expected one of: {supported}"
        ) from err


def _run_tasks(tasks):
    """Run callables taken from the queue until a None sentinel is received."""
    # iter(callable, sentinel) stops at the sentinel; as in a manual loop with
    # an early break, task_done() is not called for the sentinel itself.
    for task in iter(tasks.get, None):
        try:
            task()
        except Exception as e:
            # One failing task must not stop the worker; report it and move on.
            LOGGER.error("Exception in worker thread!\n%s", e)
            traceback.print_exc()
        finally:
            tasks.task_done()


def execute(data_dir):
    """Run the crossposter using the settings and database stored in data_dir.

    On first launch (no settings.json in data_dir) the default settings are
    written and the function returns immediately. Otherwise the settings are
    loaded, the database schema is prepared, the configured input and outputs
    are constructed, and the input's listen() coroutine runs until it finishes
    or the process is interrupted with Ctrl+C.

    Args:
        data_dir: Directory holding settings.json and data.db. It is created
            if it does not exist.

    Returns:
        0 when the default settings file was just created; otherwise None
        once the listener has stopped and the task worker has shut down.

    Raises:
        ValueError: The settings contain no 'input' section.
        KeyError: An input/output entry has a missing or unknown 'type'.
    """
    os.makedirs(data_dir, exist_ok=True)

    settings_path = os.path.join(data_dir, 'settings.json')
    database_path = os.path.join(data_dir, 'data.db')

    if not os.path.exists(settings_path):
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)

        with open(settings_path, 'w', encoding='utf-8') as f:
            f.write(as_json(DEFAULT_SETTINGS, indent=2))
        return 0

    LOGGER.info('Loading settings...')
    with open(settings_path, 'rb') as f:
        settings = json.load(f)

    LOGGER.info('Starting database worker...')
    db_worker = database.DataBaseWorker(os.path.abspath(database_path))
    _prepare_database(db_worker)

    input_settings = settings.get('input')
    if not input_settings:
        raise ValueError("No input specified!")
    # "outputs": null in the config is treated the same as a missing key.
    outputs_settings = settings.get('outputs') or []

    source = _resolve_factory(INPUTS, 'input', input_settings)(input_settings, db_worker)

    if not outputs_settings:
        LOGGER.warning("No outputs specified! Check the config!")

    outputs: list[cross.Output] = [
        _resolve_factory(OUTPUTS, 'output', output_settings)(
            source, output_settings, db_worker
        )
        for output_settings in outputs_settings
    ]

    LOGGER.info('Starting task worker...')
    task_queue = queue.Queue()
    thread = threading.Thread(target=_run_tasks, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info('Connecting to %s...', source.service)
    try:
        # The listener hands work to the worker thread through task_queue.put.
        asyncio.run(source.listen(outputs, task_queue.put))
    except KeyboardInterrupt:
        LOGGER.info("Stopping...")
    finally:
        # Drain whatever is already queued, then stop the worker. Running this
        # in finally keeps queued tasks from being dropped when listen() fails
        # with an error other than KeyboardInterrupt.
        task_queue.join()
        task_queue.put(None)
        thread.join()
154
# Script entry point: when run directly (not imported), start the crossposter
# with "./data" as its data directory.
if __name__ == "__main__":
    execute(data_dir='./data')