social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1from util import LOGGER 2import os 3import json 4import database 5import mastodon, bluesky, cross 6import asyncio, threading, queue, traceback 7 8DEFAULT_SETTINGS: dict = { 9 'input': { 10 'type': 'mastodon-wss', 11 'instance': 'env:MASTODON_INSTANCE', 12 'token': 'env:MASTODON_TOKEN', 13 "options": { 14 "allowed_visibility": [ 15 "public", 16 "unlisted" 17 ] 18 } 19 }, 20 'output': [ 21 { 22 'type': 'bluesky', 23 'handle': 'env:BLUESKY_HANDLE', 24 'app-password': 'env:BLUESKY_APP_PASSWORD', 25 'options': { 26 'quote_gate': False, 27 'thread_gate': [ 28 'everybody' 29 ] 30 } 31 } 32 ] 33} 34 35INPUTS = { 36 "mastodon-wss": lambda settings, db: mastodon.MastodonInput(settings, db) 37} 38 39OUTPUTS = { 40 "bluesky": lambda input, settings, db: bluesky.BlueskyOutput(input, settings, db) 41} 42 43def execute(data_dir): 44 settings_path = os.path.join(data_dir, 'settings.json') 45 database_path = os.path.join(data_dir, 'data.db') 46 47 if not os.path.exists(settings_path): 48 LOGGER.info("First launch detected! Creating %s and exiting!", settings_path) 49 50 with open(settings_path, 'w') as f: 51 json.dump(DEFAULT_SETTINGS, f, indent=2) 52 return 0 53 54 LOGGER.info('Loading settings...') 55 with open(settings_path, 'rb') as f: 56 settings = json.load(f) 57 58 LOGGER.info('Starting database worker...') 59 db_worker = database.DataBaseWorker(os.path.abspath(database_path)) 60 61 db_worker.execute('PRAGMA foreign_keys = ON;') 62 63 # create the posts table 64 # id - internal id of the post 65 # user_id - user id on the service (e.g. a724sknj5y9ydk0w) 66 # service - the service (e.g. https://shrimp.melontini.me) 67 # identifier - post id on the service (e.g. a8mpiyeej0fpjp0p) 68 # parent_id - the internal id of the parent 69 db_worker.execute( 70 """ 71 CREATE TABLE IF NOT EXISTS posts ( 72 id INTEGER PRIMARY KEY AUTOINCREMENT, 73 user_id TEXT NOT NULL, 74 service TEXT NOT NULL, 75 identifier TEXT NOT NULL UNIQUE, 76 parent_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL, 77 root_id INTEGER NULL REFERENCES posts(id) ON DELETE SET NULL 78 ); 79 """ 80 ) 81 82 # create the mappings table 83 # original_post_id - the post this was mapped from 84 # mapped_post_id - the post this was mapped to 85 db_worker.execute( 86 """ 87 CREATE TABLE IF NOT EXISTS mappings ( 88 original_post_id INTEGER NOT NULL REFERENCES posts(id) ON DELETE CASCADE, 89 mapped_post_id INTEGER NOT NULL 90 ); 91 """ 92 ) 93 94 input_settings = settings.get('input') 95 if not input_settings: 96 raise Exception("No input specified!") 97 outputs_settings = settings.get('outputs', []) 98 99 input = INPUTS[input_settings['type']](input_settings, db_worker) 100 101 outputs: list[cross.Output] = [] 102 for output_settings in outputs_settings: 103 outputs.append(OUTPUTS[output_settings['type']](input, output_settings, db_worker)) 104 105 LOGGER.info('Starting task worker...') 106 task_queue = queue.Queue() 107 def worker(): 108 while True: 109 task = task_queue.get() 110 if task is None: 111 break 112 try: 113 task() 114 except Exception as e: 115 LOGGER.error(f"Exception in worker thread!\n{e}") 116 traceback.print_exc() 117 thread = threading.Thread(target=worker, daemon=True) 118 thread.start() 119 LOGGER.info('Listening to %s...', input.service) 120 asyncio.run(input.listen(outputs, lambda x: task_queue.put(x))) 121 122 123if __name__ == "__main__": 124 execute('./data')