social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
import logging
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Callable, cast

from cross.post import Post
from database.connection import DatabasePool

# Columns of the ``posts`` table that ``Service._insert_post`` writes, in the
# order they appear in the generated INSERT statement.
columns: list[str] = [
    "user",
    "service",
    "identifier",
    "parent",
    "root",
    "reposted",
    "extra_data",
]
# One "?" placeholder per entry in ``columns`` ("?, ?, ...").
placeholders: str = ", ".join(["?" for _ in columns])
# Comma-separated column list used in the INSERT statement.
column_names: str = ", ".join(columns)


class Service:
    """Base class holding a service URL and the shared post/mapping queries.

    Every helper obtains a connection from ``self.db.get_conn()`` and runs a
    single parameterized statement on a fresh cursor. None of the helpers
    commits explicitly.
    """

    def __init__(self, url: str, db: DatabasePool) -> None:
        """Store the service URL and database pool and create a logger.

        Args:
            url: URL identifying this service.
            db: Pool used to obtain database connections.
        """
        self.url: str = url
        self.db: DatabasePool = db
        # The logger is named after the concrete subclass.
        self.log: logging.Logger = logging.getLogger(self.__class__.__name__)

    def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
        """Return the first ``posts`` row matching service, user and identifier.

        Args:
            url: Value compared against the ``service`` column.
            user: Value compared against the ``user`` column.
            identifier: Value compared against the ``identifier`` column.

        Returns:
            The first matching row, or ``None`` when no row matches.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            SELECT * FROM posts
            WHERE service = ?
            AND user = ?
            AND identifier = ?
            """,
            (url, user, identifier),
        )
        # fetchone() yields None when there is no match, so the cast has to
        # include None to agree with the declared return type.
        return cast(sqlite3.Row | None, cursor.fetchone())

    def _get_post_by_id(self, id: int) -> sqlite3.Row | None:
        """Return the ``posts`` row whose ``id`` column equals *id*.

        Args:
            id: Database id of the post.

        Returns:
            The matching row, or ``None`` when no row matches.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
        return cast(sqlite3.Row | None, cursor.fetchone())

    def _get_mappings(
        self, original: int, service: str, user: str
    ) -> list[sqlite3.Row]:
        """Return the posts mapped from *original* for one service and user.

        Args:
            original: Value compared against ``mappings.original``.
            service: Value compared against ``posts.service``.
            user: Value compared against ``posts.user``.

        Returns:
            All joined ``posts``/``mappings`` rows, ordered by ``posts.id``
            ascending; an empty list when there are none.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            SELECT *
            FROM posts AS p
            JOIN mappings AS m
            ON p.id = m.mapped
            WHERE m.original = ?
            AND p.service = ?
            AND p.user = ?
            ORDER BY p.id;
            """,
            (original, service, user),
        )
        return cursor.fetchall()

    def _find_mapped_thread(
        self, parent: str, iservice: str, iuser: str, oservice: str, ouser: str
    ) -> tuple[Any, Any, Any, Any] | None:
        """Resolve a parent post on the input side to its output-side thread.

        Args:
            parent: Identifier of the parent post on the input service.
            iservice: Input service the parent was stored under.
            iuser: Input user the parent was stored under.
            oservice: Output service whose mapped posts are looked up.
            ouser: Output user whose mapped posts are looked up.

        Returns:
            ``None`` if the parent post, its mappings, or (when the parent
            has a root) the root post or its mappings cannot be found.
            Otherwise a 4-tuple of: column 0 of the root mapping row,
            column 0 of the reply mapping row, the parent's ``root_id``
            value, and the parent's ``id`` value.
        """
        reply_data = self._get_post(iservice, iuser, parent)
        if not reply_data:
            return None

        reply_mappings: list[sqlite3.Row] = self._get_mappings(
            reply_data["id"], oservice, ouser
        )
        if not reply_mappings:
            return None

        # Mappings are ordered by posts.id: the reply target is the last
        # mapped row, and the first mapped row is the default root.
        reply_identifier: sqlite3.Row = reply_mappings[-1]
        root_identifier: sqlite3.Row = reply_mappings[0]

        # NOTE(review): this reads a "root_id" column, while `_insert_post`
        # writes a column named "root" — confirm against the posts schema.
        if reply_data["root_id"]:
            # The root post itself must still exist, not only its mappings.
            root_data = self._get_post_by_id(reply_data["root_id"])
            if not root_data:
                return None

            root_mappings = self._get_mappings(reply_data["root_id"], oservice, ouser)
            if not root_mappings:
                return None
            root_identifier = root_mappings[0]

        # NOTE(review): index 0 is the first column of the `SELECT *` join
        # row. The original comment calls these "real ids" — confirm that
        # column 0 is the intended value.
        return (
            root_identifier[0],  # real ids
            reply_identifier[0],
            reply_data["root_id"],  # db ids
            reply_data["id"],
        )

    def _insert_post(self, post_data: dict[str, Any]) -> None:
        """Insert one row into ``posts`` from *post_data*.

        Args:
            post_data: Mapping keyed by the names in ``columns``. Missing
                keys are bound as ``None``; extra keys are ignored.
        """
        values = [post_data.get(col) for col in columns]
        cursor = self.db.get_conn().cursor()
        # Only the module-level column names are interpolated into the SQL;
        # every value is bound through a placeholder.
        _ = cursor.execute(
            f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values
        )

    def _insert_post_mapping(self, original: int, mapped: int) -> None:
        """Record a mapping between two posts in both directions.

        Inserts ``(original, mapped)`` and ``(mapped, original)`` with
        ``INSERT OR IGNORE``.

        Args:
            original: Database id of the first post.
            mapped: Database id of the second post.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
            (original, mapped),
        )
        _ = cursor.execute(
            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
            (mapped, original),
        )

    def _delete_post(self, url: str, user: str, identifier: str) -> None:
        """Delete the ``posts`` rows matching service, user and identifier.

        Args:
            url: Value compared against the ``service`` column.
            user: Value compared against the ``user`` column.
            identifier: Value compared against the ``identifier`` column.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            DELETE FROM posts
            WHERE identifier = ?
            AND service = ?
            AND user = ?
            """,
            (identifier, url, user),
        )

    def _delete_post_by_id(self, id: int) -> None:
        """Delete the ``posts`` row whose ``id`` column equals *id*.

        Args:
            id: Database id of the post to delete.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,))


class OutputService(Service):
    """Service that receives post events.

    Every hook here only logs a "NOT IMPLEMENTED" warning.
    """

    def accept_post(self, service: str, user: str, post: Post) -> None:
        """Handle a new post; this base version only logs a warning."""
        self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)

    def delete_post(self, service: str, user: str, post_id: str) -> None:
        """Handle a post deletion; this base version only logs a warning."""
        self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)

    def accept_repost(
        self, service: str, user: str, repost_id: str, reposted_id: str
    ) -> None:
        """Handle a new repost; this base version only logs a warning."""
        self.log.warning(
            "NOT IMPLEMENTED (%s), accept_repost %s of %s",
            self.url,
            repost_id,
            reposted_id,
        )

    def delete_repost(self, service: str, user: str, repost_id: str) -> None:
        """Handle a repost deletion; this base version only logs a warning."""
        self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)


class InputService(ABC, Service):
    """Abstract service that must implement ``listen``."""

    # Output services associated with this input. Declared only; nothing in
    # this class assigns it.
    outputs: list[OutputService]
    # Callable that accepts a zero-argument callback. Declared only; nothing
    # in this class assigns it.
    submitter: Callable[[Callable[[], None]], None]

    @abstractmethod
    async def listen(self) -> None:
        """Listen for events on the input service; subclasses implement this."""
        pass