A community-based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "context"
5 "database/sql"
6 "fmt"
7 "log"
8 "strings"
9
10 "Coves/internal/core/communities"
11
12 "github.com/lib/pq"
13)
14
15type postgresCommunityRepo struct {
16 db *sql.DB
17}
18
19// NewCommunityRepository creates a new PostgreSQL community repository
20func NewCommunityRepository(db *sql.DB) communities.Repository {
21 return &postgresCommunityRepo{db: db}
22}
23
24// Create inserts a new community into the communities table
25func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) {
26 // Validate that handle is always provided (constructed by consumer)
27 if community.Handle == "" {
28 return nil, fmt.Errorf("handle is required (should be constructed by consumer before insert)")
29 }
30
31 query := `
32 INSERT INTO communities (
33 did, handle, name, display_name, description, description_facets,
34 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
35 pds_email, pds_password_encrypted,
36 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url,
37 visibility, allow_external_discovery, moderation_type, content_warnings,
38 member_count, subscriber_count, post_count,
39 federated_from, federated_id, created_at, updated_at,
40 record_uri, record_cid
41 ) VALUES (
42 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
43 $12,
44 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
45 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
46 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
47 $16,
48 $17, $18, $19, $20,
49 $21, $22, $23, $24, $25, $26, $27, $28, $29
50 )
51 RETURNING id, created_at, updated_at`
52
53 // Handle JSONB field - use sql.NullString with valid JSON or NULL
54 var descFacets interface{}
55 if len(community.DescriptionFacets) > 0 {
56 descFacets = community.DescriptionFacets
57 } else {
58 descFacets = nil
59 }
60
61 err := r.db.QueryRowContext(ctx, query,
62 community.DID,
63 community.Handle, // Always non-empty - constructed by AppView consumer
64 community.Name,
65 nullString(community.DisplayName),
66 nullString(community.Description),
67 descFacets,
68 nullString(community.AvatarCID),
69 nullString(community.BannerCID),
70 community.OwnerDID,
71 community.CreatedByDID,
72 community.HostedByDID,
73 // V2.0: PDS credentials for community account (encrypted at rest)
74 nullString(community.PDSEmail),
75 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt
76 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt
77 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt
78 nullString(community.PDSURL),
79 // V2.0: No key columns - PDS manages all keys
80 community.Visibility,
81 community.AllowExternalDiscovery,
82 nullString(community.ModerationType),
83 pq.Array(community.ContentWarnings),
84 community.MemberCount,
85 community.SubscriberCount,
86 community.PostCount,
87 nullString(community.FederatedFrom),
88 nullString(community.FederatedID),
89 community.CreatedAt,
90 community.UpdatedAt,
91 nullString(community.RecordURI),
92 nullString(community.RecordCID),
93 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt)
94 if err != nil {
95 // Check for unique constraint violations
96 if strings.Contains(err.Error(), "duplicate key") {
97 if strings.Contains(err.Error(), "communities_did_key") {
98 return nil, communities.ErrCommunityAlreadyExists
99 }
100 if strings.Contains(err.Error(), "communities_handle_key") {
101 return nil, communities.ErrHandleTaken
102 }
103 }
104 return nil, fmt.Errorf("failed to create community: %w", err)
105 }
106
107 return community, nil
108}
109
110// GetByDID retrieves a community by its DID
111// Note: PDS credentials are included (for internal service use only)
112// Handlers MUST use json:"-" tags to prevent credential exposure in APIs
113//
114// V2.0: Key columns not included - PDS manages all keys
115func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) {
116 community := &communities.Community{}
117 query := `
118 SELECT id, did, handle, name, display_name, description, description_facets,
119 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
120 pds_email,
121 CASE
122 WHEN pds_password_encrypted IS NOT NULL
123 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
124 ELSE NULL
125 END as pds_password,
126 CASE
127 WHEN pds_access_token_encrypted IS NOT NULL
128 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
129 ELSE NULL
130 END as pds_access_token,
131 CASE
132 WHEN pds_refresh_token_encrypted IS NOT NULL
133 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
134 ELSE NULL
135 END as pds_refresh_token,
136 pds_url,
137 visibility, allow_external_discovery, moderation_type, content_warnings,
138 member_count, subscriber_count, post_count,
139 federated_from, federated_id, created_at, updated_at,
140 record_uri, record_cid
141 FROM communities
142 WHERE did = $1`
143
144 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
145 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
146 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString
147 var descFacets []byte
148 var contentWarnings []string
149
150 err := r.db.QueryRowContext(ctx, query, did).Scan(
151 &community.ID, &community.DID, &community.Handle, &community.Name,
152 &displayName, &description, &descFacets,
153 &avatarCID, &bannerCID,
154 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
155 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt)
156 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL,
157 &community.Visibility, &community.AllowExternalDiscovery,
158 &moderationType, pq.Array(&contentWarnings),
159 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
160 &federatedFrom, &federatedID,
161 &community.CreatedAt, &community.UpdatedAt,
162 &recordURI, &recordCID,
163 )
164
165 if err == sql.ErrNoRows {
166 return nil, communities.ErrCommunityNotFound
167 }
168 if err != nil {
169 return nil, fmt.Errorf("failed to get community by DID: %w", err)
170 }
171
172 // Map nullable fields
173 community.DisplayName = displayName.String
174 community.Description = description.String
175 community.AvatarCID = avatarCID.String
176 community.BannerCID = bannerCID.String
177 community.PDSEmail = pdsEmail.String
178 community.PDSPassword = pdsPassword.String
179 community.PDSAccessToken = pdsAccessToken.String
180 community.PDSRefreshToken = pdsRefreshToken.String
181 community.PDSURL = pdsURL.String
182 // V2.0: No key fields - PDS manages all keys
183 community.RotationKeyPEM = "" // Empty - PDS-managed
184 community.SigningKeyPEM = "" // Empty - PDS-managed
185 community.ModerationType = moderationType.String
186 community.ContentWarnings = contentWarnings
187 community.FederatedFrom = federatedFrom.String
188 community.FederatedID = federatedID.String
189 community.RecordURI = recordURI.String
190 community.RecordCID = recordCID.String
191 if descFacets != nil {
192 community.DescriptionFacets = descFacets
193 }
194
195 return community, nil
196}
197
198// GetByHandle retrieves a community by its scoped handle
199func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) {
200 community := &communities.Community{}
201 query := `
202 SELECT id, did, handle, name, display_name, description, description_facets,
203 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
204 visibility, allow_external_discovery, moderation_type, content_warnings,
205 member_count, subscriber_count, post_count,
206 federated_from, federated_id, created_at, updated_at,
207 record_uri, record_cid
208 FROM communities
209 WHERE handle = $1`
210
211 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
212 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
213 var descFacets []byte
214 var contentWarnings []string
215
216 err := r.db.QueryRowContext(ctx, query, handle).Scan(
217 &community.ID, &community.DID, &community.Handle, &community.Name,
218 &displayName, &description, &descFacets,
219 &avatarCID, &bannerCID,
220 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
221 &community.Visibility, &community.AllowExternalDiscovery,
222 &moderationType, pq.Array(&contentWarnings),
223 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
224 &federatedFrom, &federatedID,
225 &community.CreatedAt, &community.UpdatedAt,
226 &recordURI, &recordCID,
227 )
228
229 if err == sql.ErrNoRows {
230 return nil, communities.ErrCommunityNotFound
231 }
232 if err != nil {
233 return nil, fmt.Errorf("failed to get community by handle: %w", err)
234 }
235
236 // Map nullable fields
237 community.DisplayName = displayName.String
238 community.Description = description.String
239 community.AvatarCID = avatarCID.String
240 community.BannerCID = bannerCID.String
241 community.ModerationType = moderationType.String
242 community.ContentWarnings = contentWarnings
243 community.FederatedFrom = federatedFrom.String
244 community.FederatedID = federatedID.String
245 community.RecordURI = recordURI.String
246 community.RecordCID = recordCID.String
247 if descFacets != nil {
248 community.DescriptionFacets = descFacets
249 }
250
251 return community, nil
252}
253
254// Update modifies an existing community's metadata
255func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) {
256 query := `
257 UPDATE communities
258 SET display_name = $2, description = $3, description_facets = $4,
259 avatar_cid = $5, banner_cid = $6,
260 visibility = $7, allow_external_discovery = $8,
261 moderation_type = $9, content_warnings = $10,
262 updated_at = NOW(),
263 record_uri = $11, record_cid = $12
264 WHERE did = $1
265 RETURNING updated_at`
266
267 // Handle JSONB field - use sql.NullString with valid JSON or NULL
268 var descFacets interface{}
269 if len(community.DescriptionFacets) > 0 {
270 descFacets = community.DescriptionFacets
271 } else {
272 descFacets = nil
273 }
274
275 err := r.db.QueryRowContext(ctx, query,
276 community.DID,
277 nullString(community.DisplayName),
278 nullString(community.Description),
279 descFacets,
280 nullString(community.AvatarCID),
281 nullString(community.BannerCID),
282 community.Visibility,
283 community.AllowExternalDiscovery,
284 nullString(community.ModerationType),
285 pq.Array(community.ContentWarnings),
286 nullString(community.RecordURI),
287 nullString(community.RecordCID),
288 ).Scan(&community.UpdatedAt)
289
290 if err == sql.ErrNoRows {
291 return nil, communities.ErrCommunityNotFound
292 }
293 if err != nil {
294 return nil, fmt.Errorf("failed to update community: %w", err)
295 }
296
297 return community, nil
298}
299
300// UpdateCredentials atomically updates community's PDS access and refresh tokens
301// CRITICAL: Both tokens must be updated together because refresh tokens are single-use
302// After a successful token refresh, the old refresh token is immediately revoked by the PDS
303func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error {
304 query := `
305 UPDATE communities
306 SET
307 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
308 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
309 updated_at = NOW()
310 WHERE did = $1
311 RETURNING did`
312
313 var returnedDID string
314 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID)
315
316 if err == sql.ErrNoRows {
317 return communities.ErrCommunityNotFound
318 }
319 if err != nil {
320 return fmt.Errorf("failed to update credentials: %w", err)
321 }
322
323 return nil
324}
325
326// Delete removes a community from the database
327func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error {
328 query := `DELETE FROM communities WHERE did = $1`
329
330 result, err := r.db.ExecContext(ctx, query, did)
331 if err != nil {
332 return fmt.Errorf("failed to delete community: %w", err)
333 }
334
335 rowsAffected, err := result.RowsAffected()
336 if err != nil {
337 return fmt.Errorf("failed to check delete result: %w", err)
338 }
339
340 if rowsAffected == 0 {
341 return communities.ErrCommunityNotFound
342 }
343
344 return nil
345}
346
347// List retrieves communities with filtering and pagination
348func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, error) {
349 // Build query with filters
350 whereClauses := []string{}
351 args := []interface{}{}
352 argCount := 1
353
354 if req.Visibility != "" {
355 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
356 args = append(args, req.Visibility)
357 argCount++
358 }
359
360 // TODO: Add category filter when DB schema supports it
361 // if req.Category != "" { ... }
362
363 // TODO: Add language filter when DB schema supports it
364 // if req.Language != "" { ... }
365
366 whereClause := ""
367 if len(whereClauses) > 0 {
368 whereClause = "WHERE " + strings.Join(whereClauses, " AND ")
369 }
370
371 // Build sort clause - map sort enum to DB columns
372 sortColumn := "subscriber_count" // default: popular
373 sortOrder := "DESC"
374
375 switch req.Sort {
376 case "popular":
377 // Most subscribers (default)
378 sortColumn = "subscriber_count"
379 sortOrder = "DESC"
380 case "active":
381 // Most posts/activity
382 sortColumn = "post_count"
383 sortOrder = "DESC"
384 case "new":
385 // Recently created
386 sortColumn = "created_at"
387 sortOrder = "DESC"
388 case "alphabetical":
389 // Sorted by name A-Z
390 sortColumn = "name"
391 sortOrder = "ASC"
392 default:
393 // Fallback to popular if empty or invalid (should be validated in handler)
394 sortColumn = "subscriber_count"
395 sortOrder = "DESC"
396 }
397
398 // Get communities with pagination
399 query := fmt.Sprintf(`
400 SELECT id, did, handle, name, display_name, description, description_facets,
401 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
402 visibility, allow_external_discovery, moderation_type, content_warnings,
403 member_count, subscriber_count, post_count,
404 federated_from, federated_id, created_at, updated_at,
405 record_uri, record_cid
406 FROM communities
407 %s
408 ORDER BY %s %s
409 LIMIT $%d OFFSET $%d`,
410 whereClause, sortColumn, sortOrder, argCount, argCount+1)
411
412 args = append(args, req.Limit, req.Offset)
413
414 rows, err := r.db.QueryContext(ctx, query, args...)
415 if err != nil {
416 return nil, fmt.Errorf("failed to list communities: %w", err)
417 }
418 defer func() {
419 if closeErr := rows.Close(); closeErr != nil {
420 log.Printf("Failed to close rows: %v", closeErr)
421 }
422 }()
423
424 result := []*communities.Community{}
425 for rows.Next() {
426 community := &communities.Community{}
427 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
428 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
429 var descFacets []byte
430 var contentWarnings []string
431
432 scanErr := rows.Scan(
433 &community.ID, &community.DID, &community.Handle, &community.Name,
434 &displayName, &description, &descFacets,
435 &avatarCID, &bannerCID,
436 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
437 &community.Visibility, &community.AllowExternalDiscovery,
438 &moderationType, pq.Array(&contentWarnings),
439 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
440 &federatedFrom, &federatedID,
441 &community.CreatedAt, &community.UpdatedAt,
442 &recordURI, &recordCID,
443 )
444 if scanErr != nil {
445 return nil, fmt.Errorf("failed to scan community: %w", scanErr)
446 }
447
448 // Map nullable fields
449 community.DisplayName = displayName.String
450 community.Description = description.String
451 community.AvatarCID = avatarCID.String
452 community.BannerCID = bannerCID.String
453 community.ModerationType = moderationType.String
454 community.ContentWarnings = contentWarnings
455 community.FederatedFrom = federatedFrom.String
456 community.FederatedID = federatedID.String
457 community.RecordURI = recordURI.String
458 community.RecordCID = recordCID.String
459 if descFacets != nil {
460 community.DescriptionFacets = descFacets
461 }
462
463 result = append(result, community)
464 }
465
466 if err = rows.Err(); err != nil {
467 return nil, fmt.Errorf("error iterating communities: %w", err)
468 }
469
470 return result, nil
471}
472
473// Search searches communities by name/description using fuzzy matching
474func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {
475 // Build query with fuzzy search and visibility filter
476 whereClauses := []string{
477 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')",
478 }
479 args := []interface{}{req.Query}
480 argCount := 2
481
482 if req.Visibility != "" {
483 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
484 args = append(args, req.Visibility)
485 argCount++
486 }
487
488 whereClause := "WHERE " + strings.Join(whereClauses, " AND ")
489
490 // Get total count
491 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
492 var totalCount int
493 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
494 if err != nil {
495 return nil, 0, fmt.Errorf("failed to count search results: %w", err)
496 }
497
498 // Search with relevance ranking using pg_trgm similarity
499 // Filter out results with very low relevance (< 0.2) to avoid noise
500 query := fmt.Sprintf(`
501 SELECT id, did, handle, name, display_name, description, description_facets,
502 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
503 visibility, allow_external_discovery, moderation_type, content_warnings,
504 member_count, subscriber_count, post_count,
505 federated_from, federated_id, created_at, updated_at,
506 record_uri, record_cid,
507 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance
508 FROM communities
509 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2
510 ORDER BY relevance DESC, member_count DESC
511 LIMIT $%d OFFSET $%d`,
512 whereClause, argCount, argCount+1)
513
514 args = append(args, req.Limit, req.Offset)
515
516 rows, err := r.db.QueryContext(ctx, query, args...)
517 if err != nil {
518 return nil, 0, fmt.Errorf("failed to search communities: %w", err)
519 }
520 defer func() {
521 if closeErr := rows.Close(); closeErr != nil {
522 log.Printf("Failed to close rows: %v", closeErr)
523 }
524 }()
525
526 result := []*communities.Community{}
527 for rows.Next() {
528 community := &communities.Community{}
529 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
530 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
531 var descFacets []byte
532 var contentWarnings []string
533 var relevance float64
534
535 scanErr := rows.Scan(
536 &community.ID, &community.DID, &community.Handle, &community.Name,
537 &displayName, &description, &descFacets,
538 &avatarCID, &bannerCID,
539 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
540 &community.Visibility, &community.AllowExternalDiscovery,
541 &moderationType, pq.Array(&contentWarnings),
542 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
543 &federatedFrom, &federatedID,
544 &community.CreatedAt, &community.UpdatedAt,
545 &recordURI, &recordCID,
546 &relevance,
547 )
548 if scanErr != nil {
549 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
550 }
551
552 // Map nullable fields
553 community.DisplayName = displayName.String
554 community.Description = description.String
555 community.AvatarCID = avatarCID.String
556 community.BannerCID = bannerCID.String
557 community.ModerationType = moderationType.String
558 community.ContentWarnings = contentWarnings
559 community.FederatedFrom = federatedFrom.String
560 community.FederatedID = federatedID.String
561 community.RecordURI = recordURI.String
562 community.RecordCID = recordCID.String
563 if descFacets != nil {
564 community.DescriptionFacets = descFacets
565 }
566
567 result = append(result, community)
568 }
569
570 if err = rows.Err(); err != nil {
571 return nil, 0, fmt.Errorf("error iterating search results: %w", err)
572 }
573
574 return result, totalCount, nil
575}
576
577// Helper functions
578func nullString(s string) sql.NullString {
579 return sql.NullString{String: s, Valid: s != ""}
580}