social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import os 2import json 3import asyncio, threading, queue, traceback 4 5from util.util import LOGGER, as_json 6import cross, util.database as database 7 8from bluesky.input import BlueskyJetstreamInput 9from bluesky.output import BlueskyOutputOptions, BlueskyOutput 10 11from mastodon.input import MastodonInputOptions, MastodonInput 12from mastodon.output import MastodonOutput 13 14from misskey.input import MisskeyInput 15 16DEFAULT_SETTINGS: dict = { 17 'input': { 18 'type': 'mastodon-wss', 19 'instance': 'env:MASTODON_INSTANCE', 20 'token': 'env:MASTODON_TOKEN', 21 "options": MastodonInputOptions({}) 22 }, 23 'outputs': [ 24 { 25 'type': 'bluesky', 26 'handle': 'env:BLUESKY_HANDLE', 27 'app-password': 'env:BLUESKY_APP_PASSWORD', 28 'options': BlueskyOutputOptions({}) 29 } 30 ] 31} 32 33INPUTS = { 34 "mastodon-wss": lambda settings, db: MastodonInput(settings, db), 35 "misskey-wss": lambda settigs, db: MisskeyInput(settigs, db), 36 "bluesky-jetstream-wss": lambda settings, db: BlueskyJetstreamInput(settings, db) 37} 38 39OUTPUTS = { 40 "bluesky": lambda input, settings, db: BlueskyOutput(input, settings, db), 41 "mastodon": lambda input, settings, db: MastodonOutput(input, settings, db) 42} 43 44def execute(data_dir): 45 if not os.path.exists(data_dir): 46 os.makedirs(data_dir) 47 48 settings_path = os.path.join(data_dir, 'settings.json') 49 database_path = os.path.join(data_dir, 'data.db') 50 51 if not os.path.exists(settings_path): 52 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path) 53 54 with open(settings_path, 'w') as f: 55 f.write(as_json(DEFAULT_SETTINGS, indent=2)) 56 return 0 57 58 LOGGER.info('Loading settings...') 59 with open(settings_path, 'rb') as f: 60 settings = json.load(f) 61 62 LOGGER.info('Starting database worker...') 63 db_worker = database.DataBaseWorker(os.path.abspath(database_path)) 64 65 db_worker.execute('PRAGMA foreign_keys = ON;') 66 67 # create the posts table 68 # id - internal id of the post 69 # user_id - user id on the service (e.g. a724sknj5y9ydk0w) 70 # service - the service (e.g. https://shrimp.melontini.me) 71 # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p) 72 # parent_id - the internal id of the parent 73 db_worker.execute( 74 """ 75 CREATE TABLE IF NOT EXISTS posts ( 76 id INTEGER PRIMARY KEY AUTOINCREMENT, 77 user_id TEXT NOT NULL, 78 service TEXT NOT NULL, 79 identifier TEXT NOT NULL, 80 parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL, 81 root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL 82 ); 83 """ 84 ) 85 86 columns = db_worker.execute("PRAGMA table_info(posts)") 87 column_names = [col[1] for col in columns] 88 if "reposted_id" not in column_names: 89 db_worker.execute(""" 90 ALTER TABLE posts 91 ADD COLUMN reposted_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL 92 """) 93 if "extra_data" not in column_names: 94 db_worker.execute(""" 95 ALTER TABLE posts 96 ADD COLUMN extra_data TEXT NULL 97 """) 98 99 # create the mappings table 100 # original_post_id - the post this was mapped from 101 # mapped_post_id - the post this was mapped to 102 db_worker.execute( 103 """ 104 CREATE TABLE IF NOT EXISTS mappings ( 105 original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE, 106 mapped_post_id INTEGER NOT NULL 107 ); 108 """ 109 ) 110 111 input_settings = settings.get('input') 112 if not input_settings: 113 raise Exception("No input specified!") 114 outputs_settings = settings.get('outputs', []) 115 116 input = INPUTS[input_settings['type']](input_settings, db_worker) 117 118 if not outputs_settings: 119 LOGGER.warning("No outputs specified! Check the config!") 120 121 outputs: list[cross.Output] = [] 122 for output_settings in outputs_settings: 123 outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker)) 124 125 LOGGER.info('Starting task worker...') 126 def worker(queue: queue.Queue): 127 while True: 128 task = queue.get() 129 if task is None: 130 break 131 132 try: 133 task() 134 except Exception as e: 135 LOGGER.error(f"Exception in worker thread!\n{e}") 136 traceback.print_exc() 137 finally: 138 queue.task_done() 139 140 task_queue = queue.Queue() 141 thread = threading.Thread(target=worker, args=(task_queue,), daemon=True) 142 thread.start() 143 144 LOGGER.info('Connecting to %s...', input.service) 145 try: 146 asyncio.run(input.listen(outputs, lambda x: task_queue.put(x))) 147 except KeyboardInterrupt: 148 LOGGER.info("Stopping...") 149 150 task_queue.join() 151 task_queue.put(None) 152 thread.join() 153 154 155if __name__ == "__main__": 156 execute('./data')