social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import sqlite3
2from concurrent.futures import Future
3import threading
4import queue
5import json
6
7class DataBaseWorker():
8 def __init__(self, database: str) -> None:
9 super(DataBaseWorker, self).__init__()
10 self.database = database
11 self.queue = queue.Queue()
12 self.thread = threading.Thread(target=self._run, daemon=True)
13 self.shutdown_event = threading.Event()
14 self.conn = sqlite3.connect(self.database, check_same_thread=False)
15 self.lock = threading.Lock()
16 self.thread.start()
17
18 def _run(self):
19 while not self.shutdown_event.is_set():
20 try:
21 task, future = self.queue.get(timeout=1)
22 try:
23 with self.lock:
24 result = task(self.conn)
25 future.set_result(result)
26 except Exception as e:
27 future.set_exception(e)
28 finally:
29 self.queue.task_done()
30 except queue.Empty:
31 continue
32
33 def execute(self, sql: str, params = ()):
34 def task(conn: sqlite3.Connection):
35 cursor = conn.execute(sql, params)
36 conn.commit()
37 return cursor.fetchall()
38
39 future = Future()
40 self.queue.put((task, future))
41 return future.result()
42
43 def close(self):
44 self.shutdown_event.set()
45 self.thread.join()
46 with self.lock:
47 self.conn.close()
48
49def try_insert_repost(
50 db: DataBaseWorker,
51 post_id: str,
52 reposted_id: str,
53 input_user: str,
54 input_service: str) -> bool:
55
56 reposted = find_post(db, reposted_id, input_user, input_service)
57 if not reposted:
58 return False
59
60 insert_repost(db, post_id, reposted['id'], input_user, input_service)
61 return True
62
63
64def try_insert_post(
65 db: DataBaseWorker,
66 post_id: str,
67 in_reply: str | None,
68 input_user: str,
69 input_service: str) -> bool:
70 root_id = None
71 parent_id = None
72
73 if in_reply:
74 parent_post = find_post(db, in_reply, input_user, input_service)
75 if not parent_post:
76 return False
77
78 root_id = parent_post['id']
79 parent_id = root_id
80 if parent_post['root_id']:
81 root_id = parent_post['root_id']
82
83 if root_id and parent_id:
84 insert_reply(db,post_id, input_user, input_service, parent_id, root_id)
85 else:
86 insert_post(db, post_id, input_user, input_service)
87
88 return True
89
90def insert_repost(db: DataBaseWorker, identifier: str, reposted_id: int, user_id: str, serivce: str) -> int:
91 db.execute(
92 """
93 INSERT INTO posts (user_id, service, identifier, reposted_id)
94 VALUES (?, ?, ?, ?);
95 """, (user_id, serivce, identifier, reposted_id))
96 return db.execute("SELECT last_insert_rowid();", ())[0][0]
97
98def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
99 db.execute(
100 """
101 INSERT INTO posts (user_id, service, identifier)
102 VALUES (?, ?, ?);
103 """, (user_id, serivce, identifier))
104 return db.execute("SELECT last_insert_rowid();", ())[0][0]
105
106def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int:
107 db.execute(
108 """
109 INSERT INTO posts (user_id, service, identifier, parent_id, root_id)
110 VALUES (?, ?, ?, ?, ?);
111 """, (user_id, serivce, identifier, parent, root))
112 return db.execute("SELECT last_insert_rowid();", ())[0][0]
113
114def insert_mapping(db: DataBaseWorker, original: int, mapped: int):
115 db.execute("""
116 INSERT INTO mappings (original_post_id, mapped_post_id)
117 VALUES (?, ?);
118 """, (original, mapped))
119
120def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str):
121 db.execute(
122 """
123 DELETE FROM posts
124 WHERE identifier = ?
125 AND service = ?
126 AND user_id = ?
127 """, (identifier, serivce, user_id))
128
129def fetch_data(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict:
130 result = db.execute(
131 """
132 SELECT extra_data
133 FROM posts
134 WHERE identifier = ?
135 AND user_id = ?
136 AND service = ?
137 """, (identifier, user_id, service))
138 if not result or not result[0]:
139 return {}
140 return json.loads(result[0][0])
141
142def store_data(db: DataBaseWorker, identifier: str, user_id: str, service: str, extra_data: dict) -> None:
143 db.execute(
144 """
145 UPDATE posts
146 SET extra_data = ?
147 WHERE identifier = ?
148 AND user_id = ?
149 AND service = ?
150 """,
151 (json.dumps(extra_data), identifier, user_id, service)
152 )
153
154def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]:
155 return db.execute(
156 """
157 SELECT p.identifier
158 FROM posts AS p
159 JOIN mappings AS m
160 ON p.id = m.mapped_post_id
161 WHERE m.original_post_id = ?
162 AND p.service = ?
163 AND p.user_id = ?
164 ORDER BY p.id;
165 """,
166 (original_post, service, user_id))
167
168def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None:
169 result = db.execute(
170 """
171 SELECT user_id, service, identifier, parent_id, root_id, reposted_id
172 FROM posts
173 WHERE id = ?
174 """, (id,))
175 if not result:
176 return None
177 user_id, service, identifier, parent_id, root_id, reposted_id = result[0]
178 return {
179 'user_id': user_id,
180 'service': service,
181 'identifier': identifier,
182 'parent_id': parent_id,
183 'root_id': root_id,
184 'reposted_id': reposted_id
185 }
186
187def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None:
188 result = db.execute(
189 """
190 SELECT id, parent_id, root_id, reposted_id
191 FROM posts
192 WHERE identifier = ?
193 AND user_id = ?
194 AND service = ?
195 """, (identifier, user_id, service))
196 if not result:
197 return None
198 id, parent_id, root_id, reposted_id = result[0]
199 return {
200 'id': id,
201 'parent_id': parent_id,
202 'root_id': root_id,
203 'reposted_id': reposted_id
204 }
205
206def find_mapped_thread(
207 db: DataBaseWorker,
208 parent_id: str,
209 input_user: str,
210 input_service: str,
211 output_user: str,
212 output_service: str):
213
214 reply_data: dict | None = find_post(db, parent_id, input_user, input_service)
215 if not reply_data:
216 return None
217
218 reply_mappings: list[str] | None = find_mappings(db, reply_data['id'], output_service, output_user)
219 if not reply_mappings:
220 return None
221
222 reply_identifier: str = reply_mappings[-1]
223 root_identifier: str = reply_mappings[0]
224 if reply_data['root_id']:
225 root_data = find_post_by_id(db, reply_data['root_id'])
226 if not root_data:
227 return None
228
229 root_mappings = find_mappings(db, reply_data['root_id'], output_service, output_user)
230 if not root_mappings:
231 return None
232 root_identifier = root_mappings[0]
233
234 return (
235 root_identifier[0], # real ids
236 reply_identifier[0],
237 reply_data['root_id'], # db ids
238 reply_data['id']
239 )