A community based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "database/sql" 7 "fmt" 8 "log" 9 "strings" 10 11 "github.com/lib/pq" 12) 13 14type postgresCommunityRepo struct { 15 db *sql.DB 16} 17 18// NewCommunityRepository creates a new PostgreSQL community repository 19func NewCommunityRepository(db *sql.DB) communities.Repository { 20 return &postgresCommunityRepo{db: db} 21} 22 23// Create inserts a new community into the communities table 24func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) { 25 query := ` 26 INSERT INTO communities ( 27 did, handle, name, display_name, description, description_facets, 28 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 29 pds_email, pds_password_encrypted, 30 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url, 31 visibility, allow_external_discovery, moderation_type, content_warnings, 32 member_count, subscriber_count, post_count, 33 federated_from, federated_id, created_at, updated_at, 34 record_uri, record_cid 35 ) VALUES ( 36 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, 37 $12, 38 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 39 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 40 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 41 $16, 42 $17, $18, $19, $20, 43 $21, $22, $23, $24, $25, $26, $27, $28, $29 44 ) 45 RETURNING id, created_at, updated_at` 46 47 // Handle JSONB field - use sql.NullString with valid JSON or NULL 48 var descFacets interface{} 49 if len(community.DescriptionFacets) > 0 { 50 descFacets = community.DescriptionFacets 51 } else { 52 descFacets = nil 53 } 54 55 err := r.db.QueryRowContext(ctx, query, 56 community.DID, 57 community.Handle, 58 community.Name, 59 nullString(community.DisplayName), 60 nullString(community.Description), 61 descFacets, 62 nullString(community.AvatarCID), 63 nullString(community.BannerCID), 64 community.OwnerDID, 65 community.CreatedByDID, 66 community.HostedByDID, 67 // V2.0: PDS credentials for community account (encrypted at rest) 68 nullString(community.PDSEmail), 69 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt 70 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt 71 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt 72 nullString(community.PDSURL), 73 // V2.0: No key columns - PDS manages all keys 74 community.Visibility, 75 community.AllowExternalDiscovery, 76 nullString(community.ModerationType), 77 pq.Array(community.ContentWarnings), 78 community.MemberCount, 79 community.SubscriberCount, 80 community.PostCount, 81 nullString(community.FederatedFrom), 82 nullString(community.FederatedID), 83 community.CreatedAt, 84 community.UpdatedAt, 85 nullString(community.RecordURI), 86 nullString(community.RecordCID), 87 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt) 88 if err != nil { 89 // Check for unique constraint violations 90 if strings.Contains(err.Error(), "duplicate key") { 91 if strings.Contains(err.Error(), "communities_did_key") { 92 return nil, communities.ErrCommunityAlreadyExists 93 } 94 if strings.Contains(err.Error(), "communities_handle_key") { 95 return nil, communities.ErrHandleTaken 96 } 97 } 98 return nil, fmt.Errorf("failed to create community: %w", err) 99 } 100 101 return community, nil 102} 103 104// GetByDID retrieves a community by its DID 105// Note: PDS credentials are included (for internal service use only) 106// Handlers MUST use json:"-" tags to prevent credential exposure in APIs 107// 108// V2.0: Key columns not included - PDS manages all keys 109func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) { 110 community := &communities.Community{} 111 query := ` 112 SELECT id, did, handle, name, display_name, description, description_facets, 113 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 114 pds_email, 115 CASE 116 WHEN pds_password_encrypted IS NOT NULL 117 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 118 ELSE NULL 119 END as pds_password, 120 CASE 121 WHEN pds_access_token_encrypted IS NOT NULL 122 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 123 ELSE NULL 124 END as pds_access_token, 125 CASE 126 WHEN pds_refresh_token_encrypted IS NOT NULL 127 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 128 ELSE NULL 129 END as pds_refresh_token, 130 pds_url, 131 visibility, allow_external_discovery, moderation_type, content_warnings, 132 member_count, subscriber_count, post_count, 133 federated_from, federated_id, created_at, updated_at, 134 record_uri, record_cid 135 FROM communities 136 WHERE did = $1` 137 138 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 139 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 140 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString 141 var descFacets []byte 142 var contentWarnings []string 143 144 err := r.db.QueryRowContext(ctx, query, did).Scan( 145 &community.ID, &community.DID, &community.Handle, &community.Name, 146 &displayName, &description, &descFacets, 147 &avatarCID, &bannerCID, 148 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 149 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt) 150 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL, 151 &community.Visibility, &community.AllowExternalDiscovery, 152 &moderationType, pq.Array(&contentWarnings), 153 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 154 &federatedFrom, &federatedID, 155 &community.CreatedAt, &community.UpdatedAt, 156 &recordURI, &recordCID, 157 ) 158 159 if err == sql.ErrNoRows { 160 return nil, communities.ErrCommunityNotFound 161 } 162 if err != nil { 163 return nil, fmt.Errorf("failed to get community by DID: %w", err) 164 } 165 166 // Map nullable fields 167 community.DisplayName = displayName.String 168 community.Description = description.String 169 community.AvatarCID = avatarCID.String 170 community.BannerCID = bannerCID.String 171 community.PDSEmail = pdsEmail.String 172 community.PDSPassword = pdsPassword.String 173 community.PDSAccessToken = pdsAccessToken.String 174 community.PDSRefreshToken = pdsRefreshToken.String 175 community.PDSURL = pdsURL.String 176 // V2.0: No key fields - PDS manages all keys 177 community.RotationKeyPEM = "" // Empty - PDS-managed 178 community.SigningKeyPEM = "" // Empty - PDS-managed 179 community.ModerationType = moderationType.String 180 community.ContentWarnings = contentWarnings 181 community.FederatedFrom = federatedFrom.String 182 community.FederatedID = federatedID.String 183 community.RecordURI = recordURI.String 184 community.RecordCID = recordCID.String 185 if descFacets != nil { 186 community.DescriptionFacets = descFacets 187 } 188 189 return community, nil 190} 191 192// GetByHandle retrieves a community by its scoped handle 193func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) { 194 community := &communities.Community{} 195 query := ` 196 SELECT id, did, handle, name, display_name, description, description_facets, 197 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 198 visibility, allow_external_discovery, moderation_type, content_warnings, 199 member_count, subscriber_count, post_count, 200 federated_from, federated_id, created_at, updated_at, 201 record_uri, record_cid 202 FROM communities 203 WHERE handle = $1` 204 205 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 206 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 207 var descFacets []byte 208 var contentWarnings []string 209 210 err := r.db.QueryRowContext(ctx, query, handle).Scan( 211 &community.ID, &community.DID, &community.Handle, &community.Name, 212 &displayName, &description, &descFacets, 213 &avatarCID, &bannerCID, 214 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 215 &community.Visibility, &community.AllowExternalDiscovery, 216 &moderationType, pq.Array(&contentWarnings), 217 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 218 &federatedFrom, &federatedID, 219 &community.CreatedAt, &community.UpdatedAt, 220 &recordURI, &recordCID, 221 ) 222 223 if err == sql.ErrNoRows { 224 return nil, communities.ErrCommunityNotFound 225 } 226 if err != nil { 227 return nil, fmt.Errorf("failed to get community by handle: %w", err) 228 } 229 230 // Map nullable fields 231 community.DisplayName = displayName.String 232 community.Description = description.String 233 community.AvatarCID = avatarCID.String 234 community.BannerCID = bannerCID.String 235 community.ModerationType = moderationType.String 236 community.ContentWarnings = contentWarnings 237 community.FederatedFrom = federatedFrom.String 238 community.FederatedID = federatedID.String 239 community.RecordURI = recordURI.String 240 community.RecordCID = recordCID.String 241 if descFacets != nil { 242 community.DescriptionFacets = descFacets 243 } 244 245 return community, nil 246} 247 248// Update modifies an existing community's metadata 249func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) { 250 query := ` 251 UPDATE communities 252 SET display_name = $2, description = $3, description_facets = $4, 253 avatar_cid = $5, banner_cid = $6, 254 visibility = $7, allow_external_discovery = $8, 255 moderation_type = $9, content_warnings = $10, 256 updated_at = NOW(), 257 record_uri = $11, record_cid = $12 258 WHERE did = $1 259 RETURNING updated_at` 260 261 // Handle JSONB field - use sql.NullString with valid JSON or NULL 262 var descFacets interface{} 263 if len(community.DescriptionFacets) > 0 { 264 descFacets = community.DescriptionFacets 265 } else { 266 descFacets = nil 267 } 268 269 err := r.db.QueryRowContext(ctx, query, 270 community.DID, 271 nullString(community.DisplayName), 272 nullString(community.Description), 273 descFacets, 274 nullString(community.AvatarCID), 275 nullString(community.BannerCID), 276 community.Visibility, 277 community.AllowExternalDiscovery, 278 nullString(community.ModerationType), 279 pq.Array(community.ContentWarnings), 280 nullString(community.RecordURI), 281 nullString(community.RecordCID), 282 ).Scan(&community.UpdatedAt) 283 284 if err == sql.ErrNoRows { 285 return nil, communities.ErrCommunityNotFound 286 } 287 if err != nil { 288 return nil, fmt.Errorf("failed to update community: %w", err) 289 } 290 291 return community, nil 292} 293 294// UpdateCredentials atomically updates community's PDS access and refresh tokens 295// CRITICAL: Both tokens must be updated together because refresh tokens are single-use 296// After a successful token refresh, the old refresh token is immediately revoked by the PDS 297func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error { 298 query := ` 299 UPDATE communities 300 SET 301 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 302 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), 303 updated_at = NOW() 304 WHERE did = $1 305 RETURNING did` 306 307 var returnedDID string 308 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID) 309 310 if err == sql.ErrNoRows { 311 return communities.ErrCommunityNotFound 312 } 313 if err != nil { 314 return fmt.Errorf("failed to update credentials: %w", err) 315 } 316 317 return nil 318} 319 320// Delete removes a community from the database 321func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error { 322 query := `DELETE FROM communities WHERE did = $1` 323 324 result, err := r.db.ExecContext(ctx, query, did) 325 if err != nil { 326 return fmt.Errorf("failed to delete community: %w", err) 327 } 328 329 rowsAffected, err := result.RowsAffected() 330 if err != nil { 331 return fmt.Errorf("failed to check delete result: %w", err) 332 } 333 334 if rowsAffected == 0 { 335 return communities.ErrCommunityNotFound 336 } 337 338 return nil 339} 340 341// List retrieves communities with filtering and pagination 342func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) { 343 // Build query with filters 344 whereClauses := []string{} 345 args := []interface{}{} 346 argCount := 1 347 348 if req.Visibility != "" { 349 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 350 args = append(args, req.Visibility) 351 argCount++ 352 } 353 354 if req.HostedBy != "" { 355 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount)) 356 args = append(args, req.HostedBy) 357 argCount++ 358 } 359 360 whereClause := "" 361 if len(whereClauses) > 0 { 362 whereClause = "WHERE " + strings.Join(whereClauses, " AND ") 363 } 364 365 // Get total count 366 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 367 var totalCount int 368 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 369 if err != nil { 370 return nil, 0, fmt.Errorf("failed to count communities: %w", err) 371 } 372 373 // Build sort clause 374 sortColumn := "created_at" 375 if req.SortBy != "" { 376 switch req.SortBy { 377 case "member_count", "subscriber_count", "post_count", "created_at": 378 sortColumn = req.SortBy 379 } 380 } 381 382 sortOrder := "DESC" 383 if strings.ToUpper(req.SortOrder) == "ASC" { 384 sortOrder = "ASC" 385 } 386 387 // Get communities with pagination 388 query := fmt.Sprintf(` 389 SELECT id, did, handle, name, display_name, description, description_facets, 390 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 391 visibility, allow_external_discovery, moderation_type, content_warnings, 392 member_count, subscriber_count, post_count, 393 federated_from, federated_id, created_at, updated_at, 394 record_uri, record_cid 395 FROM communities 396 %s 397 ORDER BY %s %s 398 LIMIT $%d OFFSET $%d`, 399 whereClause, sortColumn, sortOrder, argCount, argCount+1) 400 401 args = append(args, req.Limit, req.Offset) 402 403 rows, err := r.db.QueryContext(ctx, query, args...) 404 if err != nil { 405 return nil, 0, fmt.Errorf("failed to list communities: %w", err) 406 } 407 defer func() { 408 if closeErr := rows.Close(); closeErr != nil { 409 log.Printf("Failed to close rows: %v", closeErr) 410 } 411 }() 412 413 result := []*communities.Community{} 414 for rows.Next() { 415 community := &communities.Community{} 416 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 417 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 418 var descFacets []byte 419 var contentWarnings []string 420 421 scanErr := rows.Scan( 422 &community.ID, &community.DID, &community.Handle, &community.Name, 423 &displayName, &description, &descFacets, 424 &avatarCID, &bannerCID, 425 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 426 &community.Visibility, &community.AllowExternalDiscovery, 427 &moderationType, pq.Array(&contentWarnings), 428 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 429 &federatedFrom, &federatedID, 430 &community.CreatedAt, &community.UpdatedAt, 431 &recordURI, &recordCID, 432 ) 433 if scanErr != nil { 434 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 435 } 436 437 // Map nullable fields 438 community.DisplayName = displayName.String 439 community.Description = description.String 440 community.AvatarCID = avatarCID.String 441 community.BannerCID = bannerCID.String 442 community.ModerationType = moderationType.String 443 community.ContentWarnings = contentWarnings 444 community.FederatedFrom = federatedFrom.String 445 community.FederatedID = federatedID.String 446 community.RecordURI = recordURI.String 447 community.RecordCID = recordCID.String 448 if descFacets != nil { 449 community.DescriptionFacets = descFacets 450 } 451 452 result = append(result, community) 453 } 454 455 if err = rows.Err(); err != nil { 456 return nil, 0, fmt.Errorf("error iterating communities: %w", err) 457 } 458 459 return result, totalCount, nil 460} 461 462// Search searches communities by name/description using fuzzy matching 463func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) { 464 // Build query with fuzzy search and visibility filter 465 whereClauses := []string{ 466 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')", 467 } 468 args := []interface{}{req.Query} 469 argCount := 2 470 471 if req.Visibility != "" { 472 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 473 args = append(args, req.Visibility) 474 argCount++ 475 } 476 477 whereClause := "WHERE " + strings.Join(whereClauses, " AND ") 478 479 // Get total count 480 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 481 var totalCount int 482 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 483 if err != nil { 484 return nil, 0, fmt.Errorf("failed to count search results: %w", err) 485 } 486 487 // Search with relevance ranking using pg_trgm similarity 488 // Filter out results with very low relevance (< 0.2) to avoid noise 489 query := fmt.Sprintf(` 490 SELECT id, did, handle, name, display_name, description, description_facets, 491 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 492 visibility, allow_external_discovery, moderation_type, content_warnings, 493 member_count, subscriber_count, post_count, 494 federated_from, federated_id, created_at, updated_at, 495 record_uri, record_cid, 496 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance 497 FROM communities 498 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2 499 ORDER BY relevance DESC, member_count DESC 500 LIMIT $%d OFFSET $%d`, 501 whereClause, argCount, argCount+1) 502 503 args = append(args, req.Limit, req.Offset) 504 505 rows, err := r.db.QueryContext(ctx, query, args...) 506 if err != nil { 507 return nil, 0, fmt.Errorf("failed to search communities: %w", err) 508 } 509 defer func() { 510 if closeErr := rows.Close(); closeErr != nil { 511 log.Printf("Failed to close rows: %v", closeErr) 512 } 513 }() 514 515 result := []*communities.Community{} 516 for rows.Next() { 517 community := &communities.Community{} 518 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 519 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 520 var descFacets []byte 521 var contentWarnings []string 522 var relevance float64 523 524 scanErr := rows.Scan( 525 &community.ID, &community.DID, &community.Handle, &community.Name, 526 &displayName, &description, &descFacets, 527 &avatarCID, &bannerCID, 528 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 529 &community.Visibility, &community.AllowExternalDiscovery, 530 &moderationType, pq.Array(&contentWarnings), 531 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 532 &federatedFrom, &federatedID, 533 &community.CreatedAt, &community.UpdatedAt, 534 &recordURI, &recordCID, 535 &relevance, 536 ) 537 if scanErr != nil { 538 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 539 } 540 541 // Map nullable fields 542 community.DisplayName = displayName.String 543 community.Description = description.String 544 community.AvatarCID = avatarCID.String 545 community.BannerCID = bannerCID.String 546 community.ModerationType = moderationType.String 547 community.ContentWarnings = contentWarnings 548 community.FederatedFrom = federatedFrom.String 549 community.FederatedID = federatedID.String 550 community.RecordURI = recordURI.String 551 community.RecordCID = recordCID.String 552 if descFacets != nil { 553 community.DescriptionFacets = descFacets 554 } 555 556 result = append(result, community) 557 } 558 559 if err = rows.Err(); err != nil { 560 return nil, 0, fmt.Errorf("error iterating search results: %w", err) 561 } 562 563 return result, totalCount, nil 564} 565 566// Helper functions 567func nullString(s string) sql.NullString { 568 return sql.NullString{String: s, Valid: s != ""} 569}