A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "database/sql"
5 "fmt"
6 "time"
7
8 "Coves/internal/core/repository"
9 "github.com/ipfs/go-cid"
10 "github.com/lib/pq"
11)
12
13// RepositoryRepo implements repository.RepositoryRepository using PostgreSQL
14type RepositoryRepo struct {
15 db *sql.DB
16}
17
18// NewRepositoryRepo creates a new PostgreSQL repository implementation
19func NewRepositoryRepo(db *sql.DB) *RepositoryRepo {
20 return &RepositoryRepo{db: db}
21}
22
23// Repository operations
24
25func (r *RepositoryRepo) Create(repo *repository.Repository) error {
26 query := `
27 INSERT INTO repositories (did, head_cid, revision, record_count, storage_size, created_at, updated_at)
28 VALUES ($1, $2, $3, $4, $5, $6, $7)`
29
30 _, err := r.db.Exec(query,
31 repo.DID,
32 repo.HeadCID.String(),
33 repo.Revision,
34 repo.RecordCount,
35 repo.StorageSize,
36 repo.CreatedAt,
37 repo.UpdatedAt,
38 )
39 if err != nil {
40 return fmt.Errorf("failed to create repository: %w", err)
41 }
42
43 return nil
44}
45
46func (r *RepositoryRepo) GetByDID(did string) (*repository.Repository, error) {
47 query := `
48 SELECT did, head_cid, revision, record_count, storage_size, created_at, updated_at
49 FROM repositories
50 WHERE did = $1`
51
52 var repo repository.Repository
53 var headCIDStr string
54
55 err := r.db.QueryRow(query, did).Scan(
56 &repo.DID,
57 &headCIDStr,
58 &repo.Revision,
59 &repo.RecordCount,
60 &repo.StorageSize,
61 &repo.CreatedAt,
62 &repo.UpdatedAt,
63 )
64 if err == sql.ErrNoRows {
65 return nil, nil
66 }
67 if err != nil {
68 return nil, fmt.Errorf("failed to get repository: %w", err)
69 }
70
71 repo.HeadCID, err = cid.Parse(headCIDStr)
72 if err != nil {
73 return nil, fmt.Errorf("failed to parse head CID: %w", err)
74 }
75
76 return &repo, nil
77}
78
79func (r *RepositoryRepo) Update(repo *repository.Repository) error {
80 query := `
81 UPDATE repositories
82 SET head_cid = $2, revision = $3, record_count = $4, storage_size = $5, updated_at = $6
83 WHERE did = $1`
84
85 result, err := r.db.Exec(query,
86 repo.DID,
87 repo.HeadCID.String(),
88 repo.Revision,
89 repo.RecordCount,
90 repo.StorageSize,
91 time.Now(),
92 )
93 if err != nil {
94 return fmt.Errorf("failed to update repository: %w", err)
95 }
96
97 rowsAffected, err := result.RowsAffected()
98 if err != nil {
99 return fmt.Errorf("failed to get rows affected: %w", err)
100 }
101 if rowsAffected == 0 {
102 return fmt.Errorf("repository not found: %s", repo.DID)
103 }
104
105 return nil
106}
107
108func (r *RepositoryRepo) Delete(did string) error {
109 query := `DELETE FROM repositories WHERE did = $1`
110
111 result, err := r.db.Exec(query, did)
112 if err != nil {
113 return fmt.Errorf("failed to delete repository: %w", err)
114 }
115
116 rowsAffected, err := result.RowsAffected()
117 if err != nil {
118 return fmt.Errorf("failed to get rows affected: %w", err)
119 }
120 if rowsAffected == 0 {
121 return fmt.Errorf("repository not found: %s", did)
122 }
123
124 return nil
125}
126
127// Commit operations
128
129func (r *RepositoryRepo) CreateCommit(commit *repository.Commit) error {
130 query := `
131 INSERT INTO commits (cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at)
132 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
133
134 var prevCID *string
135 if commit.PrevCID != nil {
136 s := commit.PrevCID.String()
137 prevCID = &s
138 }
139
140 _, err := r.db.Exec(query,
141 commit.CID.String(),
142 commit.DID,
143 commit.Version,
144 prevCID,
145 commit.DataCID.String(),
146 commit.Revision,
147 commit.Signature,
148 commit.SigningKeyID,
149 commit.CreatedAt,
150 )
151 if err != nil {
152 return fmt.Errorf("failed to create commit: %w", err)
153 }
154
155 return nil
156}
157
158func (r *RepositoryRepo) GetCommit(did string, commitCID cid.Cid) (*repository.Commit, error) {
159 query := `
160 SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at
161 FROM commits
162 WHERE did = $1 AND cid = $2`
163
164 var commit repository.Commit
165 var cidStr, dataCIDStr string
166 var prevCIDStr sql.NullString
167
168 err := r.db.QueryRow(query, did, commitCID.String()).Scan(
169 &cidStr,
170 &commit.DID,
171 &commit.Version,
172 &prevCIDStr,
173 &dataCIDStr,
174 &commit.Revision,
175 &commit.Signature,
176 &commit.SigningKeyID,
177 &commit.CreatedAt,
178 )
179 if err == sql.ErrNoRows {
180 return nil, nil
181 }
182 if err != nil {
183 return nil, fmt.Errorf("failed to get commit: %w", err)
184 }
185
186 commit.CID, err = cid.Parse(cidStr)
187 if err != nil {
188 return nil, fmt.Errorf("failed to parse commit CID: %w", err)
189 }
190
191 commit.DataCID, err = cid.Parse(dataCIDStr)
192 if err != nil {
193 return nil, fmt.Errorf("failed to parse data CID: %w", err)
194 }
195
196 if prevCIDStr.Valid {
197 prevCID, err := cid.Parse(prevCIDStr.String)
198 if err != nil {
199 return nil, fmt.Errorf("failed to parse prev CID: %w", err)
200 }
201 commit.PrevCID = &prevCID
202 }
203
204 return &commit, nil
205}
206
207func (r *RepositoryRepo) GetLatestCommit(did string) (*repository.Commit, error) {
208 query := `
209 SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at
210 FROM commits
211 WHERE did = $1
212 ORDER BY created_at DESC
213 LIMIT 1`
214
215 var commit repository.Commit
216 var cidStr, dataCIDStr string
217 var prevCIDStr sql.NullString
218
219 err := r.db.QueryRow(query, did).Scan(
220 &cidStr,
221 &commit.DID,
222 &commit.Version,
223 &prevCIDStr,
224 &dataCIDStr,
225 &commit.Revision,
226 &commit.Signature,
227 &commit.SigningKeyID,
228 &commit.CreatedAt,
229 )
230 if err == sql.ErrNoRows {
231 return nil, nil
232 }
233 if err != nil {
234 return nil, fmt.Errorf("failed to get latest commit: %w", err)
235 }
236
237 commit.CID, err = cid.Parse(cidStr)
238 if err != nil {
239 return nil, fmt.Errorf("failed to parse commit CID: %w", err)
240 }
241
242 commit.DataCID, err = cid.Parse(dataCIDStr)
243 if err != nil {
244 return nil, fmt.Errorf("failed to parse data CID: %w", err)
245 }
246
247 if prevCIDStr.Valid {
248 prevCID, err := cid.Parse(prevCIDStr.String)
249 if err != nil {
250 return nil, fmt.Errorf("failed to parse prev CID: %w", err)
251 }
252 commit.PrevCID = &prevCID
253 }
254
255 return &commit, nil
256}
257
258func (r *RepositoryRepo) ListCommits(did string, limit int, offset int) ([]*repository.Commit, error) {
259 query := `
260 SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at
261 FROM commits
262 WHERE did = $1
263 ORDER BY created_at DESC
264 LIMIT $2 OFFSET $3`
265
266 rows, err := r.db.Query(query, did, limit, offset)
267 if err != nil {
268 return nil, fmt.Errorf("failed to list commits: %w", err)
269 }
270 defer rows.Close()
271
272 var commits []*repository.Commit
273 for rows.Next() {
274 var commit repository.Commit
275 var cidStr, dataCIDStr string
276 var prevCIDStr sql.NullString
277
278 err := rows.Scan(
279 &cidStr,
280 &commit.DID,
281 &commit.Version,
282 &prevCIDStr,
283 &dataCIDStr,
284 &commit.Revision,
285 &commit.Signature,
286 &commit.SigningKeyID,
287 &commit.CreatedAt,
288 )
289 if err != nil {
290 return nil, fmt.Errorf("failed to scan commit: %w", err)
291 }
292
293 commit.CID, err = cid.Parse(cidStr)
294 if err != nil {
295 return nil, fmt.Errorf("failed to parse commit CID: %w", err)
296 }
297
298 commit.DataCID, err = cid.Parse(dataCIDStr)
299 if err != nil {
300 return nil, fmt.Errorf("failed to parse data CID: %w", err)
301 }
302
303 if prevCIDStr.Valid {
304 prevCID, err := cid.Parse(prevCIDStr.String)
305 if err != nil {
306 return nil, fmt.Errorf("failed to parse prev CID: %w", err)
307 }
308 commit.PrevCID = &prevCID
309 }
310
311 commits = append(commits, &commit)
312 }
313
314 return commits, nil
315}
316
317// Record operations
318
319func (r *RepositoryRepo) CreateRecord(record *repository.Record) error {
320 query := `
321 INSERT INTO records (did, uri, cid, collection, record_key, created_at, updated_at)
322 VALUES ($1, $2, $3, $4, $5, $6, $7)`
323
324 _, err := r.db.Exec(query,
325 record.URI[:len("at://")+len(record.URI[len("at://"):])-len(record.Collection)-len(record.RecordKey)-2], // Extract DID from URI
326 record.URI,
327 record.CID.String(),
328 record.Collection,
329 record.RecordKey,
330 record.CreatedAt,
331 record.UpdatedAt,
332 )
333 if err != nil {
334 if pqErr, ok := err.(*pq.Error); ok && pqErr.Code == "23505" { // unique_violation
335 return fmt.Errorf("record already exists: %s", record.URI)
336 }
337 return fmt.Errorf("failed to create record: %w", err)
338 }
339
340 return nil
341}
342
343func (r *RepositoryRepo) GetRecord(did string, collection string, recordKey string) (*repository.Record, error) {
344 query := `
345 SELECT uri, cid, collection, record_key, created_at, updated_at
346 FROM records
347 WHERE did = $1 AND collection = $2 AND record_key = $3`
348
349 var record repository.Record
350 var cidStr string
351
352 err := r.db.QueryRow(query, did, collection, recordKey).Scan(
353 &record.URI,
354 &cidStr,
355 &record.Collection,
356 &record.RecordKey,
357 &record.CreatedAt,
358 &record.UpdatedAt,
359 )
360 if err == sql.ErrNoRows {
361 return nil, nil
362 }
363 if err != nil {
364 return nil, fmt.Errorf("failed to get record: %w", err)
365 }
366
367 record.CID, err = cid.Parse(cidStr)
368 if err != nil {
369 return nil, fmt.Errorf("failed to parse record CID: %w", err)
370 }
371
372 return &record, nil
373}
374
375func (r *RepositoryRepo) UpdateRecord(record *repository.Record) error {
376 did := record.URI[:len("at://")+len(record.URI[len("at://"):])-len(record.Collection)-len(record.RecordKey)-2]
377
378 query := `
379 UPDATE records
380 SET cid = $4, updated_at = $5
381 WHERE did = $1 AND collection = $2 AND record_key = $3`
382
383 result, err := r.db.Exec(query,
384 did,
385 record.Collection,
386 record.RecordKey,
387 record.CID.String(),
388 time.Now(),
389 )
390 if err != nil {
391 return fmt.Errorf("failed to update record: %w", err)
392 }
393
394 rowsAffected, err := result.RowsAffected()
395 if err != nil {
396 return fmt.Errorf("failed to get rows affected: %w", err)
397 }
398 if rowsAffected == 0 {
399 return fmt.Errorf("record not found: %s", record.URI)
400 }
401
402 return nil
403}
404
405func (r *RepositoryRepo) DeleteRecord(did string, collection string, recordKey string) error {
406 query := `DELETE FROM records WHERE did = $1 AND collection = $2 AND record_key = $3`
407
408 result, err := r.db.Exec(query, did, collection, recordKey)
409 if err != nil {
410 return fmt.Errorf("failed to delete record: %w", err)
411 }
412
413 rowsAffected, err := result.RowsAffected()
414 if err != nil {
415 return fmt.Errorf("failed to get rows affected: %w", err)
416 }
417 if rowsAffected == 0 {
418 return fmt.Errorf("record not found")
419 }
420
421 return nil
422}
423
424func (r *RepositoryRepo) ListRecords(did string, collection string, limit int, offset int) ([]*repository.Record, error) {
425 query := `
426 SELECT uri, cid, collection, record_key, created_at, updated_at
427 FROM records
428 WHERE did = $1 AND collection = $2
429 ORDER BY created_at DESC
430 LIMIT $3 OFFSET $4`
431
432 rows, err := r.db.Query(query, did, collection, limit, offset)
433 if err != nil {
434 return nil, fmt.Errorf("failed to list records: %w", err)
435 }
436 defer rows.Close()
437
438 var records []*repository.Record
439 for rows.Next() {
440 var record repository.Record
441 var cidStr string
442
443 err := rows.Scan(
444 &record.URI,
445 &cidStr,
446 &record.Collection,
447 &record.RecordKey,
448 &record.CreatedAt,
449 &record.UpdatedAt,
450 )
451 if err != nil {
452 return nil, fmt.Errorf("failed to scan record: %w", err)
453 }
454
455 record.CID, err = cid.Parse(cidStr)
456 if err != nil {
457 return nil, fmt.Errorf("failed to parse record CID: %w", err)
458 }
459
460 records = append(records, &record)
461 }
462
463 return records, nil
464}
465