A community-based topic aggregation platform built on atproto
1package postgres 2 3import ( 4 "context" 5 "database/sql" 6 "fmt" 7 "strings" 8 9 "Coves/internal/core/communities" 10 "github.com/lib/pq" 11) 12 13type postgresCommunityRepo struct { 14 db *sql.DB 15} 16 17// NewCommunityRepository creates a new PostgreSQL community repository 18func NewCommunityRepository(db *sql.DB) communities.Repository { 19 return &postgresCommunityRepo{db: db} 20} 21 22// Create inserts a new community into the communities table 23func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) { 24 query := ` 25 INSERT INTO communities ( 26 did, handle, name, display_name, description, description_facets, 27 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 28 pds_email, pds_password_hash, 29 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url, 30 visibility, allow_external_discovery, moderation_type, content_warnings, 31 member_count, subscriber_count, post_count, 32 federated_from, federated_id, created_at, updated_at, 33 record_uri, record_cid 34 ) VALUES ( 35 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, 36 $12, $13, 37 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 38 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END, 39 $16, 40 $17, $18, $19, $20, 41 $21, $22, $23, $24, $25, $26, $27, $28, $29 42 ) 43 RETURNING id, created_at, updated_at` 44 45 // Handle JSONB field - use sql.NullString with valid JSON or NULL 46 var descFacets interface{} 47 if community.DescriptionFacets != nil && len(community.DescriptionFacets) > 0 { 48 descFacets = community.DescriptionFacets 49 } else { 50 descFacets = nil 51 } 52 53 err := r.db.QueryRowContext(ctx, query, 54 community.DID, 55 community.Handle, 56 community.Name, 57 nullString(community.DisplayName), 58 nullString(community.Description), 59 
descFacets, 60 nullString(community.AvatarCID), 61 nullString(community.BannerCID), 62 community.OwnerDID, 63 community.CreatedByDID, 64 community.HostedByDID, 65 // V2: PDS credentials for community account 66 nullString(community.PDSEmail), 67 nullString(community.PDSPasswordHash), 68 nullString(community.PDSAccessToken), 69 nullString(community.PDSRefreshToken), 70 nullString(community.PDSURL), 71 community.Visibility, 72 community.AllowExternalDiscovery, 73 nullString(community.ModerationType), 74 pq.Array(community.ContentWarnings), 75 community.MemberCount, 76 community.SubscriberCount, 77 community.PostCount, 78 nullString(community.FederatedFrom), 79 nullString(community.FederatedID), 80 community.CreatedAt, 81 community.UpdatedAt, 82 nullString(community.RecordURI), 83 nullString(community.RecordCID), 84 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt) 85 86 if err != nil { 87 // Check for unique constraint violations 88 if strings.Contains(err.Error(), "duplicate key") { 89 if strings.Contains(err.Error(), "communities_did_key") { 90 return nil, communities.ErrCommunityAlreadyExists 91 } 92 if strings.Contains(err.Error(), "communities_handle_key") { 93 return nil, communities.ErrHandleTaken 94 } 95 } 96 return nil, fmt.Errorf("failed to create community: %w", err) 97 } 98 99 return community, nil 100} 101 102// GetByDID retrieves a community by its DID 103// Note: PDS credentials are included (for internal service use only) 104// Handlers MUST use json:"-" tags to prevent credential exposure in APIs 105func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) { 106 community := &communities.Community{} 107 query := ` 108 SELECT id, did, handle, name, display_name, description, description_facets, 109 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 110 pds_email, pds_password_hash, 111 COALESCE(pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM 
encryption_keys WHERE id = 1)), '') as pds_access_token, 112 COALESCE(pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), '') as pds_refresh_token, 113 pds_url, 114 visibility, allow_external_discovery, moderation_type, content_warnings, 115 member_count, subscriber_count, post_count, 116 federated_from, federated_id, created_at, updated_at, 117 record_uri, record_cid 118 FROM communities 119 WHERE did = $1` 120 121 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 122 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 123 var pdsEmail, pdsPasswordHash, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString 124 var descFacets []byte 125 var contentWarnings []string 126 127 err := r.db.QueryRowContext(ctx, query, did).Scan( 128 &community.ID, &community.DID, &community.Handle, &community.Name, 129 &displayName, &description, &descFacets, 130 &avatarCID, &bannerCID, 131 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 132 // V2: PDS credentials 133 &pdsEmail, &pdsPasswordHash, &pdsAccessToken, &pdsRefreshToken, &pdsURL, 134 &community.Visibility, &community.AllowExternalDiscovery, 135 &moderationType, pq.Array(&contentWarnings), 136 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 137 &federatedFrom, &federatedID, 138 &community.CreatedAt, &community.UpdatedAt, 139 &recordURI, &recordCID, 140 ) 141 142 if err == sql.ErrNoRows { 143 return nil, communities.ErrCommunityNotFound 144 } 145 if err != nil { 146 return nil, fmt.Errorf("failed to get community by DID: %w", err) 147 } 148 149 // Map nullable fields 150 community.DisplayName = displayName.String 151 community.Description = description.String 152 community.AvatarCID = avatarCID.String 153 community.BannerCID = bannerCID.String 154 community.PDSEmail = pdsEmail.String 155 community.PDSPasswordHash = pdsPasswordHash.String 156 community.PDSAccessToken = 
pdsAccessToken.String 157 community.PDSRefreshToken = pdsRefreshToken.String 158 community.PDSURL = pdsURL.String 159 community.ModerationType = moderationType.String 160 community.ContentWarnings = contentWarnings 161 community.FederatedFrom = federatedFrom.String 162 community.FederatedID = federatedID.String 163 community.RecordURI = recordURI.String 164 community.RecordCID = recordCID.String 165 if descFacets != nil { 166 community.DescriptionFacets = descFacets 167 } 168 169 return community, nil 170} 171 172// GetByHandle retrieves a community by its scoped handle 173func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) { 174 community := &communities.Community{} 175 query := ` 176 SELECT id, did, handle, name, display_name, description, description_facets, 177 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 178 visibility, allow_external_discovery, moderation_type, content_warnings, 179 member_count, subscriber_count, post_count, 180 federated_from, federated_id, created_at, updated_at, 181 record_uri, record_cid 182 FROM communities 183 WHERE handle = $1` 184 185 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 186 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 187 var descFacets []byte 188 var contentWarnings []string 189 190 err := r.db.QueryRowContext(ctx, query, handle).Scan( 191 &community.ID, &community.DID, &community.Handle, &community.Name, 192 &displayName, &description, &descFacets, 193 &avatarCID, &bannerCID, 194 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 195 &community.Visibility, &community.AllowExternalDiscovery, 196 &moderationType, pq.Array(&contentWarnings), 197 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 198 &federatedFrom, &federatedID, 199 &community.CreatedAt, &community.UpdatedAt, 200 &recordURI, &recordCID, 201 ) 202 203 if err == sql.ErrNoRows { 204 
return nil, communities.ErrCommunityNotFound 205 } 206 if err != nil { 207 return nil, fmt.Errorf("failed to get community by handle: %w", err) 208 } 209 210 // Map nullable fields 211 community.DisplayName = displayName.String 212 community.Description = description.String 213 community.AvatarCID = avatarCID.String 214 community.BannerCID = bannerCID.String 215 community.ModerationType = moderationType.String 216 community.ContentWarnings = contentWarnings 217 community.FederatedFrom = federatedFrom.String 218 community.FederatedID = federatedID.String 219 community.RecordURI = recordURI.String 220 community.RecordCID = recordCID.String 221 if descFacets != nil { 222 community.DescriptionFacets = descFacets 223 } 224 225 return community, nil 226} 227 228// Update modifies an existing community's metadata 229func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) { 230 query := ` 231 UPDATE communities 232 SET display_name = $2, description = $3, description_facets = $4, 233 avatar_cid = $5, banner_cid = $6, 234 visibility = $7, allow_external_discovery = $8, 235 moderation_type = $9, content_warnings = $10, 236 updated_at = NOW(), 237 record_uri = $11, record_cid = $12 238 WHERE did = $1 239 RETURNING updated_at` 240 241 // Handle JSONB field - use sql.NullString with valid JSON or NULL 242 var descFacets interface{} 243 if community.DescriptionFacets != nil && len(community.DescriptionFacets) > 0 { 244 descFacets = community.DescriptionFacets 245 } else { 246 descFacets = nil 247 } 248 249 err := r.db.QueryRowContext(ctx, query, 250 community.DID, 251 nullString(community.DisplayName), 252 nullString(community.Description), 253 descFacets, 254 nullString(community.AvatarCID), 255 nullString(community.BannerCID), 256 community.Visibility, 257 community.AllowExternalDiscovery, 258 nullString(community.ModerationType), 259 pq.Array(community.ContentWarnings), 260 nullString(community.RecordURI), 
261 nullString(community.RecordCID), 262 ).Scan(&community.UpdatedAt) 263 264 if err == sql.ErrNoRows { 265 return nil, communities.ErrCommunityNotFound 266 } 267 if err != nil { 268 return nil, fmt.Errorf("failed to update community: %w", err) 269 } 270 271 return community, nil 272} 273 274// Delete removes a community from the database 275func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error { 276 query := `DELETE FROM communities WHERE did = $1` 277 278 result, err := r.db.ExecContext(ctx, query, did) 279 if err != nil { 280 return fmt.Errorf("failed to delete community: %w", err) 281 } 282 283 rowsAffected, err := result.RowsAffected() 284 if err != nil { 285 return fmt.Errorf("failed to check delete result: %w", err) 286 } 287 288 if rowsAffected == 0 { 289 return communities.ErrCommunityNotFound 290 } 291 292 return nil 293} 294 295// List retrieves communities with filtering and pagination 296func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) { 297 // Build query with filters 298 whereClauses := []string{} 299 args := []interface{}{} 300 argCount := 1 301 302 if req.Visibility != "" { 303 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 304 args = append(args, req.Visibility) 305 argCount++ 306 } 307 308 if req.HostedBy != "" { 309 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount)) 310 args = append(args, req.HostedBy) 311 argCount++ 312 } 313 314 whereClause := "" 315 if len(whereClauses) > 0 { 316 whereClause = "WHERE " + strings.Join(whereClauses, " AND ") 317 } 318 319 // Get total count 320 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 321 var totalCount int 322 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 323 if err != nil { 324 return nil, 0, fmt.Errorf("failed to count communities: %w", err) 325 } 326 327 // Build sort 
clause 328 sortColumn := "created_at" 329 if req.SortBy != "" { 330 switch req.SortBy { 331 case "member_count", "subscriber_count", "post_count", "created_at": 332 sortColumn = req.SortBy 333 } 334 } 335 336 sortOrder := "DESC" 337 if strings.ToUpper(req.SortOrder) == "ASC" { 338 sortOrder = "ASC" 339 } 340 341 // Get communities with pagination 342 query := fmt.Sprintf(` 343 SELECT id, did, handle, name, display_name, description, description_facets, 344 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 345 visibility, allow_external_discovery, moderation_type, content_warnings, 346 member_count, subscriber_count, post_count, 347 federated_from, federated_id, created_at, updated_at, 348 record_uri, record_cid 349 FROM communities 350 %s 351 ORDER BY %s %s 352 LIMIT $%d OFFSET $%d`, 353 whereClause, sortColumn, sortOrder, argCount, argCount+1) 354 355 args = append(args, req.Limit, req.Offset) 356 357 rows, err := r.db.QueryContext(ctx, query, args...) 358 if err != nil { 359 return nil, 0, fmt.Errorf("failed to list communities: %w", err) 360 } 361 defer rows.Close() 362 363 result := []*communities.Community{} 364 for rows.Next() { 365 community := &communities.Community{} 366 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 367 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 368 var descFacets []byte 369 var contentWarnings []string 370 371 err := rows.Scan( 372 &community.ID, &community.DID, &community.Handle, &community.Name, 373 &displayName, &description, &descFacets, 374 &avatarCID, &bannerCID, 375 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 376 &community.Visibility, &community.AllowExternalDiscovery, 377 &moderationType, pq.Array(&contentWarnings), 378 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 379 &federatedFrom, &federatedID, 380 &community.CreatedAt, &community.UpdatedAt, 381 &recordURI, &recordCID, 382 ) 383 if err != nil { 
384 return nil, 0, fmt.Errorf("failed to scan community: %w", err) 385 } 386 387 // Map nullable fields 388 community.DisplayName = displayName.String 389 community.Description = description.String 390 community.AvatarCID = avatarCID.String 391 community.BannerCID = bannerCID.String 392 community.ModerationType = moderationType.String 393 community.ContentWarnings = contentWarnings 394 community.FederatedFrom = federatedFrom.String 395 community.FederatedID = federatedID.String 396 community.RecordURI = recordURI.String 397 community.RecordCID = recordCID.String 398 if descFacets != nil { 399 community.DescriptionFacets = descFacets 400 } 401 402 result = append(result, community) 403 } 404 405 if err = rows.Err(); err != nil { 406 return nil, 0, fmt.Errorf("error iterating communities: %w", err) 407 } 408 409 return result, totalCount, nil 410} 411 412// Search searches communities by name/description using fuzzy matching 413func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) { 414 // Build query with fuzzy search and visibility filter 415 whereClauses := []string{ 416 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')", 417 } 418 args := []interface{}{req.Query} 419 argCount := 2 420 421 if req.Visibility != "" { 422 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount)) 423 args = append(args, req.Visibility) 424 argCount++ 425 } 426 427 whereClause := "WHERE " + strings.Join(whereClauses, " AND ") 428 429 // Get total count 430 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause) 431 var totalCount int 432 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount) 433 if err != nil { 434 return nil, 0, fmt.Errorf("failed to count search results: %w", err) 435 } 436 437 // Search with relevance ranking using pg_trgm similarity 438 // Filter out results with very low relevance (< 0.2) to avoid 
noise 439 query := fmt.Sprintf(` 440 SELECT id, did, handle, name, display_name, description, description_facets, 441 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did, 442 visibility, allow_external_discovery, moderation_type, content_warnings, 443 member_count, subscriber_count, post_count, 444 federated_from, federated_id, created_at, updated_at, 445 record_uri, record_cid, 446 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance 447 FROM communities 448 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2 449 ORDER BY relevance DESC, member_count DESC 450 LIMIT $%d OFFSET $%d`, 451 whereClause, argCount, argCount+1) 452 453 args = append(args, req.Limit, req.Offset) 454 455 rows, err := r.db.QueryContext(ctx, query, args...) 456 if err != nil { 457 return nil, 0, fmt.Errorf("failed to search communities: %w", err) 458 } 459 defer rows.Close() 460 461 result := []*communities.Community{} 462 for rows.Next() { 463 community := &communities.Community{} 464 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString 465 var federatedFrom, federatedID, recordURI, recordCID sql.NullString 466 var descFacets []byte 467 var contentWarnings []string 468 var relevance float64 469 470 err := rows.Scan( 471 &community.ID, &community.DID, &community.Handle, &community.Name, 472 &displayName, &description, &descFacets, 473 &avatarCID, &bannerCID, 474 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID, 475 &community.Visibility, &community.AllowExternalDiscovery, 476 &moderationType, pq.Array(&contentWarnings), 477 &community.MemberCount, &community.SubscriberCount, &community.PostCount, 478 &federatedFrom, &federatedID, 479 &community.CreatedAt, &community.UpdatedAt, 480 &recordURI, &recordCID, 481 &relevance, 482 ) 483 if err != nil { 484 return nil, 0, fmt.Errorf("failed to scan community: %w", err) 485 } 486 487 // Map nullable fields 488 community.DisplayName = 
displayName.String 489 community.Description = description.String 490 community.AvatarCID = avatarCID.String 491 community.BannerCID = bannerCID.String 492 community.ModerationType = moderationType.String 493 community.ContentWarnings = contentWarnings 494 community.FederatedFrom = federatedFrom.String 495 community.FederatedID = federatedID.String 496 community.RecordURI = recordURI.String 497 community.RecordCID = recordCID.String 498 if descFacets != nil { 499 community.DescriptionFacets = descFacets 500 } 501 502 result = append(result, community) 503 } 504 505 if err = rows.Err(); err != nil { 506 return nil, 0, fmt.Errorf("error iterating search results: %w", err) 507 } 508 509 return result, totalCount, nil 510} 511 512// Helper functions 513func nullString(s string) sql.NullString { 514 return sql.NullString{String: s, Valid: s != ""} 515} 516 517func nullBytes(b []byte) []byte { 518 if b == nil || len(b) == 0 { 519 return nil 520 } 521 return b 522}