social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import sqlite3
2from concurrent.futures import Future
3import threading
4import queue
5
class DataBaseWorker():
    """Run every statement for one SQLite connection on a single worker thread.

    Callers on any thread submit SQL through :meth:`execute`. The statement is
    queued, executed by the daemon worker thread against the shared
    connection, and its rows are handed back through a ``Future``.
    """

    def __init__(self, database: str) -> None:
        """Open *database* and start the worker thread.

        Args:
            database: Value passed to ``sqlite3.connect``.
        """
        super().__init__()
        self.database = database
        self.queue = queue.Queue()
        self.shutdown_event = threading.Event()
        # The connection is created on this thread but used on the worker
        # thread, hence check_same_thread=False.
        self.conn = sqlite3.connect(self.database, check_same_thread=False)
        self.lock = threading.Lock()
        # Orders submissions against close(): nothing can be queued behind
        # the stop sentinel, so no caller is left waiting on a dead worker.
        self._submit_lock = threading.Lock()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        """Worker loop: execute queued tasks until told to stop."""
        while True:
            try:
                item = self.queue.get(timeout=1)
            except queue.Empty:
                # Fallback for code that sets shutdown_event directly
                # instead of calling close().
                if self.shutdown_event.is_set():
                    return
                continue

            try:
                if item is None:
                    # Stop sentinel from close(); every task queued before it
                    # has already been processed.
                    return
                task, future = item
                try:
                    with self.lock:
                        result = task(self.conn)
                except Exception as e:
                    future.set_exception(e)
                else:
                    future.set_result(result)
            finally:
                self.queue.task_done()

    def execute(self, sql: str, params = ()):
        """Execute one SQL statement on the worker thread and return its rows.

        Blocks the calling thread until the statement has run.

        Args:
            sql: The statement to execute.
            params: Values bound to the statement's placeholders.

        Returns:
            The result of ``cursor.fetchall()`` for the statement.

        Raises:
            RuntimeError: If the worker has already been closed.
            Exception: Whatever the statement raised on the worker thread.
        """
        def task(conn: sqlite3.Connection):
            try:
                cursor = conn.execute(sql, params)
                # Drain the cursor first so the statement is finished before
                # the transaction is committed.
                rows = cursor.fetchall()
                conn.commit()
            except Exception:
                # Do not leave a half-open transaction on the shared
                # connection for the next caller.
                conn.rollback()
                raise
            return rows

        future = Future()
        with self._submit_lock:
            if self.shutdown_event.is_set():
                # Without this the caller would wait forever on a future
                # that no thread will ever complete.
                raise RuntimeError("DataBaseWorker is closed")
            self.queue.put((task, future))
        return future.result()

    def close(self):
        """Finish the queued work, stop the worker thread and close the connection."""
        with self._submit_lock:
            if not self.shutdown_event.is_set():
                self.shutdown_event.set()
                # Wake the worker immediately rather than waiting for its
                # poll timeout.
                self.queue.put(None)
        self.thread.join()
        with self.lock:
            self.conn.close()
47
def find_mapped_thread(
        db: DataBaseWorker,
        parent_id: str,
        input_user: str,
        input_service: str,
        output_user: str,
        output_service: str):
    """Resolve a parent post on the input side to its thread on the output side.

    The parent is looked up by ``(parent_id, input_user, input_service)``,
    then its mappings for ``(output_service, output_user)`` are fetched. When
    the parent row has a ``root_id``, the root post and its mappings are
    looked up as well.

    Returns:
        ``None`` if the parent, its mappings, the root post or the root's
        mappings cannot be found. Otherwise a 4-tuple::

            (root identifier, parent identifier, root db id, parent db id)

        The root identifier is the first mapping of the root post, or the
        first mapping of the parent when the parent has no ``root_id``; the
        parent identifier is the parent's last mapping. The root db id is
        the parent's ``root_id`` column as stored, so it is falsy when the
        parent has no root.
    """
    parent = find_post(db, parent_id, input_user, input_service)
    if not parent:
        return None
    parent_db_id = parent['id']
    root_db_id = parent['root_id']

    # Each mapping row is a tuple whose first column is the identifier.
    parent_rows = find_mappings(db, parent_db_id, output_service, output_user)
    if not parent_rows:
        return None

    if root_db_id:
        # The root post itself must still exist, not just its mappings.
        if not find_post_by_id(db, root_db_id):
            return None
        root_rows = find_mappings(db, root_db_id, output_service, output_user)
        if not root_rows:
            return None
        root_row = root_rows[0]
    else:
        root_row = parent_rows[0]

    return (
        root_row[0],            # identifiers on the output service
        parent_rows[-1][0],
        root_db_id,             # database ids
        parent_db_id,
    )
82
83
def insert_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str) -> int:
    """Insert a row into ``posts`` and return its row id.

    Args:
        db: Worker used to run the statements.
        identifier: Value stored in the ``identifier`` column.
        user_id: Value stored in the ``user_id`` column.
        serivce: Value stored in the ``service`` column (parameter name kept
            as-is for keyword callers).

    Returns:
        The rowid of the inserted post.
    """
    key = (user_id, serivce, identifier)
    db.execute(
        """
        INSERT INTO posts (user_id, service, identifier)
        VALUES (?, ?, ?);
        """, key)

    # Each db.execute() call is queued on its own, so another thread's INSERT
    # can run on the shared connection between the INSERT above and a
    # last_insert_rowid() query, which would then report the other row.
    # Look the row up by the values just written instead.
    rows = db.execute(
        """
        SELECT rowid
        FROM posts
        WHERE user_id = ?
        AND service = ?
        AND identifier = ?
        ORDER BY rowid DESC
        LIMIT 1;
        """, key)
    if not rows:
        # Nothing matched (e.g. a NULL value never compares equal): fall back
        # to the connection-wide counter.
        rows = db.execute("SELECT last_insert_rowid();", ())
    return rows[0][0]
91
def insert_reply(db: DataBaseWorker, identifier: str, user_id: str, serivce: str, parent: int, root: int) -> int:
    """Insert a reply row into ``posts`` and return its row id.

    Args:
        db: Worker used to run the statements.
        identifier: Value stored in the ``identifier`` column.
        user_id: Value stored in the ``user_id`` column.
        serivce: Value stored in the ``service`` column (parameter name kept
            as-is for keyword callers).
        parent: Value stored in the ``parent_id`` column.
        root: Value stored in the ``root_id`` column.

    Returns:
        The rowid of the inserted reply.
    """
    db.execute(
        """
        INSERT INTO posts (user_id, service, identifier, parent_id, root_id)
        VALUES (?, ?, ?, ?, ?);
        """, (user_id, serivce, identifier, parent, root))

    # Each db.execute() call is queued on its own, so another thread's INSERT
    # can run on the shared connection between the INSERT above and a
    # last_insert_rowid() query, which would then report the other row.
    # Look the row up by the values just written instead.
    rows = db.execute(
        """
        SELECT rowid
        FROM posts
        WHERE user_id = ?
        AND service = ?
        AND identifier = ?
        ORDER BY rowid DESC
        LIMIT 1;
        """, (user_id, serivce, identifier))
    if not rows:
        # Nothing matched (e.g. a NULL value never compares equal): fall back
        # to the connection-wide counter.
        rows = db.execute("SELECT last_insert_rowid();", ())
    return rows[0][0]
99
def insert_mapping(db: DataBaseWorker, original: int, mapped: int):
    """Insert a ``mappings`` row linking post id *original* to post id *mapped*."""
    link = (original, mapped)
    db.execute("""
        INSERT INTO mappings (original_post_id, mapped_post_id)
        VALUES (?, ?);
        """, link)
105
def delete_post(db: DataBaseWorker, identifier: str, user_id: str, serivce: str):
    """Delete the ``posts`` rows matching *identifier*, *serivce* and *user_id*."""
    # Bound in the order the placeholders appear: identifier, service, user_id.
    bound = (identifier, serivce, user_id)
    db.execute(
        """
        DELETE FROM posts
        WHERE identifier = ?
        AND service = ?
        AND user_id = ?
        """, bound)
114
115
def find_mappings(db: DataBaseWorker, original_post: int, service: str, user_id: str) -> list[tuple[str]]:
    """Return the identifiers of the posts mapped from *original_post*.

    Only mapped posts belonging to *service* and *user_id* are returned,
    ordered by their ``posts.id``.

    Args:
        db: Worker used to run the query.
        original_post: ``mappings.original_post_id`` to look up.
        service: Required ``posts.service`` of the mapped posts.
        user_id: Required ``posts.user_id`` of the mapped posts.

    Returns:
        A list of one-column rows; read each identifier as ``row[0]``. The
        list is empty when there are no matching mappings.
    """
    return db.execute(
        """
        SELECT p.identifier
        FROM posts AS p
        JOIN mappings AS m
        ON p.id = m.mapped_post_id
        WHERE m.original_post_id = ?
        AND p.service = ?
        AND p.user_id = ?
        ORDER BY p.id;
        """,
        (original_post, service, user_id))
129
def find_post_by_id(db: DataBaseWorker, id: int) -> dict | None:
    """Fetch the ``posts`` row whose ``id`` column equals *id*.

    Returns:
        A dict with the keys ``user_id``, ``service``, ``identifier``,
        ``parent_id`` and ``root_id`` taken from the first matching row, or
        ``None`` when no row matches.
    """
    # Same order as the SELECT list below.
    columns = ('user_id', 'service', 'identifier', 'parent_id', 'root_id')
    rows = db.execute(
        """
        SELECT user_id, service, identifier, parent_id, root_id
        FROM posts
        WHERE id = ?
        """, (id,))
    if rows:
        # strict=True keeps the ValueError a column-count mismatch would raise.
        return dict(zip(columns, rows[0], strict=True))
    return None
147
def find_post(db: DataBaseWorker, identifier: str, user_id: str, service: str) -> dict | None:
    """Fetch a ``posts`` row by *identifier*, *user_id* and *service*.

    Returns:
        A dict with the keys ``id``, ``parent_id`` and ``root_id`` taken from
        the first matching row, or ``None`` when no row matches.
    """
    # Same order as the SELECT list below.
    columns = ('id', 'parent_id', 'root_id')
    rows = db.execute(
        """
        SELECT id, parent_id, root_id
        FROM posts
        WHERE identifier = ?
        AND user_id = ?
        AND service = ?
        """, (identifier, user_id, service))
    if rows:
        # strict=True keeps the ValueError a column-count mismatch would raise.
        return dict(zip(columns, rows[0], strict=True))
    return None