social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import json
2import queue
3import sqlite3
4import threading
5from concurrent.futures import Future
6
7
class DataBaseWorker:
    """Run SQLite statements on one background thread.

    A single connection is opened with ``check_same_thread=False`` and every
    submitted task is executed by the worker thread while holding
    ``self.lock``.  Callers block in :meth:`execute` until their task has run.

    Shutdown semantics: :meth:`close` lets tasks that were already queued
    finish, then stops the worker and closes the connection.  Calling
    :meth:`execute` after :meth:`close` raises ``RuntimeError`` instead of
    blocking forever on a queue nobody reads.
    """

    # Queue sentinel that wakes the worker and tells it to exit.
    _STOP = object()

    def __init__(self, database: str) -> None:
        """Open *database* and start the worker thread."""
        super().__init__()
        self.database = database
        self.queue = queue.Queue()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.shutdown_event = threading.Event()
        self.conn = sqlite3.connect(self.database, check_same_thread=False)
        # Held while a task touches the connection and while it is closed.
        self.lock = threading.Lock()
        # Makes "check closed + enqueue" atomic with respect to close(), so a
        # task can never be queued behind the stop sentinel by execute().
        self._submit_lock = threading.Lock()
        self.thread.start()

    def _run(self):
        """Worker loop: run queued tasks until the stop sentinel arrives."""
        while True:
            try:
                item = self.queue.get(timeout=1)
            except queue.Empty:
                # Still honour the flag if it was set without going through
                # close() (no sentinel queued in that case).
                if self.shutdown_event.is_set():
                    return
                continue

            try:
                if item is self._STOP:
                    return
                task, future = item
                try:
                    with self.lock:
                        result = task(self.conn)
                except Exception as e:
                    # Hand the failure to the waiting caller.
                    future.set_exception(e)
                else:
                    future.set_result(result)
            finally:
                self.queue.task_done()

    def execute(self, sql: str, params=()):
        """Execute *sql* with *params* on the worker thread.

        The statement is committed and all result rows are returned as a
        list.  Any exception raised while running the statement is re-raised
        in the calling thread.

        Raises:
            RuntimeError: if the worker has been shut down.
        """

        def task(conn: sqlite3.Connection):
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.fetchall()

        future = Future()
        with self._submit_lock:
            if self.shutdown_event.is_set():
                raise RuntimeError("DataBaseWorker is closed")
            self.queue.put((task, future))
        return future.result()

    def close(self):
        """Stop the worker thread and close the connection.

        Tasks queued before this call are still executed.  Safe to call more
        than once.
        """
        with self._submit_lock:
            self.shutdown_event.set()
            # Wake the worker immediately instead of waiting for its poll
            # timeout; FIFO order means earlier tasks run first.
            self.queue.put(self._STOP)
        self.thread.join()

        # Anything still queued can no longer run; fail it so no caller is
        # left waiting on a future that will never complete.
        while True:
            try:
                item = self.queue.get_nowait()
            except queue.Empty:
                break
            if item is not self._STOP:
                item[1].set_exception(RuntimeError("DataBaseWorker is closed"))
            self.queue.task_done()

        with self.lock:
            self.conn.close()
49
50
def try_insert_repost(
    db: DataBaseWorker,
    post_id: str,
    reposted_id: str,
    input_user: str,
    input_service: str,
) -> bool:
    """Store *post_id* as a repost of an already-known post.

    The reposted post is looked up by ``reposted_id`` for the given user and
    service.  Returns ``False`` without inserting anything when that lookup
    finds nothing, ``True`` once the repost row has been inserted.
    """
    original = find_post(db, reposted_id, input_user, input_service)
    if original:
        # The repost row references the database id of the original post.
        insert_repost(db, post_id, original["id"], input_user, input_service)
        return True
    return False
64
65
def try_insert_post(
    db: DataBaseWorker,
    post_id: str,
    in_reply: str | None,
    input_user: str,
    input_service: str,
) -> bool:
    """Store *post_id*, linking it to its parent when it is a reply.

    With a falsy ``in_reply`` the post is inserted as a standalone post.
    Otherwise the parent is looked up for the same user and service; if it is
    not found nothing is inserted and ``False`` is returned.  The root of the
    new reply is the parent's root when the parent has one, else the parent
    itself.
    """
    if not in_reply:
        insert_post(db, post_id, input_user, input_service)
        return True

    parent = find_post(db, in_reply, input_user, input_service)
    if not parent:
        return False

    parent_db_id = parent["id"]
    root_db_id = parent["root_id"] or parent_db_id

    if root_db_id and parent_db_id:
        insert_reply(
            db, post_id, input_user, input_service, parent_db_id, root_db_id
        )
    else:
        # Falsy ids fall back to a plain post.
        insert_post(db, post_id, input_user, input_service)
    return True
92
93
def insert_repost(
    db: DataBaseWorker, identifier: str, reposted_id: int, user_id: str, serivce: str
) -> int:
    """Insert a repost row into ``posts`` and return its row id.

    Args:
        db: worker owning the connection.
        identifier: identifier stored for the new row.
        reposted_id: database id of the post being reposted.
        user_id: value stored in ``user_id``.
        serivce: value stored in ``service`` (parameter name kept as-is for
            keyword callers).

    Returns:
        The row id of the inserted row.
    """
    # last_insert_rowid() is connection-wide, so reading it in a second,
    # separately queued statement could return the id of a row inserted by
    # another thread in between.  Insert and read the id in one critical
    # section instead, under the same lock the worker uses for the connection.
    with db.lock:
        cursor = db.conn.execute(
            """
            INSERT INTO posts (user_id, service, identifier, reposted_id)
            VALUES (?, ?, ?, ?);
            """,
            (user_id, serivce, identifier, reposted_id),
        )
        db.conn.commit()
        return cursor.lastrowid
105
106
def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
    """Insert a standalone post row into ``posts`` and return its row id.

    Args:
        db: worker owning the connection.
        identifier: identifier stored for the new row.
        user_id: value stored in ``user_id``.
        serivce: value stored in ``service`` (parameter name kept as-is for
            keyword callers).

    Returns:
        The row id of the inserted row.
    """
    # last_insert_rowid() is connection-wide, so reading it in a second,
    # separately queued statement could return the id of a row inserted by
    # another thread in between.  Insert and read the id in one critical
    # section instead, under the same lock the worker uses for the connection.
    with db.lock:
        cursor = db.conn.execute(
            """
            INSERT INTO posts (user_id, service, identifier)
            VALUES (?, ?, ?);
            """,
            (user_id, serivce, identifier),
        )
        db.conn.commit()
        return cursor.lastrowid
116
117
def insert_reply(
    db: DataBaseWorker,
    identifier: str,
    user_id: str,
    serivce: str,
    parent: int,
    root: int,
) -> int:
    """Insert a reply row into ``posts`` and return its row id.

    Args:
        db: worker owning the connection.
        identifier: identifier stored for the new row.
        user_id: value stored in ``user_id``.
        serivce: value stored in ``service`` (parameter name kept as-is for
            keyword callers).
        parent: database id stored in ``parent_id``.
        root: database id stored in ``root_id``.

    Returns:
        The row id of the inserted row.
    """
    # last_insert_rowid() is connection-wide, so reading it in a second,
    # separately queued statement could return the id of a row inserted by
    # another thread in between.  Insert and read the id in one critical
    # section instead, under the same lock the worker uses for the connection.
    with db.lock:
        cursor = db.conn.execute(
            """
            INSERT INTO posts (user_id, service, identifier, parent_id, root_id)
            VALUES (?, ?, ?, ?, ?);
            """,
            (user_id, serivce, identifier, parent, root),
        )
        db.conn.commit()
        return cursor.lastrowid
134
135
def insert_mapping(db: DataBaseWorker, original: int, mapped: int):
    """Add a ``mappings`` row linking post id *original* to post id *mapped*."""
    statement = """
        INSERT INTO mappings (original_post_id, mapped_post_id)
        VALUES (?, ?);
        """
    pair = (original, mapped)
    db.execute(statement, pair)
144
145
def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str):
    """Delete the ``posts`` rows matching *identifier*, *serivce* and *user_id*."""
    statement = """
        DELETE FROM posts
        WHERE identifier = ?
        AND service = ?
        AND user_id = ?
        """
    # Placeholder order follows the WHERE clause: identifier, service, user.
    bindings = (identifier, serivce, user_id)
    db.execute(statement, bindings)
156
157
def fetch_data(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict:
    """Return the decoded ``extra_data`` JSON of a post.

    The post is selected by *identifier*, *user_id* and *service*.

    Returns:
        The parsed JSON value of the first matching row, or ``{}`` when no
        row matches or the stored ``extra_data`` is NULL/empty.
    """
    result = db.execute(
        """
        SELECT extra_data
        FROM posts
        WHERE identifier = ?
        AND user_id = ?
        AND service = ?
        """,
        (identifier, user_id, service),
    )
    if not result:
        return {}
    # Each row is a 1-tuple, which is always truthy; the column value itself
    # must be tested, otherwise a NULL would reach json.loads() and raise.
    raw = result[0][0]
    if not raw:
        return {}
    return json.loads(raw)
172
173
def store_data(
    db: DataBaseWorker, identifier: str, user_id: str, service: str, extra_data: dict
) -> None:
    """Serialize *extra_data* to JSON and write it to the matching posts.

    Rows are matched on *identifier*, *user_id* and *service*; only the
    ``extra_data`` column is updated.
    """
    payload = json.dumps(extra_data)
    statement = """
        UPDATE posts
        SET extra_data = ?
        WHERE identifier = ?
        AND user_id = ?
        AND service = ?
        """
    db.execute(statement, (payload, identifier, user_id, service))
187
188
def find_mappings(
    db: DataBaseWorker, original_post: int, service: str, user_id: str
) -> list[tuple[str]]:
    """Return identifiers of the posts mapped from *original_post*.

    Only mapped posts belonging to *service* and *user_id* are included,
    ordered by their database id.

    Returns:
        The raw result rows: a list of 1-tuples ``(identifier,)``, not bare
        strings.  Empty when there is no matching mapping.
    """
    return db.execute(
        """
        SELECT p.identifier
        FROM posts AS p
        JOIN mappings AS m
        ON p.id = m.mapped_post_id
        WHERE m.original_post_id = ?
        AND p.service = ?
        AND p.user_id = ?
        ORDER BY p.id;
        """,
        (original_post, service, user_id),
    )
205
206
def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None:
    """Look up a post by its database id.

    Returns a dict with the keys ``user_id``, ``service``, ``identifier``,
    ``parent_id``, ``root_id`` and ``reposted_id`` for the first matching
    row, or ``None`` when no row has that id.
    """
    rows = db.execute(
        """
        SELECT user_id, service, identifier, parent_id, root_id, reposted_id
        FROM posts
        WHERE id = ?
        """,
        (id,),
    )
    if rows:
        user_id, service, identifier, parent_id, root_id, reposted_id = rows[0]
        keys = (
            "user_id",
            "service",
            "identifier",
            "parent_id",
            "root_id",
            "reposted_id",
        )
        values = (user_id, service, identifier, parent_id, root_id, reposted_id)
        return dict(zip(keys, values))
    return None
227
228
def find_post(
    db: DataBaseWorker, identifier: str, user_id: str, service: str
) -> dict | None:
    """Look up a post by *identifier*, *user_id* and *service*.

    Returns a dict with the keys ``id``, ``parent_id``, ``root_id`` and
    ``reposted_id`` for the first matching row, or ``None`` when nothing
    matches.
    """
    rows = db.execute(
        """
        SELECT id, parent_id, root_id, reposted_id
        FROM posts
        WHERE identifier = ?
        AND user_id = ?
        AND service = ?
        """,
        (identifier, user_id, service),
    )
    if rows:
        db_id, parent_id, root_id, reposted_id = rows[0]
        return {
            "id": db_id,
            "parent_id": parent_id,
            "root_id": root_id,
            "reposted_id": reposted_id,
        }
    return None
251
252
def find_mapped_thread(
    db: DataBaseWorker,
    parent_id: str,
    input_user: str,
    input_service: str,
    output_user: str,
    output_service: str,
):
    """Resolve the output-side thread position for a reply to *parent_id*.

    The parent post is looked up on the input side, then its mappings for
    ``output_service``/``output_user`` are fetched.  When the parent has a
    ``root_id``, the root's own mappings supply the root identifier;
    otherwise the parent's first mapping is used as the root.

    Returns:
        ``None`` if the parent, its mappings, the root post or the root's
        mappings cannot be found.  Otherwise a 4-tuple
        ``(root identifier, reply identifier, root db id, parent db id)``
        where the third item is the parent's ``root_id`` as stored.
    """
    parent = find_post(db, parent_id, input_user, input_service)
    if not parent:
        return None
    parent_db_id = parent["id"]
    root_db_id = parent["root_id"]

    # Mapping rows are 1-tuples holding the identifier.
    parent_rows = find_mappings(db, parent_db_id, output_service, output_user)
    if not parent_rows:
        return None

    # Reply to the last mapped post; default the root to the first one.
    reply_row = parent_rows[-1]
    root_row = parent_rows[0]

    if root_db_id:
        if not find_post_by_id(db, root_db_id):
            return None
        root_rows = find_mappings(db, root_db_id, output_service, output_user)
        if not root_rows:
            return None
        root_row = root_rows[0]

    return (
        root_row[0],  # real ids
        reply_row[0],
        root_db_id,  # db ids
        parent_db_id,
    )