social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from util import LOGGER
2import os
3import json
4import database
5import mastodon, misskey, bluesky, cross
6import asyncio, threading, queue, traceback
7
8DEFAULT_SETTINGS: dict = {
9 'input': {
10 'type': 'mastodon-wss',
11 'instance': 'env:MASTODON_INSTANCE',
12 'token': 'env:MASTODON_TOKEN',
13 "options": {
14 "allowed_visibility": [
15 "public",
16 "unlisted"
17 ]
18 }
19 },
20 'output': [
21 {
22 'type': 'bluesky',
23 'handle': 'env:BLUESKY_HANDLE',
24 'app-password': 'env:BLUESKY_APP_PASSWORD',
25 'options': {
26 'quote_gate': False,
27 'thread_gate': [
28 'everybody'
29 ]
30 }
31 }
32 ]
33}
34
35INPUTS = {
36 "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db),
37 "misskey-wss": lambda settigs, db: misskey.MisskeyInput(settigs, db)
38}
39
40OUTPUTS = {
41 "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db)
42}
43
44def execute(data_dir):
45 if not os.path.exists(data_dir):
46 os.makedirs(data_dir)
47
48 settings_path = os.path.join(data_dir, 'settings.json')
49 database_path = os.path.join(data_dir, 'data.db')
50
51 if not os.path.exists(settings_path):
52 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
53
54 with open(settings_path, 'w') as f:
55 json.dump(DEFAULT_SETTINGS, f, indent=2)
56 return 0
57
58 LOGGER.info('Loading settings...')
59 with open(settings_path, 'rb') as f:
60 settings = json.load(f)
61
62 LOGGER.info('Starting database worker...')
63 db_worker = database.DataBaseWorker(os.path.abspath(database_path))
64
65 db_worker.execute('PRAGMA foreign_keys = ON;')
66
67 # create the posts table
68 # id - internal id of the post
69 # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
70 # service - the service (e.g. https://shrimp.melontini.me)
71 # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
72 # parent_id - the internal id of the parent
73 db_worker.execute(
74 """
75 CREATE TABLE IF NOT EXISTS posts (
76 id INTEGER PRIMARY KEY AUTOINCREMENT,
77 user_id TEXT NOT NULL,
78 service TEXT NOT NULL,
79 identifier TEXT NOT NULL UNIQUE,
80 parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
81 root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
82 );
83 """
84 )
85
86 # create the mappings table
87 # original_post_id - the post this was mapped from
88 # mapped_post_id - the post this was mapped to
89 db_worker.execute(
90 """
91 CREATE TABLE IF NOT EXISTS mappings (
92 original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
93 mapped_post_id INTEGER NOT NULL
94 );
95 """
96 )
97
98 input_settings = settings.get('input')
99 if not input_settings:
100 raise Exception("No input specified!")
101 outputs_settings = settings.get('outputs', [])
102
103 input = INPUTS[input_settings['type']](input_settings, db_worker)
104
105 outputs: list[cross.Output] = []
106 for output_settings in outputs_settings:
107 outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker))
108
109 LOGGER.info('Starting task worker...')
110 task_queue = queue.Queue()
111 def worker():
112 while True:
113 task = task_queue.get()
114 if task is None:
115 break
116 try:
117 task()
118 except Exception as e:
119 LOGGER.error(f"Exception in worker thread!\n{e}")
120 traceback.print_exc()
121 thread = threading.Thread(target=worker, daemon=True)
122 thread.start()
123 LOGGER.info('Connecting to %s...', input.service)
124 asyncio.run(input.listen(outputs, lambda x: task_queue.put(x)))
125
126
127if __name__ == "__main__":
128 execute('./data')