social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from util import LOGGER
2import os
3import json
4import database
5import mastodon, misskey, bluesky, cross
6import asyncio, threading, queue, traceback
7import util
8
9DEFAULT_SETTINGS: dict = {
10 'input': {
11 'type': 'mastodon-wss',
12 'instance': 'env:MASTODON_INSTANCE',
13 'token': 'env:MASTODON_TOKEN',
14 "options": mastodon.MastodonInputOptions({})
15 },
16 'outputs': [
17 {
18 'type': 'bluesky',
19 'handle': 'env:BLUESKY_HANDLE',
20 'app-password': 'env:BLUESKY_APP_PASSWORD',
21 'options': bluesky.BlueskyOutputOptions({})
22 }
23 ]
24}
25
26INPUTS = {
27 "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db),
28 "misskey-wss": lambda settigs, db: misskey.MisskeyInput(settigs, db),
29 "bluesky-pds-wss": lambda settings, db: bluesky.BlueskyPdsInput(settings, db)
30}
31
32OUTPUTS = {
33 "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db),
34 "mastodon": lambda input, settings, db: mastodon.MastodonOutput(input, settings, db)
35}
36
37def execute(data_dir):
38 if not os.path.exists(data_dir):
39 os.makedirs(data_dir)
40
41 settings_path = os.path.join(data_dir, 'settings.json')
42 database_path = os.path.join(data_dir, 'data.db')
43
44 if not os.path.exists(settings_path):
45 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
46
47 with open(settings_path, 'w') as f:
48 f.write(util.as_json(DEFAULT_SETTINGS, indent=2))
49 return 0
50
51 LOGGER.info('Loading settings...')
52 with open(settings_path, 'rb') as f:
53 settings = json.load(f)
54
55 LOGGER.info('Starting database worker...')
56 db_worker = database.DataBaseWorker(os.path.abspath(database_path))
57
58 db_worker.execute('PRAGMA foreign_keys = ON;')
59
60 # create the posts table
61 # id - internal id of the post
62 # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
63 # service - the service (e.g. https://shrimp.melontini.me)
64 # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
65 # parent_id - the internal id of the parent
66 db_worker.execute(
67 """
68 CREATE TABLE IF NOT EXISTS posts (
69 id INTEGER PRIMARY KEY AUTOINCREMENT,
70 user_id TEXT NOT NULL,
71 service TEXT NOT NULL,
72 identifier TEXT NOT NULL UNIQUE,
73 parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
74 root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
75 );
76 """
77 )
78
79 # create the mappings table
80 # original_post_id - the post this was mapped from
81 # mapped_post_id - the post this was mapped to
82 db_worker.execute(
83 """
84 CREATE TABLE IF NOT EXISTS mappings (
85 original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
86 mapped_post_id INTEGER NOT NULL
87 );
88 """
89 )
90
91 input_settings = settings.get('input')
92 if not input_settings:
93 raise Exception("No input specified!")
94 outputs_settings = settings.get('outputs', [])
95
96 input = INPUTS[input_settings['type']](input_settings, db_worker)
97
98 if not outputs_settings:
99 LOGGER.warning("No outputs specified! Check the config!")
100
101 outputs: list[cross.Output] = []
102 for output_settings in outputs_settings:
103 outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker))
104
105 LOGGER.info('Starting task worker...')
106 def worker(queue: queue.Queue):
107 while True:
108 task = queue.get()
109 if task is None:
110 break
111
112 try:
113 task()
114 except Exception as e:
115 LOGGER.error(f"Exception in worker thread!\n{e}")
116 traceback.print_exc()
117 finally:
118 queue.task_done()
119
120 task_queue = queue.Queue()
121 thread = threading.Thread(target=worker, args=(task_queue,), daemon=True)
122 thread.start()
123
124 LOGGER.info('Connecting to %s...', input.service)
125 try:
126 asyncio.run(input.listen(outputs, lambda x: task_queue.put(x)))
127 except KeyboardInterrupt:
128 LOGGER.info("Stopping...")
129
130 task_queue.join()
131 task_queue.put(None)
132 thread.join()
133
134
135if __name__ == "__main__":
136 execute('./data')