social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
1import sqlite3 2from abc import ABC, abstractmethod 3from typing import Any, Callable, cast 4import logging 5 6from cross.post import Post 7from database.connection import DatabasePool 8 9columns: list[str] = [ 10 "user", 11 "service", 12 "identifier", 13 "parent", 14 "root", 15 "reposted", 16 "extra_data", 17] 18placeholders: str = ", ".join(["?" for _ in columns]) 19column_names: str = ", ".join(columns) 20 21 22class Service: 23 def __init__(self, url: str, db: DatabasePool) -> None: 24 self.url: str = url 25 self.db: DatabasePool = db 26 self.log: logging.Logger = logging.getLogger(self.__class__.__name__) 27 # self._lock: threading.Lock = threading.Lock() 28 29 def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None: 30 cursor = self.db.get_conn().cursor() 31 _ = cursor.execute( 32 """ 33 SELECT * FROM posts 34 WHERE service = ? 35 AND user = ? 36 AND identifier = ? 37 """, 38 (url, user, identifier), 39 ) 40 return cast(sqlite3.Row, cursor.fetchone()) 41 42 def _get_post_by_id(self, id: int) -> sqlite3.Row | None: 43 cursor = self.db.get_conn().cursor() 44 _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,)) 45 return cast(sqlite3.Row, cursor.fetchone()) 46 47 def _insert_post(self, post_data: dict[str, Any]): 48 values = [post_data.get(col) for col in columns] 49 cursor = self.db.get_conn().cursor() 50 _ = cursor.execute( 51 f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values 52 ) 53 54 def _insert_post_mapping(self, original: int, mapped: int): 55 cursor = self.db.get_conn().cursor() 56 _ = cursor.execute( 57 "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);", 58 (original, mapped), 59 ) 60 _ = cursor.execute( 61 "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);", 62 (mapped, original), 63 ) 64 65 def _delete_post(self, url: str, user: str, identifier: str): 66 cursor = self.db.get_conn().cursor() 67 _ = cursor.execute( 68 """ 69 DELETE FROM posts 70 WHERE identifier = ? 71 AND service = ? 72 AND user = ? 73 """, 74 (identifier, url, user), 75 ) 76 77 def _delete_post_by_id(self, id: int): 78 cursor = self.db.get_conn().cursor() 79 _ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,)) 80 81 82class OutputService(Service): 83 def accept_post(self, post: Post): 84 self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id) 85 86 def delete_post(self, post_id: str): 87 self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id) 88 89 def accept_repost(self, repost_id: str, reposted_id: str): 90 self.log.warning( 91 "NOT IMPLEMENTED (%s), accept_repost %s of %s", 92 self.url, 93 repost_id, 94 reposted_id, 95 ) 96 97 def delete_repost(self, repost_id: str): 98 self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id) 99 100 101class InputService(ABC, Service): 102 outputs: list[OutputService] 103 submitter: Callable[[Callable[[], None]], None] 104 105 @abstractmethod 106 async def listen(self): 107 pass