A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"strings"

	"Coves/internal/core/communities"

	"github.com/lib/pq"
)
14
// postgresCommunityRepo is the PostgreSQL-backed repository type that
// NewCommunityRepository hands out as a communities.Repository.
type postgresCommunityRepo struct {
	// db is the database handle every repository method runs its queries on.
	db *sql.DB
}
18
19// NewCommunityRepository creates a new PostgreSQL community repository
20func NewCommunityRepository(db *sql.DB) communities.Repository {
21 return &postgresCommunityRepo{db: db}
22}
23
24// Create inserts a new community into the communities table
25func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) {
26 // Validate that handle is always provided (constructed by consumer)
27 if community.Handle == "" {
28 return nil, fmt.Errorf("handle is required (should be constructed by consumer before insert)")
29 }
30
31 query := `
32 INSERT INTO communities (
33 did, handle, name, display_name, description, description_facets,
34 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
35 pds_email, pds_password_encrypted,
36 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url,
37 visibility, allow_external_discovery, moderation_type, content_warnings,
38 member_count, subscriber_count, post_count,
39 federated_from, federated_id, created_at, updated_at,
40 record_uri, record_cid
41 ) VALUES (
42 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
43 $12,
44 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
45 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
46 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
47 $16,
48 $17, $18, $19, $20,
49 $21, $22, $23, $24, $25, $26, $27, $28, $29
50 )
51 RETURNING id, created_at, updated_at`
52
53 // Handle JSONB field - use sql.NullString with valid JSON or NULL
54 var descFacets interface{}
55 if len(community.DescriptionFacets) > 0 {
56 descFacets = community.DescriptionFacets
57 } else {
58 descFacets = nil
59 }
60
61 err := r.db.QueryRowContext(ctx, query,
62 community.DID,
63 community.Handle, // Always non-empty - constructed by AppView consumer
64 community.Name,
65 nullString(community.DisplayName),
66 nullString(community.Description),
67 descFacets,
68 nullString(community.AvatarCID),
69 nullString(community.BannerCID),
70 community.OwnerDID,
71 community.CreatedByDID,
72 community.HostedByDID,
73 // V2.0: PDS credentials for community account (encrypted at rest)
74 nullString(community.PDSEmail),
75 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt
76 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt
77 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt
78 nullString(community.PDSURL),
79 // V2.0: No key columns - PDS manages all keys
80 community.Visibility,
81 community.AllowExternalDiscovery,
82 nullString(community.ModerationType),
83 pq.Array(community.ContentWarnings),
84 community.MemberCount,
85 community.SubscriberCount,
86 community.PostCount,
87 nullString(community.FederatedFrom),
88 nullString(community.FederatedID),
89 community.CreatedAt,
90 community.UpdatedAt,
91 nullString(community.RecordURI),
92 nullString(community.RecordCID),
93 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt)
94 if err != nil {
95 // Check for unique constraint violations
96 if strings.Contains(err.Error(), "duplicate key") {
97 if strings.Contains(err.Error(), "communities_did_key") {
98 return nil, communities.ErrCommunityAlreadyExists
99 }
100 if strings.Contains(err.Error(), "communities_handle_key") {
101 return nil, communities.ErrHandleTaken
102 }
103 }
104 return nil, fmt.Errorf("failed to create community: %w", err)
105 }
106
107 return community, nil
108}
109
110// GetByDID retrieves a community by its DID
111// Note: PDS credentials are included (for internal service use only)
112// Handlers MUST use json:"-" tags to prevent credential exposure in APIs
113//
114// V2.0: Key columns not included - PDS manages all keys
115func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) {
116 community := &communities.Community{}
117 query := `
118 SELECT id, did, handle, name, display_name, description, description_facets,
119 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
120 pds_email,
121 CASE
122 WHEN pds_password_encrypted IS NOT NULL
123 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
124 ELSE NULL
125 END as pds_password,
126 CASE
127 WHEN pds_access_token_encrypted IS NOT NULL
128 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
129 ELSE NULL
130 END as pds_access_token,
131 CASE
132 WHEN pds_refresh_token_encrypted IS NOT NULL
133 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
134 ELSE NULL
135 END as pds_refresh_token,
136 pds_url,
137 visibility, allow_external_discovery, moderation_type, content_warnings,
138 member_count, subscriber_count, post_count,
139 federated_from, federated_id, created_at, updated_at,
140 record_uri, record_cid
141 FROM communities
142 WHERE did = $1`
143
144 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
145 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
146 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString
147 var descFacets []byte
148 var contentWarnings []string
149
150 err := r.db.QueryRowContext(ctx, query, did).Scan(
151 &community.ID, &community.DID, &community.Handle, &community.Name,
152 &displayName, &description, &descFacets,
153 &avatarCID, &bannerCID,
154 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
155 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt)
156 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL,
157 &community.Visibility, &community.AllowExternalDiscovery,
158 &moderationType, pq.Array(&contentWarnings),
159 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
160 &federatedFrom, &federatedID,
161 &community.CreatedAt, &community.UpdatedAt,
162 &recordURI, &recordCID,
163 )
164
165 if err == sql.ErrNoRows {
166 return nil, communities.ErrCommunityNotFound
167 }
168 if err != nil {
169 return nil, fmt.Errorf("failed to get community by DID: %w", err)
170 }
171
172 // Map nullable fields
173 community.DisplayName = displayName.String
174 community.Description = description.String
175 community.AvatarCID = avatarCID.String
176 community.BannerCID = bannerCID.String
177 community.PDSEmail = pdsEmail.String
178 community.PDSPassword = pdsPassword.String
179 community.PDSAccessToken = pdsAccessToken.String
180 community.PDSRefreshToken = pdsRefreshToken.String
181 community.PDSURL = pdsURL.String
182 // V2.0: No key fields - PDS manages all keys
183 community.RotationKeyPEM = "" // Empty - PDS-managed
184 community.SigningKeyPEM = "" // Empty - PDS-managed
185 community.ModerationType = moderationType.String
186 community.ContentWarnings = contentWarnings
187 community.FederatedFrom = federatedFrom.String
188 community.FederatedID = federatedID.String
189 community.RecordURI = recordURI.String
190 community.RecordCID = recordCID.String
191 if descFacets != nil {
192 community.DescriptionFacets = descFacets
193 }
194
195 return community, nil
196}
197
198// GetByHandle retrieves a community by its scoped handle
199func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) {
200 community := &communities.Community{}
201 query := `
202 SELECT id, did, handle, name, display_name, description, description_facets,
203 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
204 visibility, allow_external_discovery, moderation_type, content_warnings,
205 member_count, subscriber_count, post_count,
206 federated_from, federated_id, created_at, updated_at,
207 record_uri, record_cid
208 FROM communities
209 WHERE handle = $1`
210
211 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
212 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
213 var descFacets []byte
214 var contentWarnings []string
215
216 err := r.db.QueryRowContext(ctx, query, handle).Scan(
217 &community.ID, &community.DID, &community.Handle, &community.Name,
218 &displayName, &description, &descFacets,
219 &avatarCID, &bannerCID,
220 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
221 &community.Visibility, &community.AllowExternalDiscovery,
222 &moderationType, pq.Array(&contentWarnings),
223 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
224 &federatedFrom, &federatedID,
225 &community.CreatedAt, &community.UpdatedAt,
226 &recordURI, &recordCID,
227 )
228
229 if err == sql.ErrNoRows {
230 return nil, communities.ErrCommunityNotFound
231 }
232 if err != nil {
233 return nil, fmt.Errorf("failed to get community by handle: %w", err)
234 }
235
236 // Map nullable fields
237 community.DisplayName = displayName.String
238 community.Description = description.String
239 community.AvatarCID = avatarCID.String
240 community.BannerCID = bannerCID.String
241 community.ModerationType = moderationType.String
242 community.ContentWarnings = contentWarnings
243 community.FederatedFrom = federatedFrom.String
244 community.FederatedID = federatedID.String
245 community.RecordURI = recordURI.String
246 community.RecordCID = recordCID.String
247 if descFacets != nil {
248 community.DescriptionFacets = descFacets
249 }
250
251 return community, nil
252}
253
254// Update modifies an existing community's metadata
255func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) {
256 query := `
257 UPDATE communities
258 SET display_name = $2, description = $3, description_facets = $4,
259 avatar_cid = $5, banner_cid = $6,
260 visibility = $7, allow_external_discovery = $8,
261 moderation_type = $9, content_warnings = $10,
262 updated_at = NOW(),
263 record_uri = $11, record_cid = $12
264 WHERE did = $1
265 RETURNING updated_at`
266
267 // Handle JSONB field - use sql.NullString with valid JSON or NULL
268 var descFacets interface{}
269 if len(community.DescriptionFacets) > 0 {
270 descFacets = community.DescriptionFacets
271 } else {
272 descFacets = nil
273 }
274
275 err := r.db.QueryRowContext(ctx, query,
276 community.DID,
277 nullString(community.DisplayName),
278 nullString(community.Description),
279 descFacets,
280 nullString(community.AvatarCID),
281 nullString(community.BannerCID),
282 community.Visibility,
283 community.AllowExternalDiscovery,
284 nullString(community.ModerationType),
285 pq.Array(community.ContentWarnings),
286 nullString(community.RecordURI),
287 nullString(community.RecordCID),
288 ).Scan(&community.UpdatedAt)
289
290 if err == sql.ErrNoRows {
291 return nil, communities.ErrCommunityNotFound
292 }
293 if err != nil {
294 return nil, fmt.Errorf("failed to update community: %w", err)
295 }
296
297 return community, nil
298}
299
300// UpdateCredentials atomically updates community's PDS access and refresh tokens
301// CRITICAL: Both tokens must be updated together because refresh tokens are single-use
302// After a successful token refresh, the old refresh token is immediately revoked by the PDS
303func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error {
304 query := `
305 UPDATE communities
306 SET
307 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
308 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
309 updated_at = NOW()
310 WHERE did = $1
311 RETURNING did`
312
313 var returnedDID string
314 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID)
315
316 if err == sql.ErrNoRows {
317 return communities.ErrCommunityNotFound
318 }
319 if err != nil {
320 return fmt.Errorf("failed to update credentials: %w", err)
321 }
322
323 return nil
324}
325
326// Delete removes a community from the database
327func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error {
328 query := `DELETE FROM communities WHERE did = $1`
329
330 result, err := r.db.ExecContext(ctx, query, did)
331 if err != nil {
332 return fmt.Errorf("failed to delete community: %w", err)
333 }
334
335 rowsAffected, err := result.RowsAffected()
336 if err != nil {
337 return fmt.Errorf("failed to check delete result: %w", err)
338 }
339
340 if rowsAffected == 0 {
341 return communities.ErrCommunityNotFound
342 }
343
344 return nil
345}
346
347// List retrieves communities with filtering and pagination
348func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) {
349 // Build query with filters
350 whereClauses := []string{}
351 args := []interface{}{}
352 argCount := 1
353
354 if req.Visibility != "" {
355 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
356 args = append(args, req.Visibility)
357 argCount++
358 }
359
360 if req.HostedBy != "" {
361 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount))
362 args = append(args, req.HostedBy)
363 argCount++
364 }
365
366 whereClause := ""
367 if len(whereClauses) > 0 {
368 whereClause = "WHERE " + strings.Join(whereClauses, " AND ")
369 }
370
371 // Get total count
372 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
373 var totalCount int
374 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
375 if err != nil {
376 return nil, 0, fmt.Errorf("failed to count communities: %w", err)
377 }
378
379 // Build sort clause
380 sortColumn := "created_at"
381 if req.SortBy != "" {
382 switch req.SortBy {
383 case "member_count", "subscriber_count", "post_count", "created_at":
384 sortColumn = req.SortBy
385 }
386 }
387
388 sortOrder := "DESC"
389 if strings.ToUpper(req.SortOrder) == "ASC" {
390 sortOrder = "ASC"
391 }
392
393 // Get communities with pagination
394 query := fmt.Sprintf(`
395 SELECT id, did, handle, name, display_name, description, description_facets,
396 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
397 visibility, allow_external_discovery, moderation_type, content_warnings,
398 member_count, subscriber_count, post_count,
399 federated_from, federated_id, created_at, updated_at,
400 record_uri, record_cid
401 FROM communities
402 %s
403 ORDER BY %s %s
404 LIMIT $%d OFFSET $%d`,
405 whereClause, sortColumn, sortOrder, argCount, argCount+1)
406
407 args = append(args, req.Limit, req.Offset)
408
409 rows, err := r.db.QueryContext(ctx, query, args...)
410 if err != nil {
411 return nil, 0, fmt.Errorf("failed to list communities: %w", err)
412 }
413 defer func() {
414 if closeErr := rows.Close(); closeErr != nil {
415 log.Printf("Failed to close rows: %v", closeErr)
416 }
417 }()
418
419 result := []*communities.Community{}
420 for rows.Next() {
421 community := &communities.Community{}
422 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
423 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
424 var descFacets []byte
425 var contentWarnings []string
426
427 scanErr := rows.Scan(
428 &community.ID, &community.DID, &community.Handle, &community.Name,
429 &displayName, &description, &descFacets,
430 &avatarCID, &bannerCID,
431 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
432 &community.Visibility, &community.AllowExternalDiscovery,
433 &moderationType, pq.Array(&contentWarnings),
434 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
435 &federatedFrom, &federatedID,
436 &community.CreatedAt, &community.UpdatedAt,
437 &recordURI, &recordCID,
438 )
439 if scanErr != nil {
440 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
441 }
442
443 // Map nullable fields
444 community.DisplayName = displayName.String
445 community.Description = description.String
446 community.AvatarCID = avatarCID.String
447 community.BannerCID = bannerCID.String
448 community.ModerationType = moderationType.String
449 community.ContentWarnings = contentWarnings
450 community.FederatedFrom = federatedFrom.String
451 community.FederatedID = federatedID.String
452 community.RecordURI = recordURI.String
453 community.RecordCID = recordCID.String
454 if descFacets != nil {
455 community.DescriptionFacets = descFacets
456 }
457
458 result = append(result, community)
459 }
460
461 if err = rows.Err(); err != nil {
462 return nil, 0, fmt.Errorf("error iterating communities: %w", err)
463 }
464
465 return result, totalCount, nil
466}
467
468// Search searches communities by name/description using fuzzy matching
469func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {
470 // Build query with fuzzy search and visibility filter
471 whereClauses := []string{
472 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')",
473 }
474 args := []interface{}{req.Query}
475 argCount := 2
476
477 if req.Visibility != "" {
478 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
479 args = append(args, req.Visibility)
480 argCount++
481 }
482
483 whereClause := "WHERE " + strings.Join(whereClauses, " AND ")
484
485 // Get total count
486 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
487 var totalCount int
488 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
489 if err != nil {
490 return nil, 0, fmt.Errorf("failed to count search results: %w", err)
491 }
492
493 // Search with relevance ranking using pg_trgm similarity
494 // Filter out results with very low relevance (< 0.2) to avoid noise
495 query := fmt.Sprintf(`
496 SELECT id, did, handle, name, display_name, description, description_facets,
497 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
498 visibility, allow_external_discovery, moderation_type, content_warnings,
499 member_count, subscriber_count, post_count,
500 federated_from, federated_id, created_at, updated_at,
501 record_uri, record_cid,
502 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance
503 FROM communities
504 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2
505 ORDER BY relevance DESC, member_count DESC
506 LIMIT $%d OFFSET $%d`,
507 whereClause, argCount, argCount+1)
508
509 args = append(args, req.Limit, req.Offset)
510
511 rows, err := r.db.QueryContext(ctx, query, args...)
512 if err != nil {
513 return nil, 0, fmt.Errorf("failed to search communities: %w", err)
514 }
515 defer func() {
516 if closeErr := rows.Close(); closeErr != nil {
517 log.Printf("Failed to close rows: %v", closeErr)
518 }
519 }()
520
521 result := []*communities.Community{}
522 for rows.Next() {
523 community := &communities.Community{}
524 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
525 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
526 var descFacets []byte
527 var contentWarnings []string
528 var relevance float64
529
530 scanErr := rows.Scan(
531 &community.ID, &community.DID, &community.Handle, &community.Name,
532 &displayName, &description, &descFacets,
533 &avatarCID, &bannerCID,
534 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
535 &community.Visibility, &community.AllowExternalDiscovery,
536 &moderationType, pq.Array(&contentWarnings),
537 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
538 &federatedFrom, &federatedID,
539 &community.CreatedAt, &community.UpdatedAt,
540 &recordURI, &recordCID,
541 &relevance,
542 )
543 if scanErr != nil {
544 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
545 }
546
547 // Map nullable fields
548 community.DisplayName = displayName.String
549 community.Description = description.String
550 community.AvatarCID = avatarCID.String
551 community.BannerCID = bannerCID.String
552 community.ModerationType = moderationType.String
553 community.ContentWarnings = contentWarnings
554 community.FederatedFrom = federatedFrom.String
555 community.FederatedID = federatedID.String
556 community.RecordURI = recordURI.String
557 community.RecordCID = recordCID.String
558 if descFacets != nil {
559 community.DescriptionFacets = descFacets
560 }
561
562 result = append(result, community)
563 }
564
565 if err = rows.Err(); err != nil {
566 return nil, 0, fmt.Errorf("error iterating search results: %w", err)
567 }
568
569 return result, totalCount, nil
570}
571
// Helper functions

// nullString converts s into a sql.NullString: the empty string maps to an
// invalid (SQL NULL) value, any other string to a valid value holding s.
func nullString(s string) sql.NullString {
	if s == "" {
		return sql.NullString{}
	}
	return sql.NullString{String: s, Valid: true}
}