social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
"""Entry point of the crossposting tool.

Loads ``settings.json`` from a data directory, prepares the post database,
builds one input and any number of outputs from the settings, and then lets
the input listen for events while a background thread runs the queued tasks.
"""

import asyncio
import json
import os
import queue
import threading
import traceback

import cross
import util.database as database
from bluesky.input import BlueskyJetstreamInput
from bluesky.output import BlueskyOutput, BlueskyOutputOptions
from mastodon.input import MastodonInput, MastodonInputOptions
from mastodon.output import MastodonOutput
from misskey.input import MisskeyInput
from util.util import LOGGER, as_json

# Written to settings.json on first launch. Values of the form "env:NAME"
# are stored verbatim here; presumably they are resolved from environment
# variables by the input/output classes -- confirm against those modules.
DEFAULT_SETTINGS: dict = {
    "input": {
        "type": "mastodon-wss",
        "instance": "env:MASTODON_INSTANCE",
        "token": "env:MASTODON_TOKEN",
        "options": MastodonInputOptions({}),
    },
    "outputs": [
        {
            "type": "bluesky",
            "handle": "env:BLUESKY_HANDLE",
            "app-password": "env:BLUESKY_APP_PASSWORD",
            "options": BlueskyOutputOptions({}),
        }
    ],
}

# Factories keyed by the "type" field of the "input" settings section.
INPUTS = {
    "mastodon-wss": lambda settings, db: MastodonInput(settings, db),
    "misskey-wss": lambda settings, db: MisskeyInput(settings, db),
    "bluesky-jetstream-wss": lambda settings, db: BlueskyJetstreamInput(settings, db),
}

# Factories keyed by the "type" field of each entry in "outputs".
OUTPUTS = {
    "bluesky": lambda input, settings, db: BlueskyOutput(input, settings, db),
    "mastodon": lambda input, settings, db: MastodonOutput(input, settings, db),
}


def _create_schema(db_worker):
    """Create the ``posts`` and ``mappings`` tables and add missing columns."""
    db_worker.execute("PRAGMA foreign_keys = ON;")

    # create the posts table
    # id - internal id of the post
    # user_id - user id on the service (e.g. a724sknj5y9ydk0w)
    # service - the service (e.g. https://shrimp.melontini.me)
    # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p)
    # parent_id - the internal id of the parent
    # root_id - presumably the internal id of the thread root (TODO confirm)
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id TEXT NOT NULL,
            service TEXT NOT NULL,
            identifier TEXT NOT NULL,
            parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL,
            root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
        );
        """
    )

    # Columns introduced after the table was first created: add them to
    # databases that predate them. Index 1 of each table_info row is read
    # as the column name.
    columns = db_worker.execute("PRAGMA table_info(posts)")
    column_names = {col[1] for col in columns}
    if "reposted_id" not in column_names:
        db_worker.execute(
            """
            ALTER TABLE posts
            ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL
            """
        )
    if "extra_data" not in column_names:
        db_worker.execute(
            """
            ALTER TABLE posts
            ADD COLUMN extra_data TEXT NULL
            """
        )

    # create the mappings table
    # original_post_id - the post this was mapped from
    # mapped_post_id - the post this was mapped to
    db_worker.execute(
        """
        CREATE TABLE IF NOT EXISTS mappings (
            original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE,
            mapped_post_id INTEGER NOT NULL
        );
        """
    )


def _resolve_factory(registry, section, role):
    """Return the factory registered for ``section["type"]``.

    Raises:
        KeyError: if ``section`` has no "type" key, or if the type is not in
            ``registry`` (the message then lists the supported types).
    """
    kind = section["type"]
    try:
        return registry[kind]
    except KeyError:
        supported = ", ".join(sorted(registry))
        raise KeyError(
            f"Unknown {role} type {kind!r}; supported types: {supported}"
        ) from None


def _run_tasks(tasks: queue.Queue) -> None:
    """Run callables taken from ``tasks`` until a ``None`` sentinel arrives."""
    while True:
        task = tasks.get()
        if task is None:
            break

        try:
            task()
        except Exception as e:
            # A failing task must not kill the worker; report it and go on.
            LOGGER.error("Exception in worker thread!\n%s", e)
            traceback.print_exc()
        finally:
            tasks.task_done()


def execute(data_dir):
    """Run the crossposter using the settings and database in ``data_dir``.

    On first launch (no ``settings.json`` in ``data_dir``) the default
    settings are written there and the function returns 0 without starting
    anything. Otherwise it runs until the input's ``listen`` coroutine
    finishes or the user interrupts it, then waits for queued tasks to finish.

    Args:
        data_dir: directory holding ``settings.json`` and ``data.db``; it is
            created if it does not exist.

    Returns:
        0 after writing the default settings on first launch, otherwise None.

    Raises:
        Exception: if the settings have no (or an empty) "input" section.
        KeyError: if an input/output section has a missing or unknown "type".
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(data_dir, exist_ok=True)

    settings_path = os.path.join(data_dir, "settings.json")
    database_path = os.path.join(data_dir, "data.db")

    if not os.path.exists(settings_path):
        LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)

        # Written as UTF-8 so the bytes-mode json.load below can always
        # decode it, whatever the platform's default encoding is.
        with open(settings_path, "w", encoding="utf-8") as f:
            f.write(as_json(DEFAULT_SETTINGS, indent=2))
        return 0

    LOGGER.info("Loading settings...")
    with open(settings_path, "rb") as f:
        settings = json.load(f)

    LOGGER.info("Starting database worker...")
    db_worker = database.DataBaseWorker(os.path.abspath(database_path))
    _create_schema(db_worker)

    input_settings = settings.get("input")
    if not input_settings:
        raise Exception("No input specified!")
    outputs_settings = settings.get("outputs", [])

    source = _resolve_factory(INPUTS, input_settings, "input")(
        input_settings, db_worker
    )

    if not outputs_settings:
        LOGGER.warning("No outputs specified! Check the config!")

    outputs: list[cross.Output] = []
    for output_settings in outputs_settings:
        factory = _resolve_factory(OUTPUTS, output_settings, "output")
        outputs.append(factory(source, output_settings, db_worker))

    LOGGER.info("Starting task worker...")
    task_queue = queue.Queue()
    thread = threading.Thread(target=_run_tasks, args=(task_queue,), daemon=True)
    thread.start()

    LOGGER.info("Connecting to %s...", source.service)
    try:
        asyncio.run(source.listen(outputs, task_queue.put))
    except KeyboardInterrupt:
        LOGGER.info("Stopping...")

    # Let pending tasks finish first, then send the sentinel that stops the
    # worker and wait for the thread to exit.
    task_queue.join()
    task_queue.put(None)
    thread.join()


if __name__ == "__main__":
    execute("./data")