Social media crossposting tool. Third time's the charm.
mastodon
misskey
crossposting
bluesky
1import os
2import json
3import asyncio, threading, queue, traceback
4
5from util.util import LOGGER, as_json
6import cross, util.database as database
7
8from bluesky.input import BlueskyPdsInput
9from bluesky.output import BlueskyOutputOptions, BlueskyOutput
10
11from mastodon.input import MastodonInputOptions, MastodonInput
12from mastodon.output import MastodonOutput
13
14from misskey.input import MisskeyInput
15
# Settings template written to settings.json on first launch.
# Values of the form 'env:NAME' are presumably resolved from environment
# variables when the settings are loaded — TODO confirm against the
# input/output implementations.
DEFAULT_SETTINGS: dict = {
    'input': {
        'type': 'mastodon-wss',
        'instance': 'env:MASTODON_INSTANCE',
        'token': 'env:MASTODON_TOKEN',
        'options': MastodonInputOptions({}),
    },
    'outputs': [
        {
            'type': 'bluesky',
            'handle': 'env:BLUESKY_HANDLE',
            'app-password': 'env:BLUESKY_APP_PASSWORD',
            'options': BlueskyOutputOptions({}),
        }
    ],
}
32
# Input factories keyed by the settings 'type' field. Each class is called
# as (settings, db_worker) by execute(); the original lambda wrappers (one
# with a typo'd `settigs` parameter) added nothing, so the constructors are
# referenced directly.
INPUTS = {
    "mastodon-wss": MastodonInput,
    "misskey-wss": MisskeyInput,
    "bluesky-pds-wss": BlueskyPdsInput,
}
38
# Output factories keyed by the settings 'type' field. Each class is called
# as (input, settings, db_worker) by execute(); direct references replace the
# redundant lambdas, whose `input` parameter also shadowed the builtin.
OUTPUTS = {
    "bluesky": BlueskyOutput,
    "mastodon": MastodonOutput,
}
43
def execute(data_dir):
    """Boot the crossposter from the given data directory.

    Loads settings.json (creating a default one and returning 0 on first
    launch), prepares the SQLite database, builds the configured input and
    outputs, then listens on the input until the stream ends or Ctrl+C.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(data_dir, exist_ok=True)

    settings_path = os.path.join(data_dir, 'settings.json')
    database_path = os.path.join(data_dir, 'data.db')

    if not os.path.exists(settings_path):
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)

        with open(settings_path, 'w', encoding='utf-8') as f:
            f.write(as_json(DEFAULT_SETTINGS, indent=2))
        return 0

    LOGGER.info('Loading settings...')
    # Binary mode: json.load detects the encoding itself from a bytes stream.
    with open(settings_path, 'rb') as f:
        settings = json.load(f)

    LOGGER.info('Starting database worker...')
    db_worker = database.DataBaseWorker(os.path.abspath(database_path))
    _create_schema(db_worker)

    input_settings = settings.get('input')
    if not input_settings:
        raise ValueError("No input specified!")
    outputs_settings = settings.get('outputs', [])

    # Named `source` rather than `input` to avoid shadowing the builtin.
    source = INPUTS[input_settings['type']](input_settings, db_worker)

    if not outputs_settings:
        LOGGER.warning("No outputs specified! Check the config!")

    outputs: list[cross.Output] = [
        OUTPUTS[output_settings['type']](source, output_settings, db_worker)
        for output_settings in outputs_settings
    ]

    LOGGER.info('Starting task worker...')
    task_queue = queue.Queue()
    thread = threading.Thread(target=_task_worker, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info('Connecting to %s...', source.service)
    try:
        asyncio.run(source.listen(outputs, task_queue.put))
    except KeyboardInterrupt:
        LOGGER.info("Stopping...")

    # Drain any queued tasks, then send the sentinel so the worker exits.
    task_queue.join()
    task_queue.put(None)
    thread.join()


def _create_schema(db_worker):
    """Enable FK enforcement (off by default in SQLite) and create tables if missing."""
    db_worker.execute('PRAGMA foreign_keys = ON;')

    # posts table:
    # id - internal id of the post
    # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
    # service - the service (e.g. https://shrimp.melontini.me)
    # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
    # parent_id - the internal id of the parent
    # root_id - the internal id of the thread root
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            service TEXT NOT NULL,
            identifier TEXT NOT NULL UNIQUE,
            parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
            root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
        );
        """
    )

    # mappings table:
    # original_post_id - the post this was mapped from
    # mapped_post_id - the post this was mapped to
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS mappings (
            original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
            mapped_post_id INTEGER NOT NULL
        );
        """
    )


def _task_worker(tasks: queue.Queue):
    """Run queued callables until a None sentinel arrives (runs on a daemon thread)."""
    while True:
        task = tasks.get()
        if task is None:
            break

        try:
            task()
        except Exception:
            # Log with traceback but keep the worker alive: one failed
            # crosspost must not kill the whole loop.
            LOGGER.exception("Exception in worker thread!")
        finally:
            tasks.task_done()
140
141
if __name__ == "__main__":
    # Propagate execute()'s status code (0 on first-launch bootstrap) to the
    # shell instead of discarding it.
    raise SystemExit(execute('./data'))