A community-based topic aggregation platform built on atproto.
1package postgres 2 3import ( 4 "Coves/internal/core/communities" 5 "context" 6 "database/sql" 7 "fmt" 8 "log" 9 "strings" 10 11 "github.com/lib/pq" 12) 13 14type postgresCommunityRepo struct { 15 db *sql.DB 16} 17 18// NewCommunityRepository creates a new PostgreSQL community repository 19func NewCommunityRepository(db *sql.DB) communities.Repository { 20 return &postgresCommunityRepo{db: db} 21} 22 23// Create inserts a new community into the communities table 24func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) { 25 query := ` 26 INSERT INTO communities ( 27 did, handle, name, display_name, description, description_facets, 28 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 29 pds_email, pds_password_encrypted, 30 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url, 31 visibility, allow_external_discovery, moderation_type, content_warnings, 32 member_count, subscriber_count, post_count, 33 federated_from, federated_id, created_at, updated_at, 34 record_uri, record_cid 35 ) VALUES ( 36 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, 37 $12, 38 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 39 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 40 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 41 $16, 42 $17, $18, $19, $20, 43 $21, $22, $23, $24, $25, $26, $27, $28, $29 44 ) 45 RETURNING id, created_at, updated_at` 46 47 // Handle JSONB field - use sql.NullString with valid JSON or NULL 48 var descFacets interface{} 49 if len(community.DescriptionFacets) > 0 { 50 descFacets = community.DescriptionFacets 51 } else { 52 descFacets = nil 53 } 54 55 err := r.db.QueryRowContext(ctx, query, 56 community.DID, 57 community.Handle, 58 
community.Name, 59 nullString(community.DisplayName), 60 nullString(community.Description), 61 descFacets, 62 nullString(community.AvatarCID), 63 nullString(community.BannerCID), 64 community.OwnerDID, 65 community.CreatedByDID, 66 community.HostedByDID, 67 // V2.0: PDS credentials for community account (encrypted at rest) 68 nullString(community.PDSEmail), 69 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt 70 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt 71 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt 72 nullString(community.PDSURL), 73 // V2.0: No key columns - PDS manages all keys 74 community.Visibility, 75 community.AllowExternalDiscovery, 76 nullString(community.ModerationType), 77 pq.Array(community.ContentWarnings), 78 community.MemberCount, 79 community.SubscriberCount, 80 community.PostCount, 81 nullString(community.FederatedFrom), 82 nullString(community.FederatedID), 83 community.CreatedAt, 84 community.UpdatedAt, 85 nullString(community.RecordURI), 86 nullString(community.RecordCID), 87 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt) 88 if err != nil { 89 // Check for unique constraint violations 90 if strings.Contains(err.Error(), "duplicate key") { 91 if strings.Contains(err.Error(), "communities_did_key") { 92 return nil, communities.ErrCommunityAlreadyExists 93 } 94 if strings.Contains(err.Error(), "communities_handle_key") { 95 return nil, communities.ErrHandleTaken 96 } 97 } 98 return nil, fmt.Errorf("failed to create community: %w", err) 99 } 100 101 return community, nil 102} 103 104// GetByDID retrieves a community by its DID 105// Note: PDS credentials are included (for internal service use only) 106// Handlers MUST use json:"-" tags to prevent credential exposure in APIs 107// 108// V2.0: Key columns not included - PDS manages all keys 109func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) { 110 
community := &communities.Community{} 111 query := ` 112 SELECT id, did, handle, name, display_name, description, description_facets, 113 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 114 pds_email, 115 CASE 116 WHEN pds_password_encrypted IS NOT NULL 117 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 118 ELSE NULL 119 END as pds_password, 120 CASE 121 WHEN pds_access_token_encrypted IS NOT NULL 122 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 123 ELSE NULL 124 END as pds_access_token, 125 CASE 126 WHEN pds_refresh_token_encrypted IS NOT NULL 127 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) 128 ELSE NULL 129 END as pds_refresh_token, 130 pds_url, 131 visibility, allow_external_discovery, moderation_type, content_warnings, 132 member_count, subscriber_count, post_count, 133 federated_from, federated_id, created_at, updated_at, 134 record_uri, record_cid 135 FROM communities 136 WHERE did = $1` 137 138 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 139 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 140 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString 141 var descFacets []byte 142 var contentWarnings []string 143 144 err := r.db.QueryRowContext(ctx, query, did).Scan( 145 &community.ID, &community.DID, &community.Handle, &community.Name, 146 &displayName, &description, &descFacets, 147 &avatarCID, &bannerCID, 148 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 149 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt) 150 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL, 151 &community.Visibility, &community.AllowExternalDiscovery, 152 &moderationType, pq.Array(&contentWarnings), 153 &community.MemberCount, 
&community.SubscriberCount, &community.PostCount, 154 &federatedFrom, &federatedID, 155 &community.CreatedAt, &community.UpdatedAt, 156 &recordURI, &recordCID, 157 ) 158 159 if err == sql.ErrNoRows { 160 return nil, communities.ErrCommunityNotFound 161 } 162 if err != nil { 163 return nil, fmt.Errorf("failed to get community by DID: %w", err) 164 } 165 166 // Map nullable fields 167 community.DisplayName = displayName.String 168 community.Description = description.String 169 community.AvatarCID = avatarCID.String 170 community.BannerCID = bannerCID.String 171 community.PDSEmail = pdsEmail.String 172 community.PDSPassword = pdsPassword.String 173 community.PDSAccessToken = pdsAccessToken.String 174 community.PDSRefreshToken = pdsRefreshToken.String 175 community.PDSURL = pdsURL.String 176 // V2.0: No key fields - PDS manages all keys 177 community.RotationKeyPEM = "" // Empty - PDS-managed 178 community.SigningKeyPEM = "" // Empty - PDS-managed 179 community.ModerationType = moderationType.String 180 community.ContentWarnings = contentWarnings 181 community.FederatedFrom = federatedFrom.String 182 community.FederatedID = federatedID.String 183 community.RecordURI = recordURI.String 184 community.RecordCID = recordCID.String 185 if descFacets != nil { 186 community.DescriptionFacets = descFacets 187 } 188 189 return community, nil 190} 191 192// GetByHandle retrieves a community by its scoped handle 193func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) { 194 community := &communities.Community{} 195 query := ` 196 SELECT id, did, handle, name, display_name, description, description_facets, 197 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 198 visibility, allow_external_discovery, moderation_type, content_warnings, 199 member_count, subscriber_count, post_count, 200 federated_from, federated_id, created_at, updated_at, 201 record_uri, record_cid 202 FROM communities 203 WHERE handle = $1` 
204 205 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 206 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 207 var descFacets []byte 208 var contentWarnings []string 209 210 err := r.db.QueryRowContext(ctx, query, handle).Scan( 211 &community.ID, &community.DID, &community.Handle, &community.Name, 212 &displayName, &description, &descFacets, 213 &avatarCID, &bannerCID, 214 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 215 &community.Visibility, &community.AllowExternalDiscovery, 216 &moderationType, pq.Array(&contentWarnings), 217 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 218 &federatedFrom, &federatedID, 219 &community.CreatedAt, &community.UpdatedAt, 220 &recordURI, &recordCID, 221 ) 222 223 if err == sql.ErrNoRows { 224 return nil, communities.ErrCommunityNotFound 225 } 226 if err != nil { 227 return nil, fmt.Errorf("failed to get community by handle: %w", err) 228 } 229 230 // Map nullable fields 231 community.DisplayName = displayName.String 232 community.Description = description.String 233 community.AvatarCID = avatarCID.String 234 community.BannerCID = bannerCID.String 235 community.ModerationType = moderationType.String 236 community.ContentWarnings = contentWarnings 237 community.FederatedFrom = federatedFrom.String 238 community.FederatedID = federatedID.String 239 community.RecordURI = recordURI.String 240 community.RecordCID = recordCID.String 241 if descFacets != nil { 242 community.DescriptionFacets = descFacets 243 } 244 245 return community, nil 246} 247 248// Update modifies an existing community's metadata 249func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) { 250 query := ` 251 UPDATE communities 252 SET display_name = $2, description = $3, description_facets = $4, 253 avatar_cid = $5, banner_cid = $6, 254 visibility = $7, allow_external_discovery = $8, 255 
moderation_type = $9, content_warnings = $10, 256 updated_at = NOW(), 257 record_uri = $11, record_cid = $12 258 WHERE did = $1 259 RETURNING updated_at` 260 261 // Handle JSONB field - use sql.NullString with valid JSON or NULL 262 var descFacets interface{} 263 if len(community.DescriptionFacets) > 0 { 264 descFacets = community.DescriptionFacets 265 } else { 266 descFacets = nil 267 } 268 269 err := r.db.QueryRowContext(ctx, query, 270 community.DID, 271 nullString(community.DisplayName), 272 nullString(community.Description), 273 descFacets, 274 nullString(community.AvatarCID), 275 nullString(community.BannerCID), 276 community.Visibility, 277 community.AllowExternalDiscovery, 278 nullString(community.ModerationType), 279 pq.Array(community.ContentWarnings), 280 nullString(community.RecordURI), 281 nullString(community.RecordCID), 282 ).Scan(&community.UpdatedAt) 283 284 if err == sql.ErrNoRows { 285 return nil, communities.ErrCommunityNotFound 286 } 287 if err != nil { 288 return nil, fmt.Errorf("failed to update community: %w", err) 289 } 290 291 return community, nil 292} 293 294// Delete removes a community from the database 295func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error { 296 query := `DELETE FROM communities WHERE did = $1` 297 298 result, err := r.db.ExecContext(ctx, query, did) 299 if err != nil { 300 return fmt.Errorf("failed to delete community: %w", err) 301 } 302 303 rowsAffected, err := result.RowsAffected() 304 if err != nil { 305 return fmt.Errorf("failed to check delete result: %w", err) 306 } 307 308 if rowsAffected == 0 { 309 return communities.ErrCommunityNotFound 310 } 311 312 return nil 313} 314 315// List retrieves communities with filtering and pagination 316func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) { 317 // Build query with filters 318 whereClauses := []string{} 319 args := []interface{}{} 320 argCount := 1 321 
322 if req.Visibility != "" { 323 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 324 args = append(args, req.Visibility) 325 argCount++ 326 } 327 328 if req.HostedBy != "" { 329 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount)) 330 args = append(args, req.HostedBy) 331 argCount++ 332 } 333 334 whereClause := "" 335 if len(whereClauses) > 0 { 336 whereClause = "WHERE " + strings.Join(whereClauses, " AND ") 337 } 338 339 // Get total count 340 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 341 var totalCount int 342 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 343 if err != nil { 344 return nil, 0, fmt.Errorf("failed to count communities: %w", err) 345 } 346 347 // Build sort clause 348 sortColumn := "created_at" 349 if req.SortBy != "" { 350 switch req.SortBy { 351 case "member_count", "subscriber_count", "post_count", "created_at": 352 sortColumn = req.SortBy 353 } 354 } 355 356 sortOrder := "DESC" 357 if strings.ToUpper(req.SortOrder) == "ASC" { 358 sortOrder = "ASC" 359 } 360 361 // Get communities with pagination 362 query := fmt.Sprintf(` 363 SELECT id, did, handle, name, display_name, description, description_facets, 364 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 365 visibility, allow_external_discovery, moderation_type, content_warnings, 366 member_count, subscriber_count, post_count, 367 federated_from, federated_id, created_at, updated_at, 368 record_uri, record_cid 369 FROM communities 370 %s 371 ORDER BY %s %s 372 LIMIT $%d OFFSET $%d`, 373 whereClause, sortColumn, sortOrder, argCount, argCount+1) 374 375 args = append(args, req.Limit, req.Offset) 376 377 rows, err := r.db.QueryContext(ctx, query, args...) 
378 if err != nil { 379 return nil, 0, fmt.Errorf("failed to list communities: %w", err) 380 } 381 defer func() { 382 if closeErr := rows.Close(); closeErr != nil { 383 log.Printf("Failed to close rows: %v", closeErr) 384 } 385 }() 386 387 result := []*communities.Community{} 388 for rows.Next() { 389 community := &communities.Community{} 390 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 391 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 392 var descFacets []byte 393 var contentWarnings []string 394 395 scanErr := rows.Scan( 396 &community.ID, &community.DID, &community.Handle, &community.Name, 397 &displayName, &description, &descFacets, 398 &avatarCID, &bannerCID, 399 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 400 &community.Visibility, &community.AllowExternalDiscovery, 401 &moderationType, pq.Array(&contentWarnings), 402 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 403 &federatedFrom, &federatedID, 404 &community.CreatedAt, &community.UpdatedAt, 405 &recordURI, &recordCID, 406 ) 407 if scanErr != nil { 408 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 409 } 410 411 // Map nullable fields 412 community.DisplayName = displayName.String 413 community.Description = description.String 414 community.AvatarCID = avatarCID.String 415 community.BannerCID = bannerCID.String 416 community.ModerationType = moderationType.String 417 community.ContentWarnings = contentWarnings 418 community.FederatedFrom = federatedFrom.String 419 community.FederatedID = federatedID.String 420 community.RecordURI = recordURI.String 421 community.RecordCID = recordCID.String 422 if descFacets != nil { 423 community.DescriptionFacets = descFacets 424 } 425 426 result = append(result, community) 427 } 428 429 if err = rows.Err(); err != nil { 430 return nil, 0, fmt.Errorf("error iterating communities: %w", err) 431 } 432 433 return result, totalCount, nil 434} 
435 436// Search searches communities by name/description using fuzzy matching 437func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) { 438 // Build query with fuzzy search and visibility filter 439 whereClauses := []string{ 440 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')", 441 } 442 args := []interface{}{req.Query} 443 argCount := 2 444 445 if req.Visibility != "" { 446 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 447 args = append(args, req.Visibility) 448 argCount++ 449 } 450 451 whereClause := "WHERE " + strings.Join(whereClauses, " AND ") 452 453 // Get total count 454 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 455 var totalCount int 456 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 457 if err != nil { 458 return nil, 0, fmt.Errorf("failed to count search results: %w", err) 459 } 460 461 // Search with relevance ranking using pg_trgm similarity 462 // Filter out results with very low relevance (< 0.2) to avoid noise 463 query := fmt.Sprintf(` 464 SELECT id, did, handle, name, display_name, description, description_facets, 465 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 466 visibility, allow_external_discovery, moderation_type, content_warnings, 467 member_count, subscriber_count, post_count, 468 federated_from, federated_id, created_at, updated_at, 469 record_uri, record_cid, 470 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance 471 FROM communities 472 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2 473 ORDER BY relevance DESC, member_count DESC 474 LIMIT $%d OFFSET $%d`, 475 whereClause, argCount, argCount+1) 476 477 args = append(args, req.Limit, req.Offset) 478 479 rows, err := r.db.QueryContext(ctx, query, args...) 
480 if err != nil { 481 return nil, 0, fmt.Errorf("failed to search communities: %w", err) 482 } 483 defer func() { 484 if closeErr := rows.Close(); closeErr != nil { 485 log.Printf("Failed to close rows: %v", closeErr) 486 } 487 }() 488 489 result := []*communities.Community{} 490 for rows.Next() { 491 community := &communities.Community{} 492 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 493 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 494 var descFacets []byte 495 var contentWarnings []string 496 var relevance float64 497 498 scanErr := rows.Scan( 499 &community.ID, &community.DID, &community.Handle, &community.Name, 500 &displayName, &description, &descFacets, 501 &avatarCID, &bannerCID, 502 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 503 &community.Visibility, &community.AllowExternalDiscovery, 504 &moderationType, pq.Array(&contentWarnings), 505 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 506 &federatedFrom, &federatedID, 507 &community.CreatedAt, &community.UpdatedAt, 508 &recordURI, &recordCID, 509 &relevance, 510 ) 511 if scanErr != nil { 512 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr) 513 } 514 515 // Map nullable fields 516 community.DisplayName = displayName.String 517 community.Description = description.String 518 community.AvatarCID = avatarCID.String 519 community.BannerCID = bannerCID.String 520 community.ModerationType = moderationType.String 521 community.ContentWarnings = contentWarnings 522 community.FederatedFrom = federatedFrom.String 523 community.FederatedID = federatedID.String 524 community.RecordURI = recordURI.String 525 community.RecordCID = recordCID.String 526 if descFacets != nil { 527 community.DescriptionFacets = descFacets 528 } 529 530 result = append(result, community) 531 } 532 533 if err = rows.Err(); err != nil { 534 return nil, 0, fmt.Errorf("error iterating search results: %w", err) 535 } 
536 537 return result, totalCount, nil 538} 539 540// Helper functions 541func nullString(s string) sql.NullString { 542 return sql.NullString{String: s, Valid: s != ""} 543}