social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import logging
2import sqlite3
3from abc import ABC, abstractmethod
4from typing import Any, Callable, cast
5
6from cross.post import Post
7from database.connection import DatabasePool
8
# Columns of the `posts` table that Service._insert_post fills, in bind order.
# `id` is not listed here, so it is never supplied by the insert statement.
columns: list[str] = [
    "user",
    "service",
    "identifier",
    "parent",
    "root",
    "reposted",
    "extra_data",
]
# Comma-separated column list for the INSERT statement.
column_names: str = ", ".join(columns)
# One "?" bind marker per column ("?, ?, ..."); joining a string iterates its
# characters, so "?" * n yields exactly n markers.
placeholders: str = ", ".join("?" * len(columns))
20
21
22class Service:
23 def __init__(self, url: str, db: DatabasePool) -> None:
24 self.url: str = url
25 self.db: DatabasePool = db
26 self.log: logging.Logger = logging.getLogger(self.__class__.__name__)
27 # self._lock: threading.Lock = threading.Lock()
28
29 def _get_post(self, url: str, user: str, identifier: str) -> sqlite3.Row | None:
30 cursor = self.db.get_conn().cursor()
31 _ = cursor.execute(
32 """
33 SELECT * FROM posts
34 WHERE service = ?
35 AND user = ?
36 AND identifier = ?
37 """,
38 (url, user, identifier),
39 )
40 return cast(sqlite3.Row, cursor.fetchone())
41
42 def _get_post_by_id(self, id: int) -> sqlite3.Row | None:
43 cursor = self.db.get_conn().cursor()
44 _ = cursor.execute("SELECT * FROM posts WHERE id = ?", (id,))
45 return cast(sqlite3.Row, cursor.fetchone())
46
47 def _get_mappings(
48 self, original: int, service: str, user: str
49 ) -> list[sqlite3.Row]:
50 cursor = self.db.get_conn().cursor()
51 _ = cursor.execute(
52 """
53 SELECT *
54 FROM posts AS p
55 JOIN mappings AS m
56 ON p.id = m.mapped
57 WHERE m.original = ?
58 AND p.service = ?
59 AND p.user = ?
60 ORDER BY p.id;
61 """,
62 (original, service, user),
63 )
64 return cursor.fetchall()
65
66 def _find_mapped_thread(
67 self, parent: str, iservice: str, iuser: str, oservice: str, ouser: str
68 ):
69 reply_data = self._get_post(iservice, iuser, parent)
70 if not reply_data:
71 return None
72
73 reply_mappings: list[sqlite3.Row] | None = self._get_mappings(
74 reply_data["id"], oservice, ouser
75 )
76 if not reply_mappings:
77 return None
78
79 reply_identifier: sqlite3.Row = reply_mappings[-1]
80 root_identifier: sqlite3.Row = reply_mappings[0]
81
82 if reply_data["root_id"]:
83 root_data = self._get_post_by_id(reply_data["root_id"])
84 if not root_data:
85 return None
86
87 root_mappings = self._get_mappings(reply_data["root_id"], oservice, ouser)
88 if not root_mappings:
89 return None
90 root_identifier = root_mappings[0]
91
92 return (
93 root_identifier[0], # real ids
94 reply_identifier[0],
95 reply_data["root_id"], # db ids
96 reply_data["id"],
97 )
98
99 def _insert_post(self, post_data: dict[str, Any]):
100 values = [post_data.get(col) for col in columns]
101 cursor = self.db.get_conn().cursor()
102 _ = cursor.execute(
103 f"INSERT INTO posts ({column_names}) VALUES ({placeholders})", values
104 )
105
106 def _insert_post_mapping(self, original: int, mapped: int):
107 cursor = self.db.get_conn().cursor()
108 _ = cursor.execute(
109 "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
110 (original, mapped),
111 )
112 _ = cursor.execute(
113 "INSERT OR IGNORE INTO mappings (original, mapped) VALUES (?, ?);",
114 (mapped, original),
115 )
116
117 def _delete_post(self, url: str, user: str, identifier: str):
118 cursor = self.db.get_conn().cursor()
119 _ = cursor.execute(
120 """
121 DELETE FROM posts
122 WHERE identifier = ?
123 AND service = ?
124 AND user = ?
125 """,
126 (identifier, url, user),
127 )
128
129 def _delete_post_by_id(self, id: int):
130 cursor = self.db.get_conn().cursor()
131 _ = cursor.execute("DELETE FROM posts WHERE id = ?", (id,))
132
133
class OutputService(Service):
    """Service that receives crossposted content.

    Every hook here only logs a "NOT IMPLEMENTED" warning carrying this
    service's URL; subclasses override the hooks they support.
    """

    def accept_post(self, service: str, user: str, post: Post) -> None:
        """Handle a new post from ``user`` on ``service``; default logs a warning."""
        self.log.warning(
            "NOT IMPLEMENTED (%s), accept_post %s",
            self.url,
            post.id,
        )

    def delete_post(self, service: str, user: str, post_id: str) -> None:
        """Handle deletion of post ``post_id``; default logs a warning."""
        self.log.warning(
            "NOT IMPLEMENTED (%s), delete_post %s",
            self.url,
            post_id,
        )

    def accept_repost(
        self, service: str, user: str, repost_id: str, reposted_id: str
    ) -> None:
        """Handle repost ``repost_id`` of ``reposted_id``; default logs a warning."""
        self.log.warning(
            "NOT IMPLEMENTED (%s), accept_repost %s of %s",
            self.url,
            repost_id,
            reposted_id,
        )

    def delete_repost(self, service: str, user: str, repost_id: str) -> None:
        """Handle removal of repost ``repost_id``; default logs a warning."""
        self.log.warning(
            "NOT IMPLEMENTED (%s), delete_repost %s",
            self.url,
            repost_id,
        )
151
152
class InputService(ABC, Service):
    """Abstract service that produces posts; subclasses must implement ``listen``."""

    # Output services associated with this input. Only declared here; the
    # value is assigned outside this class.
    outputs: list[OutputService]
    # Callable that accepts a zero-argument task. Only declared here.
    # NOTE(review): presumably schedules the task on a worker -- confirm
    # against the code that assigns it.
    submitter: Callable[[Callable[[], None]], None]

    @abstractmethod
    async def listen(self) -> None:
        """Coroutine hook each concrete input service has to provide."""