A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "database/sql" 7 "fmt" 8 "log" 9 "strings" 10 11 "github.com/lib/pq" 12) 13 14type postgresCommunityRepo struct { 15 db *sql.DB 16} 17 18// NewCommunityRepository creates a new PostgreSQL community repository 19func NewCommunityRepository(db *sql.DB) communities.Repository { 20 return &postgresCommunityRepo{db: db} 21} 22 23// Create inserts a new community into the communities table 24func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) { 25 // Validate that handle is always provided (constructed by consumer) 26 if community.Handle == "" { 27 return nil, fmt.Errorf("handle is required (should be constructed by consumer before insert)") 28 } 29 30 query := ` 31 INSERT INTO communities ( 32 did, handle, name, display_name, description, description_facets, 33 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 34 pds_email, pds_password_encrypted, 35 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url, 36 visibility, allow_external_discovery, moderation_type, content_warnings, 37 member_count, subscriber_count, post_count, 38 federated_from, federated_id, created_at, updated_at, 39 record_uri, record_cid 40 ) VALUES ( 41 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, 42 $12, 43 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 44 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 45 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 46 $16, 47 $17, $18, $19, $20, 48 $21, $22, $23, $24, $25, $26, $27, $28, $29 49 ) 50 RETURNING id, created_at, updated_at` 51 52 // Handle JSONB field - use sql.NullString with valid JSON or NULL 53 var descFacets interface{} 54 if len(community.DescriptionFacets) > 0 { 55 descFacets = community.DescriptionFacets 56 } else { 57 descFacets = nil 58 } 59 60 err := r.db.QueryRowContext(ctx, query, 61 community.DID, 62 community.Handle, // Always non-empty - constructed by AppView consumer 63 community.Name, 64 nullString(community.DisplayName), 65 nullString(community.Description), 66 descFacets, 67 nullString(community.AvatarCID), 68 nullString(community.BannerCID), 69 community.OwnerDID, 70 community.CreatedByDID, 71 community.HostedByDID, 72 // V2.0: PDS credentials for community account (encrypted at rest) 73 nullString(community.PDSEmail), 74 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt 75 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt 76 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt 77 nullString(community.PDSURL), 78 // V2.0: No key columns - PDS manages all keys 79 community.Visibility, 80 community.AllowExternalDiscovery, 81 nullString(community.ModerationType), 82 pq.Array(community.ContentWarnings), 83 community.MemberCount, 84 community.SubscriberCount, 85 community.PostCount, 86 nullString(community.FederatedFrom), 87 nullString(community.FederatedID), 88 community.CreatedAt, 89 community.UpdatedAt, 90 nullString(community.RecordURI), 91 nullString(community.RecordCID), 92 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt) 93 if err != nil { 94 // Check for unique constraint violations 95 if strings.Contains(err.Error(), "duplicate key") { 96 if strings.Contains(err.Error(), "communities_did_key") { 97 return nil, communities.ErrCommunityAlreadyExists 98 } 99 if strings.Contains(err.Error(), "communities_handle_key") { 100 return nil, communities.ErrHandleTaken 101 } 102 } 103 return nil, fmt.Errorf("failed to create community: %w", err) 104 } 105 106 return community, nil 107} 108 109// GetByDID retrieves a community by its DID 110// Note: PDS credentials are included (for internal service use only) 111// Handlers MUST use json:"-" tags to prevent credential exposure in APIs 112// 113// V2.0: Key columns not included - PDS manages all keys 114func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) { 115 community := &communities.Community{} 116 query := ` 117 SELECT id, did, handle, name, display_name, description, description_facets, 118 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 119 pds_email, 120 CASE 121 WHEN pds_password_encrypted IS NOT NULL 122 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 123 ELSE NULL 124 END as pds_password, 125 CASE 126 WHEN pds_access_token_encrypted IS NOT NULL 127 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 128 ELSE NULL 129 END as pds_access_token, 130 CASE 131 WHEN pds_refresh_token_encrypted IS NOT NULL 132 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 133 ELSE NULL 134 END as pds_refresh_token, 135 pds_url, 136 visibility, allow_external_discovery, moderation_type, content_warnings, 137 member_count, subscriber_count, post_count, 138 federated_from, federated_id, created_at, updated_at, 139 record_uri, record_cid 140 FROM communities 141 WHERE did = $1` 142 143 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 144 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 145 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString 146 var descFacets []byte 147 var contentWarnings []string 148 149 err := r.db.QueryRowContext(ctx, query, did).Scan( 150 &community.ID, &community.DID, &community.Handle, &community.Name, 151 &displayName, &description, &descFacets, 152 &avatarCID, &bannerCID, 153 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 154 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt) 155 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL, 156 &community.Visibility, &community.AllowExternalDiscovery, 157 &moderationType, pq.Array(&contentWarnings), 158 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 159 &federatedFrom, &federatedID, 160 &community.CreatedAt, &community.UpdatedAt, 161 &recordURI, &recordCID, 162 ) 163 164 if err == sql.ErrNoRows { 165 return nil, communities.ErrCommunityNotFound 166 } 167 if err != nil { 168 return nil, fmt.Errorf("failed to get community by DID: %w", err) 169 } 170 171 // Map nullable fields 172 community.DisplayName = displayName.String 173 community.Description = description.String 174 community.AvatarCID = avatarCID.String 175 community.BannerCID = bannerCID.String 176 community.PDSEmail = pdsEmail.String 177 community.PDSPassword = pdsPassword.String 178 community.PDSAccessToken = pdsAccessToken.String 179 community.PDSRefreshToken = pdsRefreshToken.String 180 community.PDSURL = pdsURL.String 181 // V2.0: No key fields - PDS manages all keys 182 community.RotationKeyPEM = "" // Empty - PDS-managed 183 community.SigningKeyPEM = "" // Empty - PDS-managed 184 community.ModerationType = moderationType.String 185 community.ContentWarnings = contentWarnings 186 community.FederatedFrom = federatedFrom.String 187 community.FederatedID = federatedID.String 188 community.RecordURI = recordURI.String 189 community.RecordCID = recordCID.String 190 if descFacets != nil { 191 community.DescriptionFacets = descFacets 192 } 193 194 return community, nil 195} 196 197// GetByHandle retrieves a community by its scoped handle 198func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) { 199 community := &communities.Community{} 200 query := ` 201 SELECT id, did, handle, name, display_name, description, description_facets, 202 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 203 visibility, allow_external_discovery, moderation_type, content_warnings, 204 member_count, subscriber_count, post_count, 205 federated_from, federated_id, created_at, updated_at, 206 record_uri, record_cid 207 FROM communities 208 WHERE handle = $1` 209 210 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 211 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 212 var descFacets []byte 213 var contentWarnings []string 214 215 err := r.db.QueryRowContext(ctx, query, handle).Scan( 216 &community.ID, &community.DID, &community.Handle, &community.Name, 217 &displayName, &description, &descFacets, 218 &avatarCID, &bannerCID, 219 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 220 &community.Visibility, &community.AllowExternalDiscovery, 221 &moderationType, pq.Array(&contentWarnings), 222 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 223 &federatedFrom, &federatedID, 224 &community.CreatedAt, &community.UpdatedAt, 225 &recordURI, &recordCID, 226 ) 227 228 if err == sql.ErrNoRows { 229 return nil, communities.ErrCommunityNotFound 230 } 231 if err != nil { 232 return nil, fmt.Errorf("failed to get community by handle: %w", err) 233 } 234 235 // Map nullable fields 236 community.DisplayName = displayName.String 237 community.Description = description.String 238 community.AvatarCID = avatarCID.String 239 community.BannerCID = bannerCID.String 240 community.ModerationType = moderationType.String 241 community.ContentWarnings = contentWarnings 242 community.FederatedFrom = federatedFrom.String 243 community.FederatedID = federatedID.String 244 community.RecordURI = recordURI.String 245 community.RecordCID = recordCID.String 246 if descFacets != nil { 247 community.DescriptionFacets = descFacets 248 } 249 250 return community, nil 251} 252 253// Update modifies an existing community's metadata 254func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) { 255 query := ` 256 UPDATE communities 257 SET display_name = $2, description = $3, description_facets = $4, 258 avatar_cid = $5, banner_cid = $6, 259 visibility = $7, allow_external_discovery = $8, 260 moderation_type = $9, content_warnings = $10, 261 updated_at = NOW(), 262 record_uri = $11, record_cid = $12 263 WHERE did = $1 264 RETURNING updated_at` 265 266 // Handle JSONB field - use sql.NullString with valid JSON or NULL 267 var descFacets interface{} 268 if len(community.DescriptionFacets) > 0 { 269 descFacets = community.DescriptionFacets 270 } else { 271 descFacets = nil 272 } 273 274 err := r.db.QueryRowContext(ctx, query, 275 community.DID, 276 nullString(community.DisplayName), 277 nullString(community.Description), 278 descFacets, 279 nullString(community.AvatarCID), 280 nullString(community.BannerCID), 281 community.Visibility, 282 community.AllowExternalDiscovery, 283 nullString(community.ModerationType), 284 pq.Array(community.ContentWarnings), 285 nullString(community.RecordURI), 286 nullString(community.RecordCID), 287 ).Scan(&community.UpdatedAt) 288 289 if err == sql.ErrNoRows { 290 return nil, communities.ErrCommunityNotFound 291 } 292 if err != nil { 293 return nil, fmt.Errorf("failed to update community: %w", err) 294 } 295 296 return community, nil 297} 298 299// UpdateCredentials atomically updates community's PDS access and refresh tokens 300// CRITICAL: Both tokens must be updated together because refresh tokens are single-use 301// After a successful token refresh, the old refresh token is immediately revoked by the PDS 302func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error { 303 query := ` 304 UPDATE communities 305 SET 306 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 307 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 308 updated_at = NOW() 309 WHERE did = $1 310 RETURNING did` 311 312 var returnedDID string 313 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID) 314 315 if err == sql.ErrNoRows { 316 return communities.ErrCommunityNotFound 317 } 318 if err != nil { 319 return fmt.Errorf("failed to update credentials: %w", err) 320 } 321 322 return nil 323} 324 325// Delete removes a community from the database 326func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error { 327 query := `DELETE FROM communities WHERE did = $1` 328 329 result, err := r.db.ExecContext(ctx, query, did) 330 if err != nil { 331 return fmt.Errorf("failed to delete community: %w", err) 332 } 333 334 rowsAffected, err := result.RowsAffected() 335 if err != nil { 336 return fmt.Errorf("failed to check delete result: %w", err) 337 } 338 339 if rowsAffected == 0 { 340 return communities.ErrCommunityNotFound 341 } 342 343 return nil 344} 345 346// List retrieves communities with filtering and pagination 347func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) { 348 // Build query with filters 349 whereClauses := []string{} 350 args := []interface{}{} 351 argCount := 1 352 353 if req.Visibility != "" { 354 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 355 args = append(args, req.Visibility) 356 argCount++ 357 } 358 359 if req.HostedBy != "" { 360 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount)) 361 args = append(args, req.HostedBy) 362 argCount++ 363 } 364 365 whereClause := "" 366 if len(whereClauses) > 0 { 367 whereClause = "WHERE " + strings.Join(whereClauses, " AND ") 368 } 369 370 // Get total count 371 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 372 var totalCount int 373 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 374 if err != nil { 375 return nil, 0, fmt.Errorf("failed to count communities: %w", err) 376 } 377 378 // Build sort clause 379 sortColumn := "created_at" 380 if req.SortBy != "" { 381 switch req.SortBy { 382 case "member_count", "subscriber_count", "post_count", "created_at": 383 sortColumn = req.SortBy 384 } 385 } 386 387 sortOrder := "DESC" 388 if strings.ToUpper(req.SortOrder) == "ASC" { 389 sortOrder = "ASC" 390 } 391 392 // Get communities with pagination 393 query := fmt.Sprintf(` 394 SELECT id, did, handle, name, display_name, description, description_facets, 395 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 396 visibility, allow_external_discovery, moderation_type, content_warnings, 397 member_count, subscriber_count, post_count, 398 federated_from, federated_id, created_at, updated_at, 399 record_uri, record_cid 400 FROM communities 401 %s 402 ORDER BY %s %s 403 LIMIT $%d OFFSET $%d`, 404 whereClause, sortColumn, sortOrder, argCount, argCount+1) 405 406 args = append(args, req.Limit, req.Offset) 407 408 rows, err := r.db.QueryContext(ctx, query, args...) 409 if err != nil { 410 return nil, 0, fmt.Errorf("failed to list communities: %w", err) 411 } 412 defer func() { 413 if closeErr := rows.Close(); closeErr != nil { 414 log.Printf("Failed to close rows: %v", closeErr) 415 } 416 }() 417 418 result := []*communities.Community{} 419 for rows.Next() { 420 community := &communities.Community{} 421 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 422 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 423 var descFacets []byte 424 var contentWarnings []string 425 426 scanErr := rows.Scan( 427 &community.ID, &community.DID, &community.Handle, &community.Name, 428 &displayName, &description, &descFacets, 429 &avatarCID, &bannerCID, 430 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 431 &community.Visibility, &community.AllowExternalDiscovery, 432 &moderationType, pq.Array(&contentWarnings), 433 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 434 &federatedFrom, &federatedID, 435 &community.CreatedAt, &community.UpdatedAt, 436 &recordURI, &recordCID, 437 ) 438 if scanErr != nil { 439 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 440 } 441 442 // Map nullable fields 443 community.DisplayName = displayName.String 444 community.Description = description.String 445 community.AvatarCID = avatarCID.String 446 community.BannerCID = bannerCID.String 447 community.ModerationType = moderationType.String 448 community.ContentWarnings = contentWarnings 449 community.FederatedFrom = federatedFrom.String 450 community.FederatedID = federatedID.String 451 community.RecordURI = recordURI.String 452 community.RecordCID = recordCID.String 453 if descFacets != nil { 454 community.DescriptionFacets = descFacets 455 } 456 457 result = append(result, community) 458 } 459 460 if err = rows.Err(); err != nil { 461 return nil, 0, fmt.Errorf("error iterating communities: %w", err) 462 } 463 464 return result, totalCount, nil 465} 466 467// Search searches communities by name/description using fuzzy matching 468func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) { 469 // Build query with fuzzy search and visibility filter 470 whereClauses := []string{ 471 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')", 472 } 473 args := []interface{}{req.Query} 474 argCount := 2 475 476 if req.Visibility != "" { 477 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 478 args = append(args, req.Visibility) 479 argCount++ 480 } 481 482 whereClause := "WHERE " + strings.Join(whereClauses, " AND ") 483 484 // Get total count 485 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 486 var totalCount int 487 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 488 if err != nil { 489 return nil, 0, fmt.Errorf("failed to count search results: %w", err) 490 } 491 492 // Search with relevance ranking using pg_trgm similarity 493 // Filter out results with very low relevance (< 0.2) to avoid noise 494 query := fmt.Sprintf(` 495 SELECT id, did, handle, name, display_name, description, description_facets, 496 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 497 visibility, allow_external_discovery, moderation_type, content_warnings, 498 member_count, subscriber_count, post_count, 499 federated_from, federated_id, created_at, updated_at, 500 record_uri, record_cid, 501 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance 502 FROM communities 503 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2 504 ORDER BY relevance DESC, member_count DESC 505 LIMIT $%d OFFSET $%d`, 506 whereClause, argCount, argCount+1) 507 508 args = append(args, req.Limit, req.Offset) 509 510 rows, err := r.db.QueryContext(ctx, query, args...) 511 if err != nil { 512 return nil, 0, fmt.Errorf("failed to search communities: %w", err) 513 } 514 defer func() { 515 if closeErr := rows.Close(); closeErr != nil { 516 log.Printf("Failed to close rows: %v", closeErr) 517 } 518 }() 519 520 result := []*communities.Community{} 521 for rows.Next() { 522 community := &communities.Community{} 523 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 524 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 525 var descFacets []byte 526 var contentWarnings []string 527 var relevance float64 528 529 scanErr := rows.Scan( 530 &community.ID, &community.DID, &community.Handle, &community.Name, 531 &displayName, &description, &descFacets, 532 &avatarCID, &bannerCID, 533 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 534 &community.Visibility, &community.AllowExternalDiscovery, 535 &moderationType, pq.Array(&contentWarnings), 536 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 537 &federatedFrom, &federatedID, 538 &community.CreatedAt, &community.UpdatedAt, 539 &recordURI, &recordCID, 540 &relevance, 541 ) 542 if scanErr != nil { 543 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 544 } 545 546 // Map nullable fields 547 community.DisplayName = displayName.String 548 community.Description = description.String 549 community.AvatarCID = avatarCID.String 550 community.BannerCID = bannerCID.String 551 community.ModerationType = moderationType.String 552 community.ContentWarnings = contentWarnings 553 community.FederatedFrom = federatedFrom.String 554 community.FederatedID = federatedID.String 555 community.RecordURI = recordURI.String 556 community.RecordCID = recordCID.String 557 if descFacets != nil { 558 community.DescriptionFacets = descFacets 559 } 560 561 result = append(result, community) 562 } 563 564 if err = rows.Err(); err != nil { 565 return nil, 0, fmt.Errorf("error iterating search results: %w", err) 566 } 567 568 return result, totalCount, nil 569} 570 571// Helper functions 572func nullString(s string) sql.NullString { 573 return sql.NullString{String: s, Valid: s != ""} 574}