A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "strings" 9 10 "Coves/internal/core/communities" 11 12 "github.com/lib/pq" 13) 14 15type postgresCommunityRepo struct { 16 db *sql.DB 17} 18 19// NewCommunityRepository creates a new PostgreSQL community repository 20func NewCommunityRepository(db *sql.DB) communities.Repository { 21 return &postgresCommunityRepo{db: db} 22} 23 24// Create inserts a new community into the communities table 25func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) { 26 // Validate that handle is always provided (constructed by consumer) 27 if community.Handle == "" { 28 return nil, fmt.Errorf("handle is required (should be constructed by consumer before insert)") 29 } 30 31 query := ` 32 INSERT INTO communities ( 33 did, handle, name, display_name, description, description_facets, 34 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 35 pds_email, pds_password_encrypted, 36 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url, 37 visibility, allow_external_discovery, moderation_type, content_warnings, 38 member_count, subscriber_count, post_count, 39 federated_from, federated_id, created_at, updated_at, 40 record_uri, record_cid 41 ) VALUES ( 42 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, 43 $12, 44 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 45 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 46 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 47 $16, 48 $17, $18, $19, $20, 49 $21, $22, $23, $24, $25, $26, $27, $28, $29 50 ) 51 RETURNING id, created_at, updated_at` 52 53 // Handle JSONB field - use sql.NullString with valid JSON or NULL 54 var descFacets interface{} 55 
if len(community.DescriptionFacets) > 0 { 56 descFacets = community.DescriptionFacets 57 } else { 58 descFacets = nil 59 } 60 61 err := r.db.QueryRowContext(ctx, query, 62 community.DID, 63 community.Handle, // Always non-empty - constructed by AppView consumer 64 community.Name, 65 nullString(community.DisplayName), 66 nullString(community.Description), 67 descFacets, 68 nullString(community.AvatarCID), 69 nullString(community.BannerCID), 70 community.OwnerDID, 71 community.CreatedByDID, 72 community.HostedByDID, 73 // V2.0: PDS credentials for community account (encrypted at rest) 74 nullString(community.PDSEmail), 75 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt 76 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt 77 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt 78 nullString(community.PDSURL), 79 // V2.0: No key columns - PDS manages all keys 80 community.Visibility, 81 community.AllowExternalDiscovery, 82 nullString(community.ModerationType), 83 pq.Array(community.ContentWarnings), 84 community.MemberCount, 85 community.SubscriberCount, 86 community.PostCount, 87 nullString(community.FederatedFrom), 88 nullString(community.FederatedID), 89 community.CreatedAt, 90 community.UpdatedAt, 91 nullString(community.RecordURI), 92 nullString(community.RecordCID), 93 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt) 94 if err != nil { 95 // Check for unique constraint violations 96 if strings.Contains(err.Error(), "duplicate key") { 97 if strings.Contains(err.Error(), "communities_did_key") { 98 return nil, communities.ErrCommunityAlreadyExists 99 } 100 if strings.Contains(err.Error(), "communities_handle_key") { 101 return nil, communities.ErrHandleTaken 102 } 103 } 104 return nil, fmt.Errorf("failed to create community: %w", err) 105 } 106 107 return community, nil 108} 109 110// GetByDID retrieves a community by its DID 111// Note: PDS credentials are included (for internal service use 
only) 112// Handlers MUST use json:"-" tags to prevent credential exposure in APIs 113// 114// V2.0: Key columns not included - PDS manages all keys 115func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) { 116 community := &communities.Community{} 117 query := ` 118 SELECT id, did, handle, name, display_name, description, description_facets, 119 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 120 pds_email, 121 CASE 122 WHEN pds_password_encrypted IS NOT NULL 123 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 124 ELSE NULL 125 END as pds_password, 126 CASE 127 WHEN pds_access_token_encrypted IS NOT NULL 128 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 129 ELSE NULL 130 END as pds_access_token, 131 CASE 132 WHEN pds_refresh_token_encrypted IS NOT NULL 133 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 134 ELSE NULL 135 END as pds_refresh_token, 136 pds_url, 137 visibility, allow_external_discovery, moderation_type, content_warnings, 138 member_count, subscriber_count, post_count, 139 federated_from, federated_id, created_at, updated_at, 140 record_uri, record_cid 141 FROM communities 142 WHERE did = $1` 143 144 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 145 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 146 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString 147 var descFacets []byte 148 var contentWarnings []string 149 150 err := r.db.QueryRowContext(ctx, query, did).Scan( 151 &community.ID, &community.DID, &community.Handle, &community.Name, 152 &displayName, &description, &descFacets, 153 &avatarCID, &bannerCID, 154 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 155 // V2.0: 
PDS credentials (decrypted from pgp_sym_encrypt) 156 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL, 157 &community.Visibility, &community.AllowExternalDiscovery, 158 &moderationType, pq.Array(&contentWarnings), 159 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 160 &federatedFrom, &federatedID, 161 &community.CreatedAt, &community.UpdatedAt, 162 &recordURI, &recordCID, 163 ) 164 165 if err == sql.ErrNoRows { 166 return nil, communities.ErrCommunityNotFound 167 } 168 if err != nil { 169 return nil, fmt.Errorf("failed to get community by DID: %w", err) 170 } 171 172 // Map nullable fields 173 community.DisplayName = displayName.String 174 community.Description = description.String 175 community.AvatarCID = avatarCID.String 176 community.BannerCID = bannerCID.String 177 community.PDSEmail = pdsEmail.String 178 community.PDSPassword = pdsPassword.String 179 community.PDSAccessToken = pdsAccessToken.String 180 community.PDSRefreshToken = pdsRefreshToken.String 181 community.PDSURL = pdsURL.String 182 // V2.0: No key fields - PDS manages all keys 183 community.RotationKeyPEM = "" // Empty - PDS-managed 184 community.SigningKeyPEM = "" // Empty - PDS-managed 185 community.ModerationType = moderationType.String 186 community.ContentWarnings = contentWarnings 187 community.FederatedFrom = federatedFrom.String 188 community.FederatedID = federatedID.String 189 community.RecordURI = recordURI.String 190 community.RecordCID = recordCID.String 191 if descFacets != nil { 192 community.DescriptionFacets = descFacets 193 } 194 195 return community, nil 196} 197 198// GetByHandle retrieves a community by its scoped handle 199func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) { 200 community := &communities.Community{} 201 query := ` 202 SELECT id, did, handle, name, display_name, description, description_facets, 203 avatar_cid, banner_cid, owner_did, created_by_did, 
hosted_by_did, 204 visibility, allow_external_discovery, moderation_type, content_warnings, 205 member_count, subscriber_count, post_count, 206 federated_from, federated_id, created_at, updated_at, 207 record_uri, record_cid 208 FROM communities 209 WHERE handle = $1` 210 211 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 212 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 213 var descFacets []byte 214 var contentWarnings []string 215 216 err := r.db.QueryRowContext(ctx, query, handle).Scan( 217 &community.ID, &community.DID, &community.Handle, &community.Name, 218 &displayName, &description, &descFacets, 219 &avatarCID, &bannerCID, 220 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 221 &community.Visibility, &community.AllowExternalDiscovery, 222 &moderationType, pq.Array(&contentWarnings), 223 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 224 &federatedFrom, &federatedID, 225 &community.CreatedAt, &community.UpdatedAt, 226 &recordURI, &recordCID, 227 ) 228 229 if err == sql.ErrNoRows { 230 return nil, communities.ErrCommunityNotFound 231 } 232 if err != nil { 233 return nil, fmt.Errorf("failed to get community by handle: %w", err) 234 } 235 236 // Map nullable fields 237 community.DisplayName = displayName.String 238 community.Description = description.String 239 community.AvatarCID = avatarCID.String 240 community.BannerCID = bannerCID.String 241 community.ModerationType = moderationType.String 242 community.ContentWarnings = contentWarnings 243 community.FederatedFrom = federatedFrom.String 244 community.FederatedID = federatedID.String 245 community.RecordURI = recordURI.String 246 community.RecordCID = recordCID.String 247 if descFacets != nil { 248 community.DescriptionFacets = descFacets 249 } 250 251 return community, nil 252} 253 254// Update modifies an existing community's metadata 255func (r *postgresCommunityRepo) Update(ctx context.Context, 
community *communities.Community) (*communities.Community, error) { 256 query := ` 257 UPDATE communities 258 SET display_name = $2, description = $3, description_facets = $4, 259 avatar_cid = $5, banner_cid = $6, 260 visibility = $7, allow_external_discovery = $8, 261 moderation_type = $9, content_warnings = $10, 262 updated_at = NOW(), 263 record_uri = $11, record_cid = $12 264 WHERE did = $1 265 RETURNING updated_at` 266 267 // Handle JSONB field - use sql.NullString with valid JSON or NULL 268 var descFacets interface{} 269 if len(community.DescriptionFacets) > 0 { 270 descFacets = community.DescriptionFacets 271 } else { 272 descFacets = nil 273 } 274 275 err := r.db.QueryRowContext(ctx, query, 276 community.DID, 277 nullString(community.DisplayName), 278 nullString(community.Description), 279 descFacets, 280 nullString(community.AvatarCID), 281 nullString(community.BannerCID), 282 community.Visibility, 283 community.AllowExternalDiscovery, 284 nullString(community.ModerationType), 285 pq.Array(community.ContentWarnings), 286 nullString(community.RecordURI), 287 nullString(community.RecordCID), 288 ).Scan(&community.UpdatedAt) 289 290 if err == sql.ErrNoRows { 291 return nil, communities.ErrCommunityNotFound 292 } 293 if err != nil { 294 return nil, fmt.Errorf("failed to update community: %w", err) 295 } 296 297 return community, nil 298} 299 300// UpdateCredentials atomically updates community's PDS access and refresh tokens 301// CRITICAL: Both tokens must be updated together because refresh tokens are single-use 302// After a successful token refresh, the old refresh token is immediately revoked by the PDS 303func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error { 304 query := ` 305 UPDATE communities 306 SET 307 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 308 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT 
encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 309 updated_at = NOW() 310 WHERE did = $1 311 RETURNING did` 312 313 var returnedDID string 314 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID) 315 316 if err == sql.ErrNoRows { 317 return communities.ErrCommunityNotFound 318 } 319 if err != nil { 320 return fmt.Errorf("failed to update credentials: %w", err) 321 } 322 323 return nil 324} 325 326// Delete removes a community from the database 327func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error { 328 query := `DELETE FROM communities WHERE did = $1` 329 330 result, err := r.db.ExecContext(ctx, query, did) 331 if err != nil { 332 return fmt.Errorf("failed to delete community: %w", err) 333 } 334 335 rowsAffected, err := result.RowsAffected() 336 if err != nil { 337 return fmt.Errorf("failed to check delete result: %w", err) 338 } 339 340 if rowsAffected == 0 { 341 return communities.ErrCommunityNotFound 342 } 343 344 return nil 345} 346 347// List retrieves communities with filtering and pagination 348func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, error) { 349 // Build query with filters 350 whereClauses := []string{} 351 args := []interface{}{} 352 argCount := 1 353 354 if req.Visibility != "" { 355 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 356 args = append(args, req.Visibility) 357 argCount++ 358 } 359 360 // TODO: Add category filter when DB schema supports it 361 // if req.Category != "" { ... } 362 363 // TODO: Add language filter when DB schema supports it 364 // if req.Language != "" { ... 
} 365 366 whereClause := "" 367 if len(whereClauses) > 0 { 368 whereClause = "WHERE " + strings.Join(whereClauses, " AND ") 369 } 370 371 // Build sort clause - map sort enum to DB columns 372 sortColumn := "subscriber_count" // default: popular 373 sortOrder := "DESC" 374 375 switch req.Sort { 376 case "popular": 377 // Most subscribers (default) 378 sortColumn = "subscriber_count" 379 sortOrder = "DESC" 380 case "active": 381 // Most posts/activity 382 sortColumn = "post_count" 383 sortOrder = "DESC" 384 case "new": 385 // Recently created 386 sortColumn = "created_at" 387 sortOrder = "DESC" 388 case "alphabetical": 389 // Sorted by name A-Z 390 sortColumn = "name" 391 sortOrder = "ASC" 392 default: 393 // Fallback to popular if empty or invalid (should be validated in handler) 394 sortColumn = "subscriber_count" 395 sortOrder = "DESC" 396 } 397 398 // Get communities with pagination 399 query := fmt.Sprintf(` 400 SELECT id, did, handle, name, display_name, description, description_facets, 401 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 402 visibility, allow_external_discovery, moderation_type, content_warnings, 403 member_count, subscriber_count, post_count, 404 federated_from, federated_id, created_at, updated_at, 405 record_uri, record_cid 406 FROM communities 407 %s 408 ORDER BY %s %s 409 LIMIT $%d OFFSET $%d`, 410 whereClause, sortColumn, sortOrder, argCount, argCount+1) 411 412 args = append(args, req.Limit, req.Offset) 413 414 rows, err := r.db.QueryContext(ctx, query, args...) 
415 if err != nil { 416 return nil, fmt.Errorf("failed to list communities: %w", err) 417 } 418 defer func() { 419 if closeErr := rows.Close(); closeErr != nil { 420 log.Printf("Failed to close rows: %v", closeErr) 421 } 422 }() 423 424 result := []*communities.Community{} 425 for rows.Next() { 426 community := &communities.Community{} 427 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 428 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 429 var descFacets []byte 430 var contentWarnings []string 431 432 scanErr := rows.Scan( 433 &community.ID, &community.DID, &community.Handle, &community.Name, 434 &displayName, &description, &descFacets, 435 &avatarCID, &bannerCID, 436 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 437 &community.Visibility, &community.AllowExternalDiscovery, 438 &moderationType, pq.Array(&contentWarnings), 439 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 440 &federatedFrom, &federatedID, 441 &community.CreatedAt, &community.UpdatedAt, 442 &recordURI, &recordCID, 443 ) 444 if scanErr != nil { 445 return nil, fmt.Errorf("failed to scan community: %w", scanErr) 446 } 447 448 // Map nullable fields 449 community.DisplayName = displayName.String 450 community.Description = description.String 451 community.AvatarCID = avatarCID.String 452 community.BannerCID = bannerCID.String 453 community.ModerationType = moderationType.String 454 community.ContentWarnings = contentWarnings 455 community.FederatedFrom = federatedFrom.String 456 community.FederatedID = federatedID.String 457 community.RecordURI = recordURI.String 458 community.RecordCID = recordCID.String 459 if descFacets != nil { 460 community.DescriptionFacets = descFacets 461 } 462 463 result = append(result, community) 464 } 465 466 if err = rows.Err(); err != nil { 467 return nil, fmt.Errorf("error iterating communities: %w", err) 468 } 469 470 return result, nil 471} 472 473// Search 
searches communities by name/description using fuzzy matching 474func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) { 475 // Build query with fuzzy search and visibility filter 476 whereClauses := []string{ 477 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')", 478 } 479 args := []interface{}{req.Query} 480 argCount := 2 481 482 if req.Visibility != "" { 483 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 484 args = append(args, req.Visibility) 485 argCount++ 486 } 487 488 whereClause := "WHERE " + strings.Join(whereClauses, " AND ") 489 490 // Get total count 491 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 492 var totalCount int 493 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 494 if err != nil { 495 return nil, 0, fmt.Errorf("failed to count search results: %w", err) 496 } 497 498 // Search with relevance ranking using pg_trgm similarity 499 // Filter out results with very low relevance (< 0.2) to avoid noise 500 query := fmt.Sprintf(` 501 SELECT id, did, handle, name, display_name, description, description_facets, 502 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 503 visibility, allow_external_discovery, moderation_type, content_warnings, 504 member_count, subscriber_count, post_count, 505 federated_from, federated_id, created_at, updated_at, 506 record_uri, record_cid, 507 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance 508 FROM communities 509 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2 510 ORDER BY relevance DESC, member_count DESC 511 LIMIT $%d OFFSET $%d`, 512 whereClause, argCount, argCount+1) 513 514 args = append(args, req.Limit, req.Offset) 515 516 rows, err := r.db.QueryContext(ctx, query, args...) 
517 if err != nil { 518 return nil, 0, fmt.Errorf("failed to search communities: %w", err) 519 } 520 defer func() { 521 if closeErr := rows.Close(); closeErr != nil { 522 log.Printf("Failed to close rows: %v", closeErr) 523 } 524 }() 525 526 result := []*communities.Community{} 527 for rows.Next() { 528 community := &communities.Community{} 529 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 530 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 531 var descFacets []byte 532 var contentWarnings []string 533 var relevance float64 534 535 scanErr := rows.Scan( 536 &community.ID, &community.DID, &community.Handle, &community.Name, 537 &displayName, &description, &descFacets, 538 &avatarCID, &bannerCID, 539 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 540 &community.Visibility, &community.AllowExternalDiscovery, 541 &moderationType, pq.Array(&contentWarnings), 542 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 543 &federatedFrom, &federatedID, 544 &community.CreatedAt, &community.UpdatedAt, 545 &recordURI, &recordCID, 546 &relevance, 547 ) 548 if scanErr != nil { 549 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 550 } 551 552 // Map nullable fields 553 community.DisplayName = displayName.String 554 community.Description = description.String 555 community.AvatarCID = avatarCID.String 556 community.BannerCID = bannerCID.String 557 community.ModerationType = moderationType.String 558 community.ContentWarnings = contentWarnings 559 community.FederatedFrom = federatedFrom.String 560 community.FederatedID = federatedID.String 561 community.RecordURI = recordURI.String 562 community.RecordCID = recordCID.String 563 if descFacets != nil { 564 community.DescriptionFacets = descFacets 565 } 566 567 result = append(result, community) 568 } 569 570 if err = rows.Err(); err != nil { 571 return nil, 0, fmt.Errorf("error iterating search results: %w", err) 572 } 
573 574 return result, totalCount, nil 575} 576 577// Helper functions 578func nullString(s string) sql.NullString { 579 return sql.NullString{String: s, Valid: s != ""} 580}