A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "log" 8 "strings" 9 10 "Coves/internal/core/communities" 11 12 "github.com/lib/pq" 13) 14 15type postgresCommunityRepo struct { 16 db *sql.DB 17} 18 19// NewCommunityRepository creates a new PostgreSQL community repository 20func NewCommunityRepository(db *sql.DB) communities.Repository { 21 return &postgresCommunityRepo{db: db} 22} 23 24// Create inserts a new community into the communities table 25func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) { 26 // Validate that handle is always provided (constructed by consumer) 27 if community.Handle == "" { 28 return nil, fmt.Errorf("handle is required (should be constructed by consumer before insert)") 29 } 30 31 query := ` 32 INSERT INTO communities ( 33 did, handle, name, display_name, description, description_facets, 34 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 35 pds_email, pds_password_encrypted, 36 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url, 37 visibility, allow_external_discovery, moderation_type, content_warnings, 38 member_count, subscriber_count, post_count, 39 federated_from, federated_id, created_at, updated_at, 40 record_uri, record_cid 41 ) VALUES ( 42 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, 43 $12, 44 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 45 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 46 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 47 $16, 48 $17, $18, $19, $20, 49 $21, $22, $23, $24, $25, $26, $27, $28, $29 50 ) 51 RETURNING id, created_at, updated_at` 52 53 // Handle JSONB field - use sql.NullString with valid JSON or NULL 54 var descFacets interface{} 55 if len(community.DescriptionFacets) > 0 { 56 descFacets = community.DescriptionFacets 57 } else { 58 descFacets = nil 59 } 60 61 err := r.db.QueryRowContext(ctx, query, 62 community.DID, 63 community.Handle, // Always non-empty - constructed by AppView consumer 64 community.Name, 65 nullString(community.DisplayName), 66 nullString(community.Description), 67 descFacets, 68 nullString(community.AvatarCID), 69 nullString(community.BannerCID), 70 community.OwnerDID, 71 community.CreatedByDID, 72 community.HostedByDID, 73 // V2.0: PDS credentials for community account (encrypted at rest) 74 nullString(community.PDSEmail), 75 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt 76 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt 77 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt 78 nullString(community.PDSURL), 79 // V2.0: No key columns - PDS manages all keys 80 community.Visibility, 81 community.AllowExternalDiscovery, 82 nullString(community.ModerationType), 83 pq.Array(community.ContentWarnings), 84 community.MemberCount, 85 community.SubscriberCount, 86 community.PostCount, 87 nullString(community.FederatedFrom), 88 nullString(community.FederatedID), 89 community.CreatedAt, 90 community.UpdatedAt, 91 nullString(community.RecordURI), 92 nullString(community.RecordCID), 93 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt) 94 if err != nil { 95 // Check for unique constraint violations 96 if strings.Contains(err.Error(), "duplicate key") { 97 if strings.Contains(err.Error(), "communities_did_key") { 98 return nil, communities.ErrCommunityAlreadyExists 99 } 100 if strings.Contains(err.Error(), "communities_handle_key") { 101 return nil, communities.ErrHandleTaken 102 } 103 } 104 return nil, fmt.Errorf("failed to create community: %w", err) 105 } 106 107 return community, nil 108} 109 110// GetByDID retrieves a community by its DID 111// Note: PDS credentials are included (for internal service use only) 112// Handlers MUST use json:"-" tags to prevent credential exposure in APIs 113// 114// V2.0: Key columns not included - PDS manages all keys 115func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) { 116 community := &communities.Community{} 117 query := ` 118 SELECT id, did, handle, name, display_name, description, description_facets, 119 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 120 pds_email, 121 CASE 122 WHEN pds_password_encrypted IS NOT NULL 123 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 124 ELSE NULL 125 END as pds_password, 126 CASE 127 WHEN pds_access_token_encrypted IS NOT NULL 128 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 129 ELSE NULL 130 END as pds_access_token, 131 CASE 132 WHEN pds_refresh_token_encrypted IS NOT NULL 133 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 134 ELSE NULL 135 END as pds_refresh_token, 136 pds_url, 137 visibility, allow_external_discovery, moderation_type, content_warnings, 138 member_count, subscriber_count, post_count, 139 federated_from, federated_id, created_at, updated_at, 140 record_uri, record_cid 141 FROM communities 142 WHERE did = $1` 143 144 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 145 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 146 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString 147 var descFacets []byte 148 var contentWarnings []string 149 150 err := r.db.QueryRowContext(ctx, query, did).Scan( 151 &community.ID, &community.DID, &community.Handle, &community.Name, 152 &displayName, &description, &descFacets, 153 &avatarCID, &bannerCID, 154 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 155 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt) 156 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL, 157 &community.Visibility, &community.AllowExternalDiscovery, 158 &moderationType, pq.Array(&contentWarnings), 159 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 160 &federatedFrom, &federatedID, 161 &community.CreatedAt, &community.UpdatedAt, 162 &recordURI, &recordCID, 163 ) 164 165 if err == sql.ErrNoRows { 166 return nil, communities.ErrCommunityNotFound 167 } 168 if err != nil { 169 return nil, fmt.Errorf("failed to get community by DID: %w", err) 170 } 171 172 // Map nullable fields 173 community.DisplayName = displayName.String 174 community.Description = description.String 175 community.AvatarCID = avatarCID.String 176 community.BannerCID = bannerCID.String 177 community.PDSEmail = pdsEmail.String 178 community.PDSPassword = pdsPassword.String 179 community.PDSAccessToken = pdsAccessToken.String 180 community.PDSRefreshToken = pdsRefreshToken.String 181 community.PDSURL = pdsURL.String 182 // V2.0: No key fields - PDS manages all keys 183 community.RotationKeyPEM = "" // Empty - PDS-managed 184 community.SigningKeyPEM = "" // Empty - PDS-managed 185 community.ModerationType = moderationType.String 186 community.ContentWarnings = contentWarnings 187 community.FederatedFrom = federatedFrom.String 188 community.FederatedID = federatedID.String 189 community.RecordURI = recordURI.String 190 community.RecordCID = recordCID.String 191 if descFacets != nil { 192 community.DescriptionFacets = descFacets 193 } 194 195 return community, nil 196} 197 198// GetByHandle retrieves a community by its scoped handle 199func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) { 200 community := &communities.Community{} 201 query := ` 202 SELECT id, did, handle, name, display_name, description, description_facets, 203 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 204 visibility, allow_external_discovery, moderation_type, content_warnings, 205 member_count, subscriber_count, post_count, 206 federated_from, federated_id, created_at, updated_at, 207 record_uri, record_cid 208 FROM communities 209 WHERE handle = $1` 210 211 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 212 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 213 var descFacets []byte 214 var contentWarnings []string 215 216 err := r.db.QueryRowContext(ctx, query, handle).Scan( 217 &community.ID, &community.DID, &community.Handle, &community.Name, 218 &displayName, &description, &descFacets, 219 &avatarCID, &bannerCID, 220 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 221 &community.Visibility, &community.AllowExternalDiscovery, 222 &moderationType, pq.Array(&contentWarnings), 223 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 224 &federatedFrom, &federatedID, 225 &community.CreatedAt, &community.UpdatedAt, 226 &recordURI, &recordCID, 227 ) 228 229 if err == sql.ErrNoRows { 230 return nil, communities.ErrCommunityNotFound 231 } 232 if err != nil { 233 return nil, fmt.Errorf("failed to get community by handle: %w", err) 234 } 235 236 // Map nullable fields 237 community.DisplayName = displayName.String 238 community.Description = description.String 239 community.AvatarCID = avatarCID.String 240 community.BannerCID = bannerCID.String 241 community.ModerationType = moderationType.String 242 community.ContentWarnings = contentWarnings 243 community.FederatedFrom = federatedFrom.String 244 community.FederatedID = federatedID.String 245 community.RecordURI = recordURI.String 246 community.RecordCID = recordCID.String 247 if descFacets != nil { 248 community.DescriptionFacets = descFacets 249 } 250 251 return community, nil 252} 253 254// Update modifies an existing community's metadata 255func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) { 256 query := ` 257 UPDATE communities 258 SET display_name = $2, description = $3, description_facets = $4, 259 avatar_cid = $5, banner_cid = $6, 260 visibility = $7, allow_external_discovery = $8, 261 moderation_type = $9, content_warnings = $10, 262 updated_at = NOW(), 263 record_uri = $11, record_cid = $12 264 WHERE did = $1 265 RETURNING updated_at` 266 267 // Handle JSONB field - use sql.NullString with valid JSON or NULL 268 var descFacets interface{} 269 if len(community.DescriptionFacets) > 0 { 270 descFacets = community.DescriptionFacets 271 } else { 272 descFacets = nil 273 } 274 275 err := r.db.QueryRowContext(ctx, query, 276 community.DID, 277 nullString(community.DisplayName), 278 nullString(community.Description), 279 descFacets, 280 nullString(community.AvatarCID), 281 nullString(community.BannerCID), 282 community.Visibility, 283 community.AllowExternalDiscovery, 284 nullString(community.ModerationType), 285 pq.Array(community.ContentWarnings), 286 nullString(community.RecordURI), 287 nullString(community.RecordCID), 288 ).Scan(&community.UpdatedAt) 289 290 if err == sql.ErrNoRows { 291 return nil, communities.ErrCommunityNotFound 292 } 293 if err != nil { 294 return nil, fmt.Errorf("failed to update community: %w", err) 295 } 296 297 return community, nil 298} 299 300// UpdateCredentials atomically updates community's PDS access and refresh tokens 301// CRITICAL: Both tokens must be updated together because refresh tokens are single-use 302// After a successful token refresh, the old refresh token is immediately revoked by the PDS 303func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error { 304 query := ` 305 UPDATE communities 306 SET 307 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 308 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 309 updated_at = NOW() 310 WHERE did = $1 311 RETURNING did` 312 313 var returnedDID string 314 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID) 315 316 if err == sql.ErrNoRows { 317 return communities.ErrCommunityNotFound 318 } 319 if err != nil { 320 return fmt.Errorf("failed to update credentials: %w", err) 321 } 322 323 return nil 324} 325 326// Delete removes a community from the database 327func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error { 328 query := `DELETE FROM communities WHERE did = $1` 329 330 result, err := r.db.ExecContext(ctx, query, did) 331 if err != nil { 332 return fmt.Errorf("failed to delete community: %w", err) 333 } 334 335 rowsAffected, err := result.RowsAffected() 336 if err != nil { 337 return fmt.Errorf("failed to check delete result: %w", err) 338 } 339 340 if rowsAffected == 0 { 341 return communities.ErrCommunityNotFound 342 } 343 344 return nil 345} 346 347// List retrieves communities with filtering and pagination 348func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) { 349 // Build query with filters 350 whereClauses := []string{} 351 args := []interface{}{} 352 argCount := 1 353 354 if req.Visibility != "" { 355 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 356 args = append(args, req.Visibility) 357 argCount++ 358 } 359 360 if req.HostedBy != "" { 361 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount)) 362 args = append(args, req.HostedBy) 363 argCount++ 364 } 365 366 whereClause := "" 367 if len(whereClauses) > 0 { 368 whereClause = "WHERE " + strings.Join(whereClauses, " AND ") 369 } 370 371 // Get total count 372 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 373 var totalCount int 374 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 375 if err != nil { 376 return nil, 0, fmt.Errorf("failed to count communities: %w", err) 377 } 378 379 // Build sort clause 380 sortColumn := "created_at" 381 if req.SortBy != "" { 382 switch req.SortBy { 383 case "member_count", "subscriber_count", "post_count", "created_at": 384 sortColumn = req.SortBy 385 } 386 } 387 388 sortOrder := "DESC" 389 if strings.ToUpper(req.SortOrder) == "ASC" { 390 sortOrder = "ASC" 391 } 392 393 // Get communities with pagination 394 query := fmt.Sprintf(` 395 SELECT id, did, handle, name, display_name, description, description_facets, 396 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 397 visibility, allow_external_discovery, moderation_type, content_warnings, 398 member_count, subscriber_count, post_count, 399 federated_from, federated_id, created_at, updated_at, 400 record_uri, record_cid 401 FROM communities 402 %s 403 ORDER BY %s %s 404 LIMIT $%d OFFSET $%d`, 405 whereClause, sortColumn, sortOrder, argCount, argCount+1) 406 407 args = append(args, req.Limit, req.Offset) 408 409 rows, err := r.db.QueryContext(ctx, query, args...) 410 if err != nil { 411 return nil, 0, fmt.Errorf("failed to list communities: %w", err) 412 } 413 defer func() { 414 if closeErr := rows.Close(); closeErr != nil { 415 log.Printf("Failed to close rows: %v", closeErr) 416 } 417 }() 418 419 result := []*communities.Community{} 420 for rows.Next() { 421 community := &communities.Community{} 422 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 423 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 424 var descFacets []byte 425 var contentWarnings []string 426 427 scanErr := rows.Scan( 428 &community.ID, &community.DID, &community.Handle, &community.Name, 429 &displayName, &description, &descFacets, 430 &avatarCID, &bannerCID, 431 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 432 &community.Visibility, &community.AllowExternalDiscovery, 433 &moderationType, pq.Array(&contentWarnings), 434 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 435 &federatedFrom, &federatedID, 436 &community.CreatedAt, &community.UpdatedAt, 437 &recordURI, &recordCID, 438 ) 439 if scanErr != nil { 440 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 441 } 442 443 // Map nullable fields 444 community.DisplayName = displayName.String 445 community.Description = description.String 446 community.AvatarCID = avatarCID.String 447 community.BannerCID = bannerCID.String 448 community.ModerationType = moderationType.String 449 community.ContentWarnings = contentWarnings 450 community.FederatedFrom = federatedFrom.String 451 community.FederatedID = federatedID.String 452 community.RecordURI = recordURI.String 453 community.RecordCID = recordCID.String 454 if descFacets != nil { 455 community.DescriptionFacets = descFacets 456 } 457 458 result = append(result, community) 459 } 460 461 if err = rows.Err(); err != nil { 462 return nil, 0, fmt.Errorf("error iterating communities: %w", err) 463 } 464 465 return result, totalCount, nil 466} 467 468// Search searches communities by name/description using fuzzy matching 469func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) { 470 // Build query with fuzzy search and visibility filter 471 whereClauses := []string{ 472 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')", 473 } 474 args := []interface{}{req.Query} 475 argCount := 2 476 477 if req.Visibility != "" { 478 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 479 args = append(args, req.Visibility) 480 argCount++ 481 } 482 483 whereClause := "WHERE " + strings.Join(whereClauses, " AND ") 484 485 // Get total count 486 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 487 var totalCount int 488 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 489 if err != nil { 490 return nil, 0, fmt.Errorf("failed to count search results: %w", err) 491 } 492 493 // Search with relevance ranking using pg_trgm similarity 494 // Filter out results with very low relevance (< 0.2) to avoid noise 495 query := fmt.Sprintf(` 496 SELECT id, did, handle, name, display_name, description, description_facets, 497 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 498 visibility, allow_external_discovery, moderation_type, content_warnings, 499 member_count, subscriber_count, post_count, 500 federated_from, federated_id, created_at, updated_at, 501 record_uri, record_cid, 502 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance 503 FROM communities 504 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2 505 ORDER BY relevance DESC, member_count DESC 506 LIMIT $%d OFFSET $%d`, 507 whereClause, argCount, argCount+1) 508 509 args = append(args, req.Limit, req.Offset) 510 511 rows, err := r.db.QueryContext(ctx, query, args...) 512 if err != nil { 513 return nil, 0, fmt.Errorf("failed to search communities: %w", err) 514 } 515 defer func() { 516 if closeErr := rows.Close(); closeErr != nil { 517 log.Printf("Failed to close rows: %v", closeErr) 518 } 519 }() 520 521 result := []*communities.Community{} 522 for rows.Next() { 523 community := &communities.Community{} 524 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 525 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 526 var descFacets []byte 527 var contentWarnings []string 528 var relevance float64 529 530 scanErr := rows.Scan( 531 &community.ID, &community.DID, &community.Handle, &community.Name, 532 &displayName, &description, &descFacets, 533 &avatarCID, &bannerCID, 534 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 535 &community.Visibility, &community.AllowExternalDiscovery, 536 &moderationType, pq.Array(&contentWarnings), 537 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 538 &federatedFrom, &federatedID, 539 &community.CreatedAt, &community.UpdatedAt, 540 &recordURI, &recordCID, 541 &relevance, 542 ) 543 if scanErr != nil { 544 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 545 } 546 547 // Map nullable fields 548 community.DisplayName = displayName.String 549 community.Description = description.String 550 community.AvatarCID = avatarCID.String 551 community.BannerCID = bannerCID.String 552 community.ModerationType = moderationType.String 553 community.ContentWarnings = contentWarnings 554 community.FederatedFrom = federatedFrom.String 555 community.FederatedID = federatedID.String 556 community.RecordURI = recordURI.String 557 community.RecordCID = recordCID.String 558 if descFacets != nil { 559 community.DescriptionFacets = descFacets 560 } 561 562 result = append(result, community) 563 } 564 565 if err = rows.Err(); err != nil { 566 return nil, 0, fmt.Errorf("error iterating search results: %w", err) 567 } 568 569 return result, totalCount, nil 570} 571 572// Helper functions 573func nullString(s string) sql.NullString { 574 return sql.NullString{String: s, Valid: s != ""} 575}