social media crossposting tool. 3rd time's the charm
mastodon misskey crossposting bluesky
at next 5.0 kB view raw
import logging
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, Callable, cast

from cross.post import Post
from database.connection import DatabasePool

# Columns of the `posts` table written by `Service._insert_post`, in the
# order their values are bound to the placeholders below.
columns: list[str] = [
    "user",
    "service",
    "identifier",
    "parent",
    "root",
    "reposted",
    "extra_data",
]
# One "?" per entry in `columns`, e.g. "?, ?, ?, ...".
placeholders: str = ", ".join(["?" for _ in columns])
# Comma-separated column list for the INSERT statement.
column_names: str = ", ".join(columns)


class Service:
    """Base class for a crossposting endpoint.

    Holds the service URL, the database pool and a logger named after the
    concrete class, and provides helpers that read and write the `posts`
    and `mappings` tables.

    None of the helpers call ``commit``.
    NOTE(review): presumably the connection handed out by ``DatabasePool``
    autocommits or is committed elsewhere, and uses ``sqlite3.Row`` as its
    row factory -- confirm against ``database.connection``.
    """

    def __init__(self, url: str, db: DatabasePool) -> None:
        self.url: str = url
        self.db: DatabasePool = db
        self.log: logging.Logger = logging.getLogger(self.__class__.__name__)
        # self._lock: threading.Lock = threading.Lock()

    def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
        """Return the `posts` row matching service/user/identifier.

        Args:
            url: Value compared against the `service` column.
            user: Value compared against the `user` column.
            identifier: Value compared against the `identifier` column.

        Returns:
            The first matching row, or ``None`` when nothing matches.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            SELECT * FROM posts
            WHERE service = ?
            AND user = ?
            AND identifier = ?
            """,
            (url, user, identifier),
        )
        # fetchone() yields None when there is no row, so the cast must keep
        # None in the type instead of pretending a row is always present.
        return cast(sqlite3.Row | None, cursor.fetchone())

    def _get_post_by_id(self, id: int) -> sqlite3.Row | None:
        """Return the `posts` row whose `id` column equals *id*, or ``None``."""
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
        return cast(sqlite3.Row | None, cursor.fetchone())

    def _get_mappings(
        self, original: int, service: str, user: str
    ) -> list[sqlite3.Row]:
        """Return the posts on (*service*, *user*) mapped from *original*.

        Joins `posts` to `mappings` on ``posts.id = mappings.mapped`` and
        keeps rows where ``mappings.original`` equals *original*.

        Returns:
            All matching joined rows ordered by ``posts.id``; an empty list
            when there are none.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            SELECT *
            FROM posts AS p
            JOIN mappings AS m
            ON p.id = m.mapped
            WHERE m.original = ?
            AND p.service = ?
            AND p.user = ?
            ORDER BY p.id;
            """,
            (original, service, user),
        )
        return cursor.fetchall()

    def _find_mapped_thread(
        self, parent: str, iservice: str, iuser: str, oservice: str, ouser: str
    ) -> tuple[Any, Any, Any, Any] | None:
        """Resolve a reply target on the input side to its output-side thread.

        Looks up the post *parent* on (*iservice*, *iuser*), then the posts
        it is mapped to on (*oservice*, *ouser*). When the looked-up post
        records a root, the root's mappings are resolved the same way.

        Returns:
            ``None`` if the parent post, its mappings, the root post or the
            root's mappings cannot be found. Otherwise a 4-tuple of
            ``(root_mapping[0], reply_mapping[0], parent's root_id,
            parent's id)``, where ``reply_mapping`` is the last mapped row
            and ``root_mapping`` is the first mapped row of the root (or of
            the parent itself when it has no root).
        """
        reply_data = self._get_post(iservice, iuser, parent)
        if not reply_data:
            return None

        reply_mappings: list[sqlite3.Row] = self._get_mappings(
            reply_data["id"], oservice, ouser
        )
        if not reply_mappings:
            return None

        # Rows are ordered by posts.id: reply to the last mapped post, and
        # default the root to the first one.
        reply_identifier: sqlite3.Row = reply_mappings[-1]
        root_identifier: sqlite3.Row = reply_mappings[0]

        # NOTE(review): `columns` above inserts into a column named "root",
        # while this reads "root_id". Confirm the `posts` schema exposes
        # "root_id"; a sqlite3.Row lookup of an unknown key raises IndexError.
        if reply_data["root_id"]:
            root_data = self._get_post_by_id(reply_data["root_id"])
            if not root_data:
                return None

            root_mappings = self._get_mappings(reply_data["root_id"], oservice, ouser)
            if not root_mappings:
                return None
            root_identifier = root_mappings[0]

        # NOTE(review): index 0 is the first column of the `SELECT *` join
        # (presumably posts.id rather than posts.identifier) -- confirm this
        # is what callers expect as the "real" id.
        return (
            root_identifier[0],  # real ids
            reply_identifier[0],
            reply_data["root_id"],  # db ids
            reply_data["id"],
        )

    def _insert_post(self, post_data: dict[str, Any]) -> None:
        """Insert one row into `posts`.

        Args:
            post_data: Mapping keyed by the names in `columns`. Keys that
                are absent are bound as ``None`` (SQL NULL); keys not in
                `columns` are ignored.
        """
        values = [post_data.get(col) for col in columns]
        cursor = self.db.get_conn().cursor()
        # Only module-level constants are interpolated into the statement;
        # all values are bound through placeholders.
        _ = cursor.execute(
            f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values
        )

    def _insert_post_mapping(self, original: int, mapped: int) -> None:
        """Record a mapping between two post ids in both directions.

        Inserts ``(original, mapped)`` and ``(mapped, original)`` into
        `mappings`; ``INSERT OR IGNORE`` skips rows that would violate a
        constraint.
        """
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
            (original, mapped),
        )
        _ = cursor.execute(
            "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
            (mapped, original),
        )

    def _delete_post(self, url: str, user: str, identifier: str) -> None:
        """Delete the `posts` rows matching identifier, service (*url*) and user."""
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute(
            """
            DELETE FROM posts
            WHERE identifier = ?
            AND service = ?
            AND user = ?
            """,
            (identifier, url, user),
        )

    def _delete_post_by_id(self, id: int) -> None:
        """Delete the `posts` row whose `id` column equals *id*."""
        cursor = self.db.get_conn().cursor()
        _ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,))


class OutputService(Service):
    """A service that receives crossposted content.

    Every hook here only logs a "NOT IMPLEMENTED" warning; subclasses
    override the ones they support.
    """

    def accept_post(self, service: str, user: str, post: Post) -> None:
        """Handle a new post from (*service*, *user*); default logs a warning."""
        self.log.warning("NOT IMPLEMENTED (%s), accept_post %s", self.url, post.id)

    def delete_post(self, service: str, user: str, post_id: str) -> None:
        """Handle deletion of *post_id*; default logs a warning."""
        self.log.warning("NOT IMPLEMENTED (%s), delete_post %s", self.url, post_id)

    def accept_repost(
        self, service: str, user: str, repost_id: str, reposted_id: str
    ) -> None:
        """Handle a repost *repost_id* of *reposted_id*; default logs a warning."""
        self.log.warning(
            "NOT IMPLEMENTED (%s), accept_repost %s of %s",
            self.url,
            repost_id,
            reposted_id,
        )

    def delete_repost(self, service: str, user: str, repost_id: str) -> None:
        """Handle removal of repost *repost_id*; default logs a warning."""
        self.log.warning("NOT IMPLEMENTED (%s), delete_repost %s", self.url, repost_id)


class InputService(ABC, Service):
    """Abstract base for a service that produces content to crosspost.

    Subclasses must implement `listen`. The attributes below are only
    declared here, not assigned.
    """

    # Output services associated with this input.
    outputs: list[OutputService]
    # Callable that accepts a zero-argument task.
    # NOTE(review): presumably schedules the task on a worker -- confirm.
    submitter: Callable[[Callable[[], None]], None]

    @abstractmethod
    async def listen(self) -> None:
        """Run the input service; must be provided by subclasses."""
        pass