import logging import sqlite3 from abc import ABC, abstractmethod from typing import Any, Callable, cast from cross.post import Post from database.connection import DatabasePool columns: list[str] = [ "user", "service", "identifier", "parent", "root", "reposted", "extra_data", ] placeholders: str = ", ".join(["?" for _ in columns]) column_names: str = ", ".join(columns) class Service: def __init__(self, url: str, db: DatabasePool) -> None: self.url: str = url self.db: DatabasePool = db self.log: logging.Logger = logging.getLogger(self.__class__.__name__) # self._lock: threading.Lock = threading.Lock() def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None: cursor = self.db.get_conn().cursor() _ = cursor.execute( """ SELECT * FROM posts WHERE service = ? AND user = ? AND identifier = ? """, (url, user, identifier), ) return cast(sqlite3.Row, cursor.fetchone()) def _get_post_by_id(self, id: int) -> sqlite3.Row | None: cursor = self.db.get_conn().cursor() _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,)) return cast(sqlite3.Row, cursor.fetchone()) def _get_mappings( self, original: int, service: str, user: str ) -> list[sqlite3.Row]: cursor = self.db.get_conn().cursor() _ = cursor.execute( """ SELECT * FROM posts AS p JOIN mappings AS m ON p.id = m.mapped WHERE m.original = ? AND p.service = ? AND p.user = ? ORDER BY p.id; """, (original, service, user), ) return cursor.fetchall() def _find_mapped_thread( self, parent: str, iservice: str, iuser: str, oservice: str, ouser: str ): reply_data = self._get_post(iservice, iuser, parent) if not reply_data: return None reply_mappings: list[sqlite3.Row] | None = self._get_mappings( reply_data["id"], oservice, ouser ) if not reply_mappings: return None reply_identifier: sqlite3.Row = reply_mappings[-1] root_identifier: sqlite3.Row = reply_mappings[0] if reply_data["root_id"]: root_data = self._get_post_by_id(reply_data["root_id"]) if not root_data: return None root_mappings = self._get_mappings(reply_data["root_id"], oservice, ouser) if not root_mappings: return None root_identifier = root_mappings[0] return ( root_identifier[0], # real ids reply_identifier[0], reply_data["root_id"], # db ids reply_data["id"], ) def _insert_post(self, post_data: dict[str, Any]): values = [post_data.get(col) for col in columns] cursor = self.db.get_conn().cursor() _ = cursor.execute( f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values ) def _insert_post_mapping(self, original: int, mapped: int): cursor = self.db.get_conn().cursor() _ = cursor.execute( "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);", (original, mapped), ) _ = cursor.execute( "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);", (mapped, original), ) def _delete_post(self, url: str, user: str, identifier: str): cursor = self.db.get_conn().cursor() _ = cursor.execute( """ DELETE FROM posts WHERE identifier = ? AND service = ? AND user = ? """, (identifier, url, user), ) def _delete_post_by_id(self, id: int): cursor = self.db.get_conn().cursor() _ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,)) class OutputService(Service): def accept_post(self, service: str, user: str, post: Post): self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id) def delete_post(self, service: str, user: str, post_id: str): self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id) def accept_repost(self, service: str, user: str, repost_id: str, reposted_id: str): self.log.warning( "NOT IMPLEMENTED (%s), accept_repost %s of %s", self.url, repost_id, reposted_id, ) def delete_repost(self, service: str, user: str, repost_id: str): self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id) class InputService(ABC, Service): outputs: list[OutputService] submitter: Callable[[Callable[[], None]], None] @abstractmethod async def listen(self): pass