A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "database/sql" 5 "fmt" 6 "time" 7 8 "Coves/internal/core/repository" 9 "github.com/ipfs/go-cid" 10 "github.com/lib/pq" 11) 12 13// RepositoryRepo implements repository.RepositoryRepository using PostgreSQL 14type RepositoryRepo struct { 15 db *sql.DB 16} 17 18// NewRepositoryRepo creates a new PostgreSQL repository implementation 19func NewRepositoryRepo(db *sql.DB) *RepositoryRepo { 20 return &RepositoryRepo{db: db} 21} 22 23// Repository operations 24 25func (r *RepositoryRepo) Create(repo *repository.Repository) error { 26 query := ` 27 INSERT INTO repositories (did, head_cid, revision, record_count, storage_size, created_at, updated_at) 28 VALUES ($1, $2, $3, $4, $5, $6, $7)` 29 30 _, err := r.db.Exec(query, 31 repo.DID, 32 repo.HeadCID.String(), 33 repo.Revision, 34 repo.RecordCount, 35 repo.StorageSize, 36 repo.CreatedAt, 37 repo.UpdatedAt, 38 ) 39 if err != nil { 40 return fmt.Errorf("failed to create repository: %w", err) 41 } 42 43 return nil 44} 45 46func (r *RepositoryRepo) GetByDID(did string) (*repository.Repository, error) { 47 query := ` 48 SELECT did, head_cid, revision, record_count, storage_size, created_at, updated_at 49 FROM repositories 50 WHERE did = $1` 51 52 var repo repository.Repository 53 var headCIDStr string 54 55 err := r.db.QueryRow(query, did).Scan( 56 &repo.DID, 57 &headCIDStr, 58 &repo.Revision, 59 &repo.RecordCount, 60 &repo.StorageSize, 61 &repo.CreatedAt, 62 &repo.UpdatedAt, 63 ) 64 if err == sql.ErrNoRows { 65 return nil, nil 66 } 67 if err != nil { 68 return nil, fmt.Errorf("failed to get repository: %w", err) 69 } 70 71 repo.HeadCID, err = cid.Parse(headCIDStr) 72 if err != nil { 73 return nil, fmt.Errorf("failed to parse head CID: %w", err) 74 } 75 76 return &repo, nil 77} 78 79func (r *RepositoryRepo) Update(repo *repository.Repository) error { 80 query := ` 81 UPDATE repositories 82 SET head_cid = $2, revision = $3, record_count = $4, storage_size = $5, updated_at = $6 83 WHERE did = $1` 84 
85 result, err := r.db.Exec(query, 86 repo.DID, 87 repo.HeadCID.String(), 88 repo.Revision, 89 repo.RecordCount, 90 repo.StorageSize, 91 time.Now(), 92 ) 93 if err != nil { 94 return fmt.Errorf("failed to update repository: %w", err) 95 } 96 97 rowsAffected, err := result.RowsAffected() 98 if err != nil { 99 return fmt.Errorf("failed to get rows affected: %w", err) 100 } 101 if rowsAffected == 0 { 102 return fmt.Errorf("repository not found: %s", repo.DID) 103 } 104 105 return nil 106} 107 108func (r *RepositoryRepo) Delete(did string) error { 109 query := `DELETE FROM repositories WHERE did = $1` 110 111 result, err := r.db.Exec(query, did) 112 if err != nil { 113 return fmt.Errorf("failed to delete repository: %w", err) 114 } 115 116 rowsAffected, err := result.RowsAffected() 117 if err != nil { 118 return fmt.Errorf("failed to get rows affected: %w", err) 119 } 120 if rowsAffected == 0 { 121 return fmt.Errorf("repository not found: %s", did) 122 } 123 124 return nil 125} 126 127// Commit operations 128 129func (r *RepositoryRepo) CreateCommit(commit *repository.Commit) error { 130 query := ` 131 INSERT INTO commits (cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at) 132 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)` 133 134 var prevCID *string 135 if commit.PrevCID != nil { 136 s := commit.PrevCID.String() 137 prevCID = &s 138 } 139 140 _, err := r.db.Exec(query, 141 commit.CID.String(), 142 commit.DID, 143 commit.Version, 144 prevCID, 145 commit.DataCID.String(), 146 commit.Revision, 147 commit.Signature, 148 commit.SigningKeyID, 149 commit.CreatedAt, 150 ) 151 if err != nil { 152 return fmt.Errorf("failed to create commit: %w", err) 153 } 154 155 return nil 156} 157 158func (r *RepositoryRepo) GetCommit(did string, commitCID cid.Cid) (*repository.Commit, error) { 159 query := ` 160 SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at 161 FROM commits 162 WHERE did = $1 AND cid = $2` 
163 164 var commit repository.Commit 165 var cidStr, dataCIDStr string 166 var prevCIDStr sql.NullString 167 168 err := r.db.QueryRow(query, did, commitCID.String()).Scan( 169 &cidStr, 170 &commit.DID, 171 &commit.Version, 172 &prevCIDStr, 173 &dataCIDStr, 174 &commit.Revision, 175 &commit.Signature, 176 &commit.SigningKeyID, 177 &commit.CreatedAt, 178 ) 179 if err == sql.ErrNoRows { 180 return nil, nil 181 } 182 if err != nil { 183 return nil, fmt.Errorf("failed to get commit: %w", err) 184 } 185 186 commit.CID, err = cid.Parse(cidStr) 187 if err != nil { 188 return nil, fmt.Errorf("failed to parse commit CID: %w", err) 189 } 190 191 commit.DataCID, err = cid.Parse(dataCIDStr) 192 if err != nil { 193 return nil, fmt.Errorf("failed to parse data CID: %w", err) 194 } 195 196 if prevCIDStr.Valid { 197 prevCID, err := cid.Parse(prevCIDStr.String) 198 if err != nil { 199 return nil, fmt.Errorf("failed to parse prev CID: %w", err) 200 } 201 commit.PrevCID = &prevCID 202 } 203 204 return &commit, nil 205} 206 207func (r *RepositoryRepo) GetLatestCommit(did string) (*repository.Commit, error) { 208 query := ` 209 SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at 210 FROM commits 211 WHERE did = $1 212 ORDER BY created_at DESC 213 LIMIT 1` 214 215 var commit repository.Commit 216 var cidStr, dataCIDStr string 217 var prevCIDStr sql.NullString 218 219 err := r.db.QueryRow(query, did).Scan( 220 &cidStr, 221 &commit.DID, 222 &commit.Version, 223 &prevCIDStr, 224 &dataCIDStr, 225 &commit.Revision, 226 &commit.Signature, 227 &commit.SigningKeyID, 228 &commit.CreatedAt, 229 ) 230 if err == sql.ErrNoRows { 231 return nil, nil 232 } 233 if err != nil { 234 return nil, fmt.Errorf("failed to get latest commit: %w", err) 235 } 236 237 commit.CID, err = cid.Parse(cidStr) 238 if err != nil { 239 return nil, fmt.Errorf("failed to parse commit CID: %w", err) 240 } 241 242 commit.DataCID, err = cid.Parse(dataCIDStr) 243 if err != nil { 244 
return nil, fmt.Errorf("failed to parse data CID: %w", err) 245 } 246 247 if prevCIDStr.Valid { 248 prevCID, err := cid.Parse(prevCIDStr.String) 249 if err != nil { 250 return nil, fmt.Errorf("failed to parse prev CID: %w", err) 251 } 252 commit.PrevCID = &prevCID 253 } 254 255 return &commit, nil 256} 257 258func (r *RepositoryRepo) ListCommits(did string, limit int, offset int) ([]*repository.Commit, error) { 259 query := ` 260 SELECT cid, did, version, prev_cid, data_cid, revision, signature, signing_key_id, created_at 261 FROM commits 262 WHERE did = $1 263 ORDER BY created_at DESC 264 LIMIT $2 OFFSET $3` 265 266 rows, err := r.db.Query(query, did, limit, offset) 267 if err != nil { 268 return nil, fmt.Errorf("failed to list commits: %w", err) 269 } 270 defer rows.Close() 271 272 var commits []*repository.Commit 273 for rows.Next() { 274 var commit repository.Commit 275 var cidStr, dataCIDStr string 276 var prevCIDStr sql.NullString 277 278 err := rows.Scan( 279 &cidStr, 280 &commit.DID, 281 &commit.Version, 282 &prevCIDStr, 283 &dataCIDStr, 284 &commit.Revision, 285 &commit.Signature, 286 &commit.SigningKeyID, 287 &commit.CreatedAt, 288 ) 289 if err != nil { 290 return nil, fmt.Errorf("failed to scan commit: %w", err) 291 } 292 293 commit.CID, err = cid.Parse(cidStr) 294 if err != nil { 295 return nil, fmt.Errorf("failed to parse commit CID: %w", err) 296 } 297 298 commit.DataCID, err = cid.Parse(dataCIDStr) 299 if err != nil { 300 return nil, fmt.Errorf("failed to parse data CID: %w", err) 301 } 302 303 if prevCIDStr.Valid { 304 prevCID, err := cid.Parse(prevCIDStr.String) 305 if err != nil { 306 return nil, fmt.Errorf("failed to parse prev CID: %w", err) 307 } 308 commit.PrevCID = &prevCID 309 } 310 311 commits = append(commits, &commit) 312 } 313 314 return commits, nil 315} 316 317// Record operations 318 319func (r *RepositoryRepo) CreateRecord(record *repository.Record) error { 320 query := ` 321 INSERT INTO records (did, uri, cid, collection, 
record_key, created_at, updated_at) 322 VALUES ($1, $2, $3, $4, $5, $6, $7)` 323 324 _, err := r.db.Exec(query, 325 record.URI[:len("at://")+len(record.URI[len("at://"):])-len(record.Collection)-len(record.RecordKey)-2], // Extract DID from URI 326 record.URI, 327 record.CID.String(), 328 record.Collection, 329 record.RecordKey, 330 record.CreatedAt, 331 record.UpdatedAt, 332 ) 333 if err != nil { 334 if pqErr, ok := err.(*pq.Error); ok && pqErr.Code == "23505" { // unique_violation 335 return fmt.Errorf("record already exists: %s", record.URI) 336 } 337 return fmt.Errorf("failed to create record: %w", err) 338 } 339 340 return nil 341} 342 343func (r *RepositoryRepo) GetRecord(did string, collection string, recordKey string) (*repository.Record, error) { 344 query := ` 345 SELECT uri, cid, collection, record_key, created_at, updated_at 346 FROM records 347 WHERE did = $1 AND collection = $2 AND record_key = $3` 348 349 var record repository.Record 350 var cidStr string 351 352 err := r.db.QueryRow(query, did, collection, recordKey).Scan( 353 &record.URI, 354 &cidStr, 355 &record.Collection, 356 &record.RecordKey, 357 &record.CreatedAt, 358 &record.UpdatedAt, 359 ) 360 if err == sql.ErrNoRows { 361 return nil, nil 362 } 363 if err != nil { 364 return nil, fmt.Errorf("failed to get record: %w", err) 365 } 366 367 record.CID, err = cid.Parse(cidStr) 368 if err != nil { 369 return nil, fmt.Errorf("failed to parse record CID: %w", err) 370 } 371 372 return &record, nil 373} 374 375func (r *RepositoryRepo) UpdateRecord(record *repository.Record) error { 376 did := record.URI[:len("at://")+len(record.URI[len("at://"):])-len(record.Collection)-len(record.RecordKey)-2] 377 378 query := ` 379 UPDATE records 380 SET cid = $4, updated_at = $5 381 WHERE did = $1 AND collection = $2 AND record_key = $3` 382 383 result, err := r.db.Exec(query, 384 did, 385 record.Collection, 386 record.RecordKey, 387 record.CID.String(), 388 time.Now(), 389 ) 390 if err != nil { 391 return 
fmt.Errorf("failed to update record: %w", err) 392 } 393 394 rowsAffected, err := result.RowsAffected() 395 if err != nil { 396 return fmt.Errorf("failed to get rows affected: %w", err) 397 } 398 if rowsAffected == 0 { 399 return fmt.Errorf("record not found: %s", record.URI) 400 } 401 402 return nil 403} 404 405func (r *RepositoryRepo) DeleteRecord(did string, collection string, recordKey string) error { 406 query := `DELETE FROM records WHERE did = $1 AND collection = $2 AND record_key = $3` 407 408 result, err := r.db.Exec(query, did, collection, recordKey) 409 if err != nil { 410 return fmt.Errorf("failed to delete record: %w", err) 411 } 412 413 rowsAffected, err := result.RowsAffected() 414 if err != nil { 415 return fmt.Errorf("failed to get rows affected: %w", err) 416 } 417 if rowsAffected == 0 { 418 return fmt.Errorf("record not found") 419 } 420 421 return nil 422} 423 424func (r *RepositoryRepo) ListRecords(did string, collection string, limit int, offset int) ([]*repository.Record, error) { 425 query := ` 426 SELECT uri, cid, collection, record_key, created_at, updated_at 427 FROM records 428 WHERE did = $1 AND collection = $2 429 ORDER BY created_at DESC 430 LIMIT $3 OFFSET $4` 431 432 rows, err := r.db.Query(query, did, collection, limit, offset) 433 if err != nil { 434 return nil, fmt.Errorf("failed to list records: %w", err) 435 } 436 defer rows.Close() 437 438 var records []*repository.Record 439 for rows.Next() { 440 var record repository.Record 441 var cidStr string 442 443 err := rows.Scan( 444 &record.URI, 445 &cidStr, 446 &record.Collection, 447 &record.RecordKey, 448 &record.CreatedAt, 449 &record.UpdatedAt, 450 ) 451 if err != nil { 452 return nil, fmt.Errorf("failed to scan record: %w", err) 453 } 454 455 record.CID, err = cid.Parse(cidStr) 456 if err != nil { 457 return nil, fmt.Errorf("failed to parse record CID: %w", err) 458 } 459 460 records = append(records, &record) 461 } 462 463 return records, nil 464} 465