social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1from util import LOGGER 2import os 3import json 4import database 5import mastodon, misskey, bluesky, cross 6import asyncio, threading, queue, traceback 7import util 8 9DEFAULT_SETTINGS: dict = { 10 'input': { 11 'type': 'mastodon-wss', 12 'instance': 'env:MASTODON_INSTANCE', 13 'token': 'env:MASTODON_TOKEN', 14 "options": mastodon.MastodonInputOptions({}) 15 }, 16 'outputs': [ 17 { 18 'type': 'bluesky', 19 'handle': 'env:BLUESKY_HANDLE', 20 'app-password': 'env:BLUESKY_APP_PASSWORD', 21 'options': bluesky.BlueskyOutputOptions({}) 22 } 23 ] 24} 25 26INPUTS = { 27 "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db), 28 "misskey-wss": lambda settigs, db: misskey.MisskeyInput(settigs, db), 29 "bluesky-pds-wss": lambda settings, db: bluesky.BlueskyPdsInput(settings, db) 30} 31 32OUTPUTS = { 33 "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db), 34 "mastodon": lambda input, settings, db: mastodon.MastodonOutput(input, settings, db) 35} 36 37def execute(data_dir): 38 if not os.path.exists(data_dir): 39 os.makedirs(data_dir) 40 41 settings_path = os.path.join(data_dir, 'settings.json') 42 database_path = os.path.join(data_dir, 'data.db') 43 44 if not os.path.exists(settings_path): 45 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path) 46 47 with open(settings_path, 'w') as f: 48 f.write(util.as_json(DEFAULT_SETTINGS, indent=2)) 49 return 0 50 51 LOGGER.info('Loading settings...') 52 with open(settings_path, 'rb') as f: 53 settings = json.load(f) 54 55 LOGGER.info('Starting database worker...') 56 db_worker = database.DataBaseWorker(os.path.abspath(database_path)) 57 58 db_worker.execute('PRAGMA foreign_keys = ON;') 59 60 # create the posts table 61 # id - internal id of the post 62 # user_id - user id on the service (e.g. a724sknj5y9ydk0w) 63 # service - the service (e.g. https://shrimp.melontini.me) 64 # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p) 65 # parent_id - the internal id of the parent 66 db_worker.execute( 67 """ 68 CREATE TABLE IF NOT EXISTS posts ( 69 id INTEGER PRIMARY KEY AUTOINCREMENT, 70 user_id TEXT NOT NULL, 71 service TEXT NOT NULL, 72 identifier TEXT NOT NULL UNIQUE, 73 parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL, 74 root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL 75 ); 76 """ 77 ) 78 79 # create the mappings table 80 # original_post_id - the post this was mapped from 81 # mapped_post_id - the post this was mapped to 82 db_worker.execute( 83 """ 84 CREATE TABLE IF NOT EXISTS mappings ( 85 original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE, 86 mapped_post_id INTEGER NOT NULL 87 ); 88 """ 89 ) 90 91 input_settings = settings.get('input') 92 if not input_settings: 93 raise Exception("No input specified!") 94 outputs_settings = settings.get('outputs', []) 95 96 input = INPUTS[input_settings['type']](input_settings, db_worker) 97 98 if not outputs_settings: 99 LOGGER.warning("No outputs specified! Check the config!") 100 101 outputs: list[cross.Output] = [] 102 for output_settings in outputs_settings: 103 outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker)) 104 105 LOGGER.info('Starting task worker...') 106 def worker(queue: queue.Queue): 107 while True: 108 task = queue.get() 109 if task is None: 110 break 111 112 try: 113 task() 114 except Exception as e: 115 LOGGER.error(f"Exception in worker thread!\n{e}") 116 traceback.print_exc() 117 finally: 118 queue.task_done() 119 120 task_queue = queue.Queue() 121 thread = threading.Thread(target=worker, args=(task_queue,), daemon=True) 122 thread.start() 123 124 LOGGER.info('Connecting to %s...', input.service) 125 try: 126 asyncio.run(input.listen(outputs, lambda x: task_queue.put(x))) 127 except KeyboardInterrupt: 128 LOGGER.info("Stopping...") 129 130 task_queue.join() 131 task_queue.put(None) 132 thread.join() 133 134 135if __name__ == "__main__": 136 execute('./data')