social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import sqlite3
2from concurrent.futures import Future
3import threading
4import queue
5
class DataBaseWorker():
    """Run all SQLite work for one database on a single background thread.

    Callers on any thread use :meth:`execute`; the statement is queued, run
    by the worker thread against the one shared connection, and the caller
    blocks until its result (or exception) is available.
    """

    def __init__(self, database: str) -> None:
        """Open ``database`` and start the daemon worker thread."""
        super().__init__()
        self.database = database
        # Items are ``(task, future)`` pairs; ``None`` is the shutdown sentinel.
        self.queue = queue.Queue()
        self.shutdown_event = threading.Event()
        # The connection is created here but used from the worker thread,
        # hence check_same_thread=False.
        self.conn = sqlite3.connect(self.database, check_same_thread=False)
        # Held while a task runs and while the connection is being closed.
        self.lock = threading.Lock()
        # Makes "check for shutdown, then enqueue" atomic with close(), so no
        # task can be queued after the worker has been told to stop.
        self._submit_lock = threading.Lock()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        """Worker loop: run queued tasks in order until told to stop.

        Tasks that were queued before shutdown are still executed, so no
        caller is left waiting on a future that will never complete.
        """
        while True:
            try:
                item = self.queue.get(timeout=1)
            except queue.Empty:
                # Fallback for a shutdown signalled through the event alone
                # (no sentinel queued).
                if self.shutdown_event.is_set():
                    return
                continue

            try:
                if item is None:
                    # Sentinel from close(): everything queued before it
                    # has already been processed.
                    return
                task, future = item
                try:
                    with self.lock:
                        result = task(self.conn)
                except Exception as e:
                    # Hand the failure to the waiting caller.
                    future.set_exception(e)
                else:
                    future.set_result(result)
            finally:
                self.queue.task_done()

    def execute(self, sql: str, params = ()):
        """Run one SQL statement on the worker thread and return its rows.

        Args:
            sql: The statement to execute.
            params: Values bound to the statement's placeholders.

        Returns:
            The result of ``fetchall()`` for the statement (an empty list
            when it produces no rows).

        Raises:
            RuntimeError: If the worker has already been closed.
            Exception: Whatever the statement raised on the worker thread.
        """
        def task(conn: sqlite3.Connection):
            try:
                cursor = conn.execute(sql, params)
                # Read the rows before committing so a statement that both
                # writes and returns rows is fully stepped first.
                rows = cursor.fetchall()
                conn.commit()
            except Exception:
                # Do not leave a half-open transaction behind on failure.
                conn.rollback()
                raise
            return rows

        future = Future()
        with self._submit_lock:
            if self.shutdown_event.is_set():
                # Nothing would ever run the task; fail instead of blocking
                # forever in future.result().
                raise RuntimeError("DataBaseWorker is closed")
            self.queue.put((task, future))
        return future.result()

    def close(self):
        """Stop the worker after pending tasks finish, then close the connection.

        Safe to call more than once.
        """
        with self._submit_lock:
            if not self.shutdown_event.is_set():
                self.shutdown_event.set()
                # Wake the worker promptly instead of waiting for its poll
                # timeout; queued tasks ahead of the sentinel still run.
                self.queue.put(None)
        self.thread.join()
        with self.lock:
            self.conn.close()
47
def try_insert_post(
    db: DataBaseWorker,
    post_id: str,
    in_reply: str | None,
    input_user: str,
    input_service: str) -> bool:
    """Store a post, attaching it to its thread when it is a reply.

    A falsy ``in_reply`` stores the post with ``insert_post``. Otherwise the
    parent is looked up with ``find_post`` for the same user and service; if
    it is unknown nothing is stored and ``False`` is returned. A known parent
    supplies the reply's ``parent_id`` (the parent's row id) and its root
    (the parent's ``root_id``, or the parent's own row id when that is
    falsy), and the post is stored with ``insert_reply``.

    Returns:
        ``True`` once an insert call has been made, ``False`` when the
        referenced parent could not be found.
    """
    if not in_reply:
        insert_post(db, post_id, input_user, input_service)
        return True

    parent = find_post(db, in_reply, input_user, input_service)
    if not parent:
        return False

    parent_row_id = parent['id']
    # A parent without a root is itself the root of the thread.
    thread_root_id = parent['root_id'] or parent_row_id

    if parent_row_id and thread_root_id:
        insert_reply(
            db, post_id, input_user, input_service,
            parent_row_id, thread_root_id)
    else:
        insert_post(db, post_id, input_user, input_service)
    return True
73
74
def find_mapped_thread(
    db: DataBaseWorker,
    parent_id: str,
    input_user: str,
    input_service: str,
    output_user: str,
    output_service: str):
    """Resolve the thread a reply belongs to on the output side.

    The parent post is looked up with ``find_post`` using the input user and
    service, then its mappings for the output service/user are fetched with
    ``find_mappings``. When the parent records a ``root_id``, that root post
    must exist (``find_post_by_id``) and have mappings of its own.

    Returns:
        ``None`` if the parent, its mappings, the root post or the root's
        mappings are missing. Otherwise a 4-tuple of

        * the root identifier: column 0 of the first root mapping row, or of
          the first parent mapping row when the parent has no ``root_id``;
        * the reply identifier: column 0 of the last parent mapping row;
        * the parent's ``root_id`` value as stored (falsy when the parent has
          no root);
        * the parent's database id.
    """
    parent = find_post(db, parent_id, input_user, input_service)
    if not parent:
        return None

    # Mapping rows are indexed with [0] below to get the identifier column.
    parent_rows = find_mappings(db, parent['id'], output_service, output_user)
    if not parent_rows:
        return None

    root_db_id = parent['root_id']
    root_row = parent_rows[0]
    if root_db_id:
        # The root post itself must still be present in the database.
        if not find_post_by_id(db, root_db_id):
            return None

        root_rows = find_mappings(db, root_db_id, output_service, output_user)
        if not root_rows:
            return None
        root_row = root_rows[0]

    reply_row = parent_rows[-1]
    return (
        root_row[0],  # real ids
        reply_row[0],
        root_db_id,  # db ids
        parent['id']
    )
109
110
def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
    """Insert a top-level post row and return the row id SQLite assigned.

    Args:
        db: Worker whose ``queue`` accepts ``(task, future)`` pairs, where
            ``task`` is called with the SQLite connection.
        identifier: The post's identifier on its service.
        user_id: Owner of the post.
        serivce: Service the post lives on (parameter name kept as-is for
            existing keyword callers).

    Returns:
        The ``lastrowid`` of the INSERT.

    Raises:
        Exception: Whatever the INSERT raised on the worker thread.
    """
    def task(conn: sqlite3.Connection) -> int:
        cursor = conn.execute(
            """
            INSERT INTO posts (user_id, service, identifier)
            VALUES (?, ?, ?);
            """, (user_id, serivce, identifier))
        conn.commit()
        return cursor.lastrowid

    # The INSERT and the row-id read run inside ONE worker task. A separately
    # queued `SELECT last_insert_rowid()` could run after another thread's
    # insert on the shared connection and report that row's id instead.
    future = Future()
    db.queue.put((task, future))
    return future.result()
118
def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int:
    """Insert a reply row linked to its parent and root, and return its row id.

    Args:
        db: Worker whose ``queue`` accepts ``(task, future)`` pairs, where
            ``task`` is called with the SQLite connection.
        identifier: The post's identifier on its service.
        user_id: Owner of the post.
        serivce: Service the post lives on (parameter name kept as-is for
            existing keyword callers).
        parent: Value stored in ``parent_id``.
        root: Value stored in ``root_id``.

    Returns:
        The ``lastrowid`` of the INSERT.

    Raises:
        Exception: Whatever the INSERT raised on the worker thread.
    """
    def task(conn: sqlite3.Connection) -> int:
        cursor = conn.execute(
            """
            INSERT INTO posts (user_id, service, identifier, parent_id, root_id)
            VALUES (?, ?, ?, ?, ?);
            """, (user_id, serivce, identifier, parent, root))
        conn.commit()
        return cursor.lastrowid

    # The INSERT and the row-id read run inside ONE worker task. A separately
    # queued `SELECT last_insert_rowid()` could run after another thread's
    # insert on the shared connection and report that row's id instead.
    future = Future()
    db.queue.put((task, future))
    return future.result()
126
def insert_mapping(db: DataBaseWorker, original: int, mapped: int):
    """Record that post ``mapped`` is a crossposted copy of post ``original``.

    Both arguments are stored as given in the ``mappings`` table
    (``original_post_id``, ``mapped_post_id``). Nothing is returned.
    """
    statement = """
        INSERT INTO mappings (original_post_id, mapped_post_id)
        VALUES (?, ?);
        """
    bound_values = (original, mapped)
    db.execute(statement, bound_values)
132
def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str):
    """Delete the ``posts`` rows matching identifier, service and user.

    All three values must match for a row to be removed. Nothing is
    returned, whether or not a row matched.
    """
    statement = """
        DELETE FROM posts
        WHERE identifier = ?
        AND service = ?
        AND user_id = ?
        """
    # Order follows the placeholders: identifier, service, user.
    bound_values = (identifier, serivce, user_id)
    db.execute(statement, bound_values)
141
142
def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[str]:
    """Fetch the identifiers of posts mapped from ``original_post``.

    Only mapped posts belonging to ``service`` and ``user_id`` are included,
    ordered by their ``posts.id``.

    Returns:
        Whatever ``db.execute`` returns for the query. With the worker in
        this file that is the ``fetchall()`` result: one single-column row
        per mapped post (the identifier is at index 0), or an empty list.
    """
    query = """
        SELECT p.identifier
        FROM posts AS p
        JOIN mappings AS m
        ON p.id = m.mapped_post_id
        WHERE m.original_post_id = ?
        AND p.service = ?
        AND p.user_id = ?
        ORDER BY p.id;
        """
    bound_values = (original_post, service, user_id)
    rows = db.execute(query, bound_values)
    return rows
156
def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None:
    """Load a post by its database id.

    Returns:
        A dict with the keys ``user_id``, ``service``, ``identifier``,
        ``parent_id`` and ``root_id`` taken from the first matching row, or
        ``None`` when no row has that id.
    """
    columns = ('user_id', 'service', 'identifier', 'parent_id', 'root_id')
    rows = db.execute(
        """
        SELECT user_id, service, identifier, parent_id, root_id
        FROM posts
        WHERE id = ?
        """, (id,))
    if rows:
        # Pair each selected column with its name, in SELECT order.
        return dict(zip(columns, rows[0]))
    return None
174
def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None:
    """Look up a post by its service identifier, owner and service.

    Returns:
        A dict with the keys ``id``, ``parent_id`` and ``root_id`` taken from
        the first matching row, or ``None`` when nothing matches.
    """
    columns = ('id', 'parent_id', 'root_id')
    rows = db.execute(
        """
        SELECT id, parent_id, root_id
        FROM posts
        WHERE identifier = ?
        AND user_id = ?
        AND service = ?
        """, (identifier, user_id, service))
    if rows:
        # Pair each selected column with its name, in SELECT order.
        return dict(zip(columns, rows[0]))
    return None
191 }