social media crossposting tool. 3rd time's the charm
mastodon
misskey
crossposting
bluesky
1import sqlite3
2import json
3
4import sqlite3
5import json
6
7class DataBase():
8
9 def __init__(self, path: str) -> None:
10 self.path = path
11 connection = sqlite3.connect(self.path, autocommit=True)
12 cursor = connection.cursor()
13 cursor.execute('''
14 CREATE TABLE IF NOT EXISTS posts (
15 id TEXT,
16 user_id TEXT,
17 data TEXT,
18 PRIMARY KEY (id, user_id)
19 )
20 ''')
21 cursor.close()
22
23 def connect(self) -> sqlite3.Connection:
24 return sqlite3.connect(self.path, autocommit=True)
25
26 def put_post(self, db: sqlite3.Connection, user_id: str, id: str, data: dict):
27 cursor = db.cursor()
28 cursor.execute('''
29 INSERT OR REPLACE INTO posts (id, user_id, data) VALUES (?, ?, ?)
30 ''', (id, user_id, json.dumps(data)))
31 cursor.close()
32
33 def del_post(self, db: sqlite3.Connection, user_id: str, id: str):
34 cursor = db.cursor()
35 cursor.execute('''
36 DELETE FROM posts WHERE id = ? AND user_id = ?
37 ''', (id, user_id))
38 cursor.close()
39
40 def read_data(self, db: sqlite3.Connection, user_id: str, id: str) -> dict | None:
41 cursor = db.cursor()
42 cursor.execute('''
43 SELECT data FROM posts WHERE id = ? AND user_id = ?
44 ''', (id, user_id))
45 row = cursor.fetchone()
46 cursor.close()
47 if row:
48 data_json = row[0]
49 return json.loads(data_json)
50 return None
51
52 def get_all_children(self, db: sqlite3.Connection, user_id: str, id: str) -> dict[str, dict]:
53 cursor = db.cursor()
54 cursor.execute('''
55 WITH RECURSIVE thread_cte (id, user_id, data, current_post_uri) AS (
56 SELECT
57 T1.id,
58 T1.user_id,
59 T1.data,
60 json_extract(
61 T1.data,
62 '$.mapped_post_refs[' || (json_array_length(T1.data, '$.mapped_post_refs') - 1) || '].uri'
63 ) AS current_post_uri
64 FROM
65 posts AS T1
66 WHERE
67 T1.id = ? AND T1.user_id = ?
68
69 UNION ALL
70
71 SELECT
72 C.id,
73 C.user_id,
74 C.data,
75 json_extract(
76 C.data,
77 '$.mapped_post_refs[' || (json_array_length(C.data, '$.mapped_post_refs') - 1) || '].uri'
78 ) AS current_post_uri
79 FROM
80 posts AS C
81 JOIN
82 thread_cte AS P ON json_extract(C.data, '$.parent_ref.uri') = P.current_post_uri
83 WHERE
84 C.user_id = ?
85 )
86 SELECT id, data FROM thread_cte;
87 ''', (id, user_id, user_id))
88 raw_data = cursor.fetchall()
89 cursor.close()
90
91 if not raw_data:
92 return {}
93
94 data: dict[str, dict] = {}
95 for post_id, post_data in raw_data:
96 data[post_id] = json.loads(post_data)
97
98 return data
99
class UserScopedDB:
    """Wrapper around a ``DataBase`` that is pinned to one user.

    Every method forwards to the wrapped object's method of the same name,
    inserting the stored ``user_id`` so callers only pass the post ``id``.
    """

    def __init__(self, db: DataBase, user_id: str):
        """Keep the wrapped database and the user id used for every call."""
        self.user_id = user_id
        self.db = db

    def connect(self) -> sqlite3.Connection:
        """Return the result of the wrapped database's ``connect()``."""
        backend = self.db
        return backend.connect()

    def put_post(self, db: sqlite3.Connection, id: str, data: dict):
        """Forward to ``put_post`` for this user and return its result."""
        outcome = self.db.put_post(db, self.user_id, id, data)
        return outcome

    def del_post(self, db: sqlite3.Connection, id: str):
        """Forward to ``del_post`` for this user and return its result."""
        outcome = self.db.del_post(db, self.user_id, id)
        return outcome

    def read_data(self, db: sqlite3.Connection, id: str) -> dict | None:
        """Forward to ``read_data`` for this user and return its result."""
        payload = self.db.read_data(db, self.user_id, id)
        return payload

    def get_all_children(self, db: sqlite3.Connection, id: str) -> dict[str, dict]:
        """Forward to ``get_all_children`` for this user and return its result."""
        thread = self.db.get_all_children(db, self.user_id, id)
        return thread