social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1from util import LOGGER 2import os 3import json 4import database 5import mastodon, misskey, bluesky, cross 6import asyncio, threading, queue, traceback 7 8DEFAULT_SETTINGS: dict = { 9 'input': { 10 'type': 'mastodon-wss', 11 'instance': 'env:MASTODON_INSTANCE', 12 'token': 'env:MASTODON_TOKEN', 13 "options": { 14 "allowed_visibility": [ 15 "public", 16 "unlisted" 17 ] 18 } 19 }, 20 'output': [ 21 { 22 'type': 'bluesky', 23 'handle': 'env:BLUESKY_HANDLE', 24 'app-password': 'env:BLUESKY_APP_PASSWORD', 25 'options': { 26 'quote_gate': False, 27 'thread_gate': [ 28 'everybody' 29 ] 30 } 31 } 32 ] 33} 34 35INPUTS = { 36 "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db), 37 "misskey-wss": lambda settigs, db: misskey.MisskeyInput(settigs, db) 38} 39 40OUTPUTS = { 41 "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db) 42} 43 44def execute(data_dir): 45 if not os.path.exists(data_dir): 46 os.makedirs(data_dir) 47 48 settings_path = os.path.join(data_dir, 'settings.json') 49 database_path = os.path.join(data_dir, 'data.db') 50 51 if not os.path.exists(settings_path): 52 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path) 53 54 with open(settings_path, 'w') as f: 55 json.dump(DEFAULT_SETTINGS, f, indent=2) 56 return 0 57 58 LOGGER.info('Loading settings...') 59 with open(settings_path, 'rb') as f: 60 settings = json.load(f) 61 62 LOGGER.info('Starting database worker...') 63 db_worker = database.DataBaseWorker(os.path.abspath(database_path)) 64 65 db_worker.execute('PRAGMA foreign_keys = ON;') 66 67 # create the posts table 68 # id - internal id of the post 69 # user_id - user id on the service (e.g. a724sknj5y9ydk0w) 70 # service - the service (e.g. https://shrimp.melontini.me) 71 # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p) 72 # parent_id - the internal id of the parent 73 db_worker.execute( 74 """ 75 CREATE TABLE IF NOT EXISTS posts ( 76 id INTEGER PRIMARY KEY AUTOINCREMENT, 77 user_id TEXT NOT NULL, 78 service TEXT NOT NULL, 79 identifier TEXT NOT NULL UNIQUE, 80 parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL, 81 root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL 82 ); 83 """ 84 ) 85 86 # create the mappings table 87 # original_post_id - the post this was mapped from 88 # mapped_post_id - the post this was mapped to 89 db_worker.execute( 90 """ 91 CREATE TABLE IF NOT EXISTS mappings ( 92 original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE, 93 mapped_post_id INTEGER NOT NULL 94 ); 95 """ 96 ) 97 98 input_settings = settings.get('input') 99 if not input_settings: 100 raise Exception("No input specified!") 101 outputs_settings = settings.get('outputs', []) 102 103 input = INPUTS[input_settings['type']](input_settings, db_worker) 104 105 outputs: list[cross.Output] = [] 106 for output_settings in outputs_settings: 107 outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker)) 108 109 LOGGER.info('Starting task worker...') 110 task_queue = queue.Queue() 111 def worker(): 112 while True: 113 task = task_queue.get() 114 if task is None: 115 break 116 try: 117 task() 118 except Exception as e: 119 LOGGER.error(f"Exception in worker thread!\n{e}") 120 traceback.print_exc() 121 thread = threading.Thread(target=worker, daemon=True) 122 thread.start() 123 LOGGER.info('Connecting to %s...', input.service) 124 asyncio.run(input.listen(outputs, lambda x: task_queue.put(x))) 125 126 127if __name__ == "__main__": 128 execute('./data')