social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky

Add simple on-disk cache persistence for DID and handle resolution

zenfyr.dev cc2ac65d d1c38c76

verified
Changed files
+70 -12
atproto
util
+35 -7
atproto/identity.py
···
-
from typing import Any
+
from pathlib import Path
+
from typing import Any, override
import dns.resolver
import requests
import env
-
from util.cache import TTLCache
-
from util.util import LOGGER, normalize_service_url
+
from util.cache import Cacheable, TTLCache
+
from util.util import LOGGER, normalize_service_url, shutdown_hook
-
class DidDocument:
+
class DidDocument():
def __init__(self, raw_doc: dict[str, Any]) -> None:
    """Wrap a raw DID document dict for typed access."""
    self.raw: dict[str, Any] = raw_doc
    # PDS endpoint URL; presumably populated by a parsing method not
    # visible in this chunk — confirm against the rest of the class.
    self.atproto_pds: str | None = None
···
return None
-
class DidResolver:
+
class DidResolver(Cacheable):
def __init__(self, plc_host: str) -> None:
    """plc_host: base URL of the PLC directory used for did:plc lookups."""
    self.plc_host: str = plc_host
    # Resolved DID documents are cached for 12 hours to limit PLC/web traffic.
    self.__cache: TTLCache[str, DidDocument] = TTLCache(ttl_seconds=12 * 60 * 60)
···
return from_web
raise Exception(f"Failed to resolve {did}!")
+
@override
def dump_cache(self, path: Path) -> None:
    """Persist the DID document cache to `path` (delegates to TTLCache)."""
    self.__cache.dump_cache(path)
-
class HandleResolver:
+
@override
def load_cache(self, path: Path) -> None:
    """Restore the DID document cache from `path` (delegates to TTLCache)."""
    self.__cache.load_cache(path)
+
+
class HandleResolver(Cacheable):
def __init__(self) -> None:
    # Handle -> DID lookups cached for 12 hours, matching DidResolver's TTL
    # (this commit changed it from TTLCache's 1h default for consistency).
    self.__cache: TTLCache[str, str] = TTLCache(ttl_seconds=12 * 60 * 60)
def try_resolve_dns(self, handle: str) -> str | None:
try:
···
raise Exception(f"Failed to resolve handle {handle}!")
+
@override
def dump_cache(self, path: Path) -> None:
    """Persist the handle-resolution cache to `path` (delegates to TTLCache)."""
    self.__cache.dump_cache(path)
+
+
@override
def load_cache(self, path: Path) -> None:
    """Restore the handle-resolution cache from `path` (delegates to TTLCache)."""
    self.__cache.load_cache(path)
+
handle_resolver = HandleResolver()
did_resolver = DidResolver(env.PLC_HOST)

# On-disk locations for the two resolver caches.
_cache_root = Path(env.CACHE_DIR)
did_cache = _cache_root / "did.cache"
handle_cache = _cache_root / "handle.cache"

# Warm both resolvers from any previous run's dump at import time.
did_resolver.load_cache(did_cache)
handle_resolver.load_cache(handle_cache)


def cache_dump():
    """Persist both resolver caches; registered below to run at shutdown."""
    did_resolver.dump_cache(did_cache)
    handle_resolver.dump_cache(handle_cache)


shutdown_hook.append(cache_dump)
+2 -1
env.py
···
# Runtime configuration read from environment variables, with local-dev defaults.
# bool() already returns False for None/"" — the previous `or False` was redundant.
DEV = bool(os.environ.get("DEV"))
DATA_DIR = os.environ.get("DATA_DIR") or "./data"
CACHE_DIR = os.environ.get("CACHE_DIR") or "./data/cache"
MIGRATIONS_DIR = os.environ.get("MIGRATIONS_DIR") or "./migrations"
PLC_HOST = os.environ.get("PLC_HOST") or "https://plc.directory"
+4 -1
main.py
···
from database.migrations import DatabaseMigrator
from registry import create_input_service, create_output_service
from registry_bootstrap import bootstrap
-
from util.util import LOGGER, read_env
+
from util.util import LOGGER, read_env, shutdown_hook
def main() -> None:
···
task_queue.put(None)
thread.join()
db_pool.close()
+
+
for shook in shutdown_hook:
+
shook()
if __name__ == "__main__":
+26 -2
util/cache.py
···
+
from abc import ABC, abstractmethod
+
from pathlib import Path
import time
-
from typing import Generic, TypeVar
+
from typing import Generic, TypeVar, override
+
import pickle
K = TypeVar("K")
V = TypeVar("V")
-
class TTLCache(Generic[K, V]):
+
class Cacheable(ABC):
    """Interface for objects whose internal cache can be persisted to disk."""

    @abstractmethod
    def dump_cache(self, path: Path) -> None:
        """Serialize the cache state to `path`."""
        pass

    @abstractmethod
    def load_cache(self, path: Path) -> None:
        """Populate the cache from `path`, if a dump exists there."""
        pass
+
+
class TTLCache(Generic[K, V], Cacheable):
def __init__(self, ttl_seconds: int = 3600) -> None:
    """Create a cache whose entries expire `ttl_seconds` after insertion (default 1h)."""
    self.ttl: int = ttl_seconds
    # key -> (value, timestamp); exact timestamp semantics (insert vs expiry
    # time) are decided by get/set methods not visible in this chunk.
    self.__cache: dict[K, tuple[V, float]] = {}
···
def clear(self) -> None:
    """Drop every cached entry."""
    self.__cache.clear()
+
+
@override
+
def dump_cache(self, path: Path) -> None:
+
path.parent.mkdir(parents=True, exist_ok=True)
+
with open(path, 'wb') as f:
+
pickle.dump(self.__cache, f)
+
+
@override
+
def load_cache(self, path: Path):
+
if path.exists():
+
with open(path, 'rb') as f:
+
self.__cache = pickle.load(f)
+3 -1
util/util.py
···
import logging
import sys
import os
-
from typing import Any
+
from typing import Any, Callable
import env
+
+
# Zero-arg callbacks to run at process shutdown; main() invokes each one
# after the worker thread is joined and the DB pool is closed.
shutdown_hook: list[Callable[[], None]] = []

# Root logging goes to stderr; DEBUG level only in dev mode.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG if env.DEV else logging.INFO)
LOGGER = logging.getLogger("XPost")