social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1from util import LOGGER
2import os
3import json
4import database
5import mastodon, bluesky, cross
6import asyncio, threading, queue, traceback
7
# Settings template that is serialized to settings.json on first launch.
# String values of the form "env:NAME" are stored verbatim here; how they are
# resolved is up to the code that consumes the settings.
# NOTE(review): the output list lives under the singular key "output" --
# make sure whatever loads the settings looks it up under the same key.
DEFAULT_SETTINGS: dict = {
    # The single service that posts are read from.
    "input": {
        "type": "mastodon-wss",
        "instance": "env:MASTODON_INSTANCE",
        "token": "env:MASTODON_TOKEN",
        "options": {
            "allowed_visibility": [
                "public",
                "unlisted",
            ],
        },
    },
    # The services that posts are forwarded to.
    "output": [
        {
            "type": "bluesky",
            "handle": "env:BLUESKY_HANDLE",
            "app-password": "env:BLUESKY_APP_PASSWORD",
            "options": {
                "quote_gate": False,
                "thread_gate": [
                    "everybody",
                ],
            },
        },
    ],
}
34
def _make_mastodon_input(settings, db):
    """Build a ``mastodon.MastodonInput`` from its settings and the database worker."""
    return mastodon.MastodonInput(settings, db)


# Registry of input factories, keyed by the "type" value of the input settings.
# Each factory is called as factory(settings, db).
INPUTS = {
    "mastodon-wss": _make_mastodon_input,
}
38
def _make_bluesky_output(input, settings, db):
    """Build a ``bluesky.BlueskyOutput`` bound to the given input.

    The first parameter keeps the name ``input`` (even though it shadows the
    builtin) so that the factory's signature stays unchanged for callers.
    """
    return bluesky.BlueskyOutput(input, settings, db)


# Registry of output factories, keyed by the "type" value of an output's
# settings. Each factory is called as factory(input, settings, db).
OUTPUTS = {
    "bluesky": _make_bluesky_output,
}
42
def _ensure_schema(db_worker):
    """Enable foreign keys and create the ``posts`` and ``mappings`` tables if missing."""
    db_worker.execute('PRAGMA foreign_keys = ON;')

    # create the posts table
    # id - internal id of the post
    # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
    # service - the service (e.g. https://shrimp.melontini.me)
    # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
    # parent_id - the internal id of the parent
    # root_id - presumably the internal id of the thread root (TODO confirm)
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            service TEXT NOT NULL,
            identifier TEXT NOT NULL UNIQUE,
            parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
            root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
        );
        """
    )

    # create the mappings table
    # original_post_id - the post this was mapped from
    # mapped_post_id - the post this was mapped to
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS mappings (
            original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
            mapped_post_id INTEGER NOT NULL
        );
        """
    )


def execute(data_dir):
    """Run the crossposter using the files stored in *data_dir*.

    *data_dir* holds ``settings.json`` and ``data.db``; it is created if it
    does not exist yet.

    On first launch (no ``settings.json`` present) the default settings are
    written to disk and the function returns ``0`` without starting anything.
    Otherwise the settings are loaded, the database schema is ensured, the
    configured input and outputs are built, a background task worker thread
    is started, and the call blocks inside ``asyncio.run`` while the input
    listens. In that case the function returns ``None`` once listening ends.

    Raises:
        Exception: if the settings contain no ``input`` section.
        KeyError: if an input/output section has no ``type`` or the type is
            not registered in ``INPUTS`` / ``OUTPUTS``.
    """
    # open(..., 'w') does not create parent directories, so a missing data
    # directory would crash the very first launch. An empty data_dir means
    # "current directory" to os.path.join and needs no creation.
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)

    settings_path = os.path.join(data_dir, 'settings.json')
    database_path = os.path.join(data_dir, 'data.db')

    if not os.path.exists(settings_path):
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)

        with open(settings_path, 'w', encoding='utf-8') as f:
            json.dump(DEFAULT_SETTINGS, f, indent=2)
        return 0

    LOGGER.info('Loading settings...')
    with open(settings_path, 'rb') as f:
        settings = json.load(f)

    LOGGER.info('Starting database worker...')
    db_worker = database.DataBaseWorker(os.path.abspath(database_path))
    _ensure_schema(db_worker)

    input_settings = settings.get('input')
    if not input_settings:
        raise Exception("No input specified!")

    # The generated default settings store the output list under the singular
    # key 'output'. Accept both spellings so that a freshly generated file
    # does not silently run with no outputs at all.
    outputs_settings = settings.get('outputs')
    if outputs_settings is None:
        outputs_settings = settings.get('output', [])

    input_service = INPUTS[input_settings['type']](input_settings, db_worker)

    outputs: list[cross.Output] = [
        OUTPUTS[output_settings['type']](input_service, output_settings, db_worker)
        for output_settings in outputs_settings
    ]

    LOGGER.info('Starting task worker...')
    task_queue = queue.Queue()

    def worker():
        # Run queued callables one at a time. A None item is the stop signal;
        # nothing in this function enqueues it, so the daemon thread normally
        # lives until the process exits.
        while True:
            task = task_queue.get()
            if task is None:
                break
            try:
                task()
            except Exception as e:
                # Keep the worker alive: one failing task must not stop the
                # processing of later ones.
                LOGGER.error(f"Exception in worker thread!\n{e}")
                traceback.print_exc()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()

    LOGGER.info('Listening to %s...', input_service.service)
    # The input hands work to the worker thread through the submit callback.
    asyncio.run(input_service.listen(outputs, task_queue.put))
121
122
# Script entry point: run against the "data" directory relative to the
# current working directory when this file is executed directly.
if __name__ == "__main__":
    execute("./data")