social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

add settings

zenfyr.dev ed796641 bfd693d6

verified
+30 -12
main.py
···
+
import asyncio
+
import json
import queue
import threading
from pathlib import Path
-
from time import sleep
from typing import Callable
import env
from database.migrations import DatabaseMigrator
-
from util.util import LOGGER
+
from registry import create_input_service, create_output_service
+
from registry_bootstrap import bootstrap
+
from util.util import LOGGER, read_env
def main() -> None:
···
if not data.exists():
data.mkdir(parents=True)
-
settings = data.joinpath("settings.json")
-
database = data.joinpath("db.sqlite")
+
settings_path = data.joinpath("settings.json")
+
database_path = data.joinpath("db.sqlite")
-
if not settings.exists():
-
LOGGER.info("First launch detected! Creating %s and exiting!", settings)
+
if not settings_path.exists():
+
LOGGER.info("First launch detected! Creating %s and exiting!", settings_path)
return
-
LOGGER.info("Loading settings...")
-
# TODO
-
-
migrator = DatabaseMigrator(database, Path(env.MIGRATIONS_DIR))
+
migrator = DatabaseMigrator(database_path, Path(env.MIGRATIONS_DIR))
try:
migrator.migrate()
except Exception:
···
finally:
migrator.close()
+
LOGGER.info("Bootstrapping registries...")
+
bootstrap()
+
+
LOGGER.info("Loading settings...")
+
+
with open(settings_path) as f:
+
settings = json.load(f)
+
read_env(settings)
+
+
if "input" not in settings:
+
raise KeyError("No `input` sepcified in settings!")
+
if "outputs" not in settings:
+
raise KeyError("No `outputs` spicified in settings!")
+
+
input = create_input_service(database_path, settings["input"])
+
outputs = [
+
create_output_service(database_path, data) for data in settings["outputs"]
+
]
+
LOGGER.info("Starting task worker...")
def worker(task_queue: queue.Queue[Callable[[], None] | None]):
···
LOGGER.info("Connecting to %s...", "TODO") # TODO
try:
-
task_queue.put(lambda: print("hi"))
-
sleep(10) # TODO
+
asyncio.run(input.listen(outputs, lambda c: task_queue.put(c)))
except KeyboardInterrupt:
LOGGER.info("Stopping...")
+31
registry.py
···
+
from pathlib import Path
+
from typing import Any, Callable
+
+
from cross.service import InputService, OutputService
+
+
input_factories: dict[str, Callable[[Path, dict[str, Any]], InputService]] = {}
+
output_factories: dict[str, Callable[[Path, dict[str, Any]], OutputService]] = {}
+
+
+
def create_input_service(db: Path, data: dict[str, Any]) -> InputService:
+
if "type" not in data:
+
raise ValueError("No `type` field in input data!")
+
type: str = str(data["type"])
+
del data["type"]
+
+
factory = input_factories.get(type)
+
if not factory:
+
raise KeyError(f"No such input service {type}!")
+
return factory(db, data)
+
+
+
def create_output_service(db: Path, data: dict[str, Any]) -> OutputService:
+
if "type" not in data:
+
raise ValueError("No `type` field in input data!")
+
type: str = str(data["type"])
+
del data["type"]
+
+
factory = output_factories.get(type)
+
if not factory:
+
raise KeyError(f"No such output service {type}!")
+
return factory(db, data)
+28
registry_bootstrap.py
···
+
from pathlib import Path
+
from typing import Any
+
+
from registry import input_factories, output_factories
+
+
+
class LazyFactory:
+
def __init__(self, module_path: str, class_name: str, options_class_name: str):
+
self.module_path: str = module_path
+
self.class_name: str = class_name
+
self.options_class_name: str = options_class_name
+
+
def __call__(self, db: Path, d: dict[str, Any]):
+
module = __import__(
+
self.module_path, fromlist=[self.class_name, self.options_class_name]
+
)
+
service_class = getattr(module, self.class_name)
+
options_class = getattr(module, self.options_class_name)
+
return service_class(db, options_class.from_dict(d))
+
+
+
def bootstrap():
+
input_factories["mastodon-wss"] = LazyFactory(
+
"mastodon.input", "MastodonInputService", "MastodonInputOptions"
+
)
+
input_factories["misskey-wss"] = LazyFactory(
+
"misskey.input", "MisskeyInputService", "MisskeyInputOptions"
+
)
+19
util/util.py
···
import logging
import sys
+
import os
+
from typing import Any
logging.basicConfig(stream=sys.stderr, level=logging.INFO)
LOGGER = logging.getLogger("XPost")
+
+
def read_env(data: dict[str, Any]) -> None:
+
keys = list(data.keys())
+
for key in keys:
+
val = data[key]
+
match val:
+
case str():
+
if val.startswith('env:'):
+
envval = os.environ.get(val[4:])
+
if envval is None:
+
del data[key]
+
else:
+
data[key] = envval
+
case dict():
+
read_env(val)
+
case _:
+
pass