social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at next 1.1 kB view raw
from pathlib import Path
from typing import Any, Callable

from cross.service import InputService, OutputService
from database.connection import DatabasePool

# Registries mapping a service `type` name to the factory that builds it.
# Both start out empty in this module; each factory is called as
# `factory(db, options)` by the create_* functions below.
input_factories: dict[str, Callable[[DatabasePool, dict[str, Any]], InputService]] = {}
output_factories: dict[str, Callable[[DatabasePool, dict[str, Any]], OutputService]] = {}


def _split_service_type(data: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Separate the `type` discriminator from the rest of a service config.

    Args:
        data: Service configuration; must contain a ``type`` key.

    Returns:
        A ``(type_name, options)`` pair, where ``type_name`` is
        ``str(data["type"])`` and ``options`` is a shallow copy of *data*
        without the ``type`` key.

    Raises:
        ValueError: If *data* has no ``type`` key.
    """
    if "type" not in data:
        raise ValueError("No `type` field in input data!")
    # Build a new dict instead of deleting from `data`, so the caller's
    # configuration is left intact and can be passed in again.
    options = {key: value for key, value in data.items() if key != "type"}
    return str(data["type"]), options


def create_input_service(db: DatabasePool, data: dict[str, Any]) -> InputService:
    """Build the input service selected by ``data["type"]``.

    Args:
        db: Database pool handed through to the factory.
        data: Service configuration. The ``type`` entry picks the factory
            from ``input_factories``; every other entry is passed to the
            factory. The dict itself is not modified.

    Returns:
        Whatever the registered factory returns.

    Raises:
        ValueError: If *data* has no ``type`` key.
        KeyError: If no input factory is registered under that type.
    """
    service_type, options = _split_service_type(data)
    factory = input_factories.get(service_type)
    if factory is None:
        raise KeyError(f"No such input service {service_type}!")
    return factory(db, options)


def create_output_service(db: DatabasePool, data: dict[str, Any]) -> OutputService:
    """Build the output service selected by ``data["type"]``.

    Args:
        db: Database pool handed through to the factory.
        data: Service configuration. The ``type`` entry picks the factory
            from ``output_factories``; every other entry is passed to the
            factory. The dict itself is not modified.

    Returns:
        Whatever the registered factory returns.

    Raises:
        ValueError: If *data* has no ``type`` key.
        KeyError: If no output factory is registered under that type.
    """
    service_type, options = _split_service_type(data)
    factory = output_factories.get(service_type)
    if factory is None:
        raise KeyError(f"No such output service {service_type}!")
    return factory(db, options)