"""Base classes for services that store posts in the ``posts``/``mappings`` tables."""

import logging
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Callable, cast

from cross.post import Post
from database.connection import DatabasePool

# Columns written by ``Service._insert_post``; the order here fixes the order
# of the positional ``?`` placeholders in the generated INSERT statement.
columns: list[str] = [
    "user",
    "service",
    "identifier",
    "parent",
    "root",
    "reposted",
    "extra_data",
]
placeholders: str = ", ".join(["?" for _ in columns])
column_names: str = ", ".join(columns)


class Service:
    """Common state and ``posts``-table helpers shared by all services.

    Every helper obtains a connection from ``self.db.get_conn()`` and runs a
    single parameterized statement on a fresh cursor. None of the helpers
    commit.
    NOTE(review): committing is presumably handled by the pool or by the
    caller -- confirm against ``DatabasePool``.
    """

    def __init__(self, url: str, db: DatabasePool) -> None:
        """Store the service URL and database pool and create a logger.

        Args:
            url: Value stored as this service's ``url`` attribute.
            db: Pool used by the helpers to obtain connections.
        """
        self.url: str = url
        self.db: DatabasePool = db
        # Logger is named after the concrete subclass, not this base class.
        self.log: logging.Logger = logging.getLogger(self.__class__.__name__)
        # self._lock: threading.Lock = threading.Lock()

    def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
        """Return the first ``posts`` row matching service, user and identifier.

        Args:
            url: Value compared against the ``service`` column.
            user: Value compared against the ``user`` column.
            identifier: Value compared against the ``identifier`` column.

        Returns:
            The row returned by ``fetchone()``, or ``None`` when nothing matches.
            NOTE(review): the ``sqlite3.Row`` type assumes the connection's
            ``row_factory`` is set to ``sqlite3.Row`` -- confirm in the pool.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            SELECT * FROM posts
            WHERE service = ?
            AND user = ?
            AND identifier = ?
            """,
            (url, user, identifier),
        )
        # Cast to the optional type so the "no such row" case stays visible
        # to type checkers, matching the declared return type.
        return cast(sqlite3.Row | None, cursor.fetchone())

    def _get_post_by_id(self, id: int) -> sqlite3.Row | None:
        """Return the ``posts`` row whose ``id`` column equals ``id``, or ``None``."""
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
        return cast(sqlite3.Row | None, cursor.fetchone())

    def _insert_post(self, post_data: dict[str, Any]) -> None:
        """Insert one row into ``posts`` from a column-name -> value mapping.

        Args:
            post_data: Values keyed by the names in ``columns``. Keys that are
                absent are bound as ``None``; keys not listed in ``columns``
                are ignored.
        """
        values = [post_data.get(col) for col in columns]
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values
        )

    def _insert_post_mapping(self, original: int, mapped: int) -> None:
        """Record a mapping between two ids in ``mappings``, in both directions.

        Two ``INSERT OR IGNORE`` statements are issued: ``(original, mapped)``
        and ``(mapped, original)``.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
            (original, mapped),
        )
        _ = cursor.execute(
            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
            (mapped, original),
        )

    def _delete_post(self, url: str, user: str, identifier: str) -> None:
        """Delete ``posts`` rows matching identifier, service and user.

        Args:
            url: Value compared against the ``service`` column.
            user: Value compared against the ``user`` column.
            identifier: Value compared against the ``identifier`` column.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            DELETE FROM posts
            WHERE identifier = ?
            AND service = ?
            AND user = ?
            """,
            # Parameter order follows the WHERE clause, not the signature.
            (identifier, url, user),
        )

    def _delete_post_by_id(self, id: int) -> None:
        """Delete the ``posts`` row whose ``id`` column equals ``id``."""
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,))


class OutputService(Service):
    """Service with post/repost hooks; each default only logs a warning.

    Every method here logs "NOT IMPLEMENTED" with the service URL and the ids
    it was given, and does nothing else.
    """

    def accept_post(self, post: Post) -> None:
        """Log that accepting ``post`` is not implemented for this service."""
        self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)

    def delete_post(self, post_id: str) -> None:
        """Log that deleting ``post_id`` is not implemented for this service."""
        self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)

    def accept_repost(self, repost_id: str, reposted_id: str) -> None:
        """Log that accepting a repost is not implemented for this service."""
        self.log.warning(
            "NOT IMPLEMENTED (%s), accept_repost %s of %s",
            self.url,
            repost_id,
            reposted_id,
        )

    def delete_repost(self, repost_id: str) -> None:
        """Log that deleting ``repost_id`` is not implemented for this service."""
        self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)


class InputService(ABC, Service):
    """Abstract service that must implement the ``listen`` coroutine.

    ``outputs`` and ``submitter`` are only declared here, never assigned.
    NOTE(review): they are presumably set by the code that wires services
    together -- confirm at the construction site.
    """

    # Output services associated with this input (declared, not initialized).
    outputs: list[OutputService]
    # Callable that takes a zero-argument callable (declared, not initialized).
    submitter: Callable[[Callable[[], None]], None]

    @abstractmethod
    async def listen(self) -> None:
        """Coroutine that concrete input services must implement."""
        pass