A community-based topic aggregation platform built on atproto
1package postgres
2
import (
	"Coves/internal/core/communities"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"strings"

	"github.com/lib/pq"
)
13
// postgresCommunityRepo is the concrete type that NewCommunityRepository
// returns as a communities.Repository. Its methods issue their SQL through db.
type postgresCommunityRepo struct {
	db *sql.DB // database handle used by every repository method
}
17
18// NewCommunityRepository creates a new PostgreSQL community repository
19func NewCommunityRepository(db *sql.DB) communities.Repository {
20 return &postgresCommunityRepo{db: db}
21}
22
23// Create inserts a new community into the communities table
24func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) {
25 // Validate that handle is always provided (constructed by consumer)
26 if community.Handle == "" {
27 return nil, fmt.Errorf("handle is required (should be constructed by consumer before insert)")
28 }
29
30 query := `
31 INSERT INTO communities (
32 did, handle, name, display_name, description, description_facets,
33 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
34 pds_email, pds_password_encrypted,
35 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url,
36 visibility, allow_external_discovery, moderation_type, content_warnings,
37 member_count, subscriber_count, post_count,
38 federated_from, federated_id, created_at, updated_at,
39 record_uri, record_cid
40 ) VALUES (
41 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
42 $12,
43 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
44 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
45 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
46 $16,
47 $17, $18, $19, $20,
48 $21, $22, $23, $24, $25, $26, $27, $28, $29
49 )
50 RETURNING id, created_at, updated_at`
51
52 // Handle JSONB field - use sql.NullString with valid JSON or NULL
53 var descFacets interface{}
54 if len(community.DescriptionFacets) > 0 {
55 descFacets = community.DescriptionFacets
56 } else {
57 descFacets = nil
58 }
59
60 err := r.db.QueryRowContext(ctx, query,
61 community.DID,
62 community.Handle, // Always non-empty - constructed by AppView consumer
63 community.Name,
64 nullString(community.DisplayName),
65 nullString(community.Description),
66 descFacets,
67 nullString(community.AvatarCID),
68 nullString(community.BannerCID),
69 community.OwnerDID,
70 community.CreatedByDID,
71 community.HostedByDID,
72 // V2.0: PDS credentials for community account (encrypted at rest)
73 nullString(community.PDSEmail),
74 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt
75 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt
76 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt
77 nullString(community.PDSURL),
78 // V2.0: No key columns - PDS manages all keys
79 community.Visibility,
80 community.AllowExternalDiscovery,
81 nullString(community.ModerationType),
82 pq.Array(community.ContentWarnings),
83 community.MemberCount,
84 community.SubscriberCount,
85 community.PostCount,
86 nullString(community.FederatedFrom),
87 nullString(community.FederatedID),
88 community.CreatedAt,
89 community.UpdatedAt,
90 nullString(community.RecordURI),
91 nullString(community.RecordCID),
92 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt)
93 if err != nil {
94 // Check for unique constraint violations
95 if strings.Contains(err.Error(), "duplicate key") {
96 if strings.Contains(err.Error(), "communities_did_key") {
97 return nil, communities.ErrCommunityAlreadyExists
98 }
99 if strings.Contains(err.Error(), "communities_handle_key") {
100 return nil, communities.ErrHandleTaken
101 }
102 }
103 return nil, fmt.Errorf("failed to create community: %w", err)
104 }
105
106 return community, nil
107}
108
109// GetByDID retrieves a community by its DID
110// Note: PDS credentials are included (for internal service use only)
111// Handlers MUST use json:"-" tags to prevent credential exposure in APIs
112//
113// V2.0: Key columns not included - PDS manages all keys
114func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) {
115 community := &communities.Community{}
116 query := `
117 SELECT id, did, handle, name, display_name, description, description_facets,
118 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
119 pds_email,
120 CASE
121 WHEN pds_password_encrypted IS NOT NULL
122 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
123 ELSE NULL
124 END as pds_password,
125 CASE
126 WHEN pds_access_token_encrypted IS NOT NULL
127 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
128 ELSE NULL
129 END as pds_access_token,
130 CASE
131 WHEN pds_refresh_token_encrypted IS NOT NULL
132 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
133 ELSE NULL
134 END as pds_refresh_token,
135 pds_url,
136 visibility, allow_external_discovery, moderation_type, content_warnings,
137 member_count, subscriber_count, post_count,
138 federated_from, federated_id, created_at, updated_at,
139 record_uri, record_cid
140 FROM communities
141 WHERE did = $1`
142
143 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
144 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
145 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString
146 var descFacets []byte
147 var contentWarnings []string
148
149 err := r.db.QueryRowContext(ctx, query, did).Scan(
150 &community.ID, &community.DID, &community.Handle, &community.Name,
151 &displayName, &description, &descFacets,
152 &avatarCID, &bannerCID,
153 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
154 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt)
155 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL,
156 &community.Visibility, &community.AllowExternalDiscovery,
157 &moderationType, pq.Array(&contentWarnings),
158 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
159 &federatedFrom, &federatedID,
160 &community.CreatedAt, &community.UpdatedAt,
161 &recordURI, &recordCID,
162 )
163
164 if err == sql.ErrNoRows {
165 return nil, communities.ErrCommunityNotFound
166 }
167 if err != nil {
168 return nil, fmt.Errorf("failed to get community by DID: %w", err)
169 }
170
171 // Map nullable fields
172 community.DisplayName = displayName.String
173 community.Description = description.String
174 community.AvatarCID = avatarCID.String
175 community.BannerCID = bannerCID.String
176 community.PDSEmail = pdsEmail.String
177 community.PDSPassword = pdsPassword.String
178 community.PDSAccessToken = pdsAccessToken.String
179 community.PDSRefreshToken = pdsRefreshToken.String
180 community.PDSURL = pdsURL.String
181 // V2.0: No key fields - PDS manages all keys
182 community.RotationKeyPEM = "" // Empty - PDS-managed
183 community.SigningKeyPEM = "" // Empty - PDS-managed
184 community.ModerationType = moderationType.String
185 community.ContentWarnings = contentWarnings
186 community.FederatedFrom = federatedFrom.String
187 community.FederatedID = federatedID.String
188 community.RecordURI = recordURI.String
189 community.RecordCID = recordCID.String
190 if descFacets != nil {
191 community.DescriptionFacets = descFacets
192 }
193
194 return community, nil
195}
196
197// GetByHandle retrieves a community by its scoped handle
198func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) {
199 community := &communities.Community{}
200 query := `
201 SELECT id, did, handle, name, display_name, description, description_facets,
202 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
203 visibility, allow_external_discovery, moderation_type, content_warnings,
204 member_count, subscriber_count, post_count,
205 federated_from, federated_id, created_at, updated_at,
206 record_uri, record_cid
207 FROM communities
208 WHERE handle = $1`
209
210 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
211 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
212 var descFacets []byte
213 var contentWarnings []string
214
215 err := r.db.QueryRowContext(ctx, query, handle).Scan(
216 &community.ID, &community.DID, &community.Handle, &community.Name,
217 &displayName, &description, &descFacets,
218 &avatarCID, &bannerCID,
219 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
220 &community.Visibility, &community.AllowExternalDiscovery,
221 &moderationType, pq.Array(&contentWarnings),
222 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
223 &federatedFrom, &federatedID,
224 &community.CreatedAt, &community.UpdatedAt,
225 &recordURI, &recordCID,
226 )
227
228 if err == sql.ErrNoRows {
229 return nil, communities.ErrCommunityNotFound
230 }
231 if err != nil {
232 return nil, fmt.Errorf("failed to get community by handle: %w", err)
233 }
234
235 // Map nullable fields
236 community.DisplayName = displayName.String
237 community.Description = description.String
238 community.AvatarCID = avatarCID.String
239 community.BannerCID = bannerCID.String
240 community.ModerationType = moderationType.String
241 community.ContentWarnings = contentWarnings
242 community.FederatedFrom = federatedFrom.String
243 community.FederatedID = federatedID.String
244 community.RecordURI = recordURI.String
245 community.RecordCID = recordCID.String
246 if descFacets != nil {
247 community.DescriptionFacets = descFacets
248 }
249
250 return community, nil
251}
252
253// Update modifies an existing community's metadata
254func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) {
255 query := `
256 UPDATE communities
257 SET display_name = $2, description = $3, description_facets = $4,
258 avatar_cid = $5, banner_cid = $6,
259 visibility = $7, allow_external_discovery = $8,
260 moderation_type = $9, content_warnings = $10,
261 updated_at = NOW(),
262 record_uri = $11, record_cid = $12
263 WHERE did = $1
264 RETURNING updated_at`
265
266 // Handle JSONB field - use sql.NullString with valid JSON or NULL
267 var descFacets interface{}
268 if len(community.DescriptionFacets) > 0 {
269 descFacets = community.DescriptionFacets
270 } else {
271 descFacets = nil
272 }
273
274 err := r.db.QueryRowContext(ctx, query,
275 community.DID,
276 nullString(community.DisplayName),
277 nullString(community.Description),
278 descFacets,
279 nullString(community.AvatarCID),
280 nullString(community.BannerCID),
281 community.Visibility,
282 community.AllowExternalDiscovery,
283 nullString(community.ModerationType),
284 pq.Array(community.ContentWarnings),
285 nullString(community.RecordURI),
286 nullString(community.RecordCID),
287 ).Scan(&community.UpdatedAt)
288
289 if err == sql.ErrNoRows {
290 return nil, communities.ErrCommunityNotFound
291 }
292 if err != nil {
293 return nil, fmt.Errorf("failed to update community: %w", err)
294 }
295
296 return community, nil
297}
298
299// UpdateCredentials atomically updates community's PDS access and refresh tokens
300// CRITICAL: Both tokens must be updated together because refresh tokens are single-use
301// After a successful token refresh, the old refresh token is immediately revoked by the PDS
302func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error {
303 query := `
304 UPDATE communities
305 SET
306 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
307 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
308 updated_at = NOW()
309 WHERE did = $1
310 RETURNING did`
311
312 var returnedDID string
313 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID)
314
315 if err == sql.ErrNoRows {
316 return communities.ErrCommunityNotFound
317 }
318 if err != nil {
319 return fmt.Errorf("failed to update credentials: %w", err)
320 }
321
322 return nil
323}
324
325// Delete removes a community from the database
326func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error {
327 query := `DELETE FROM communities WHERE did = $1`
328
329 result, err := r.db.ExecContext(ctx, query, did)
330 if err != nil {
331 return fmt.Errorf("failed to delete community: %w", err)
332 }
333
334 rowsAffected, err := result.RowsAffected()
335 if err != nil {
336 return fmt.Errorf("failed to check delete result: %w", err)
337 }
338
339 if rowsAffected == 0 {
340 return communities.ErrCommunityNotFound
341 }
342
343 return nil
344}
345
346// List retrieves communities with filtering and pagination
347func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) {
348 // Build query with filters
349 whereClauses := []string{}
350 args := []interface{}{}
351 argCount := 1
352
353 if req.Visibility != "" {
354 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
355 args = append(args, req.Visibility)
356 argCount++
357 }
358
359 if req.HostedBy != "" {
360 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount))
361 args = append(args, req.HostedBy)
362 argCount++
363 }
364
365 whereClause := ""
366 if len(whereClauses) > 0 {
367 whereClause = "WHERE " + strings.Join(whereClauses, " AND ")
368 }
369
370 // Get total count
371 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
372 var totalCount int
373 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
374 if err != nil {
375 return nil, 0, fmt.Errorf("failed to count communities: %w", err)
376 }
377
378 // Build sort clause
379 sortColumn := "created_at"
380 if req.SortBy != "" {
381 switch req.SortBy {
382 case "member_count", "subscriber_count", "post_count", "created_at":
383 sortColumn = req.SortBy
384 }
385 }
386
387 sortOrder := "DESC"
388 if strings.ToUpper(req.SortOrder) == "ASC" {
389 sortOrder = "ASC"
390 }
391
392 // Get communities with pagination
393 query := fmt.Sprintf(`
394 SELECT id, did, handle, name, display_name, description, description_facets,
395 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
396 visibility, allow_external_discovery, moderation_type, content_warnings,
397 member_count, subscriber_count, post_count,
398 federated_from, federated_id, created_at, updated_at,
399 record_uri, record_cid
400 FROM communities
401 %s
402 ORDER BY %s %s
403 LIMIT $%d OFFSET $%d`,
404 whereClause, sortColumn, sortOrder, argCount, argCount+1)
405
406 args = append(args, req.Limit, req.Offset)
407
408 rows, err := r.db.QueryContext(ctx, query, args...)
409 if err != nil {
410 return nil, 0, fmt.Errorf("failed to list communities: %w", err)
411 }
412 defer func() {
413 if closeErr := rows.Close(); closeErr != nil {
414 log.Printf("Failed to close rows: %v", closeErr)
415 }
416 }()
417
418 result := []*communities.Community{}
419 for rows.Next() {
420 community := &communities.Community{}
421 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
422 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
423 var descFacets []byte
424 var contentWarnings []string
425
426 scanErr := rows.Scan(
427 &community.ID, &community.DID, &community.Handle, &community.Name,
428 &displayName, &description, &descFacets,
429 &avatarCID, &bannerCID,
430 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
431 &community.Visibility, &community.AllowExternalDiscovery,
432 &moderationType, pq.Array(&contentWarnings),
433 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
434 &federatedFrom, &federatedID,
435 &community.CreatedAt, &community.UpdatedAt,
436 &recordURI, &recordCID,
437 )
438 if scanErr != nil {
439 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
440 }
441
442 // Map nullable fields
443 community.DisplayName = displayName.String
444 community.Description = description.String
445 community.AvatarCID = avatarCID.String
446 community.BannerCID = bannerCID.String
447 community.ModerationType = moderationType.String
448 community.ContentWarnings = contentWarnings
449 community.FederatedFrom = federatedFrom.String
450 community.FederatedID = federatedID.String
451 community.RecordURI = recordURI.String
452 community.RecordCID = recordCID.String
453 if descFacets != nil {
454 community.DescriptionFacets = descFacets
455 }
456
457 result = append(result, community)
458 }
459
460 if err = rows.Err(); err != nil {
461 return nil, 0, fmt.Errorf("error iterating communities: %w", err)
462 }
463
464 return result, totalCount, nil
465}
466
467// Search searches communities by name/description using fuzzy matching
468func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {
469 // Build query with fuzzy search and visibility filter
470 whereClauses := []string{
471 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')",
472 }
473 args := []interface{}{req.Query}
474 argCount := 2
475
476 if req.Visibility != "" {
477 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
478 args = append(args, req.Visibility)
479 argCount++
480 }
481
482 whereClause := "WHERE " + strings.Join(whereClauses, " AND ")
483
484 // Get total count
485 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
486 var totalCount int
487 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
488 if err != nil {
489 return nil, 0, fmt.Errorf("failed to count search results: %w", err)
490 }
491
492 // Search with relevance ranking using pg_trgm similarity
493 // Filter out results with very low relevance (< 0.2) to avoid noise
494 query := fmt.Sprintf(`
495 SELECT id, did, handle, name, display_name, description, description_facets,
496 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
497 visibility, allow_external_discovery, moderation_type, content_warnings,
498 member_count, subscriber_count, post_count,
499 federated_from, federated_id, created_at, updated_at,
500 record_uri, record_cid,
501 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance
502 FROM communities
503 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2
504 ORDER BY relevance DESC, member_count DESC
505 LIMIT $%d OFFSET $%d`,
506 whereClause, argCount, argCount+1)
507
508 args = append(args, req.Limit, req.Offset)
509
510 rows, err := r.db.QueryContext(ctx, query, args...)
511 if err != nil {
512 return nil, 0, fmt.Errorf("failed to search communities: %w", err)
513 }
514 defer func() {
515 if closeErr := rows.Close(); closeErr != nil {
516 log.Printf("Failed to close rows: %v", closeErr)
517 }
518 }()
519
520 result := []*communities.Community{}
521 for rows.Next() {
522 community := &communities.Community{}
523 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
524 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
525 var descFacets []byte
526 var contentWarnings []string
527 var relevance float64
528
529 scanErr := rows.Scan(
530 &community.ID, &community.DID, &community.Handle, &community.Name,
531 &displayName, &description, &descFacets,
532 &avatarCID, &bannerCID,
533 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
534 &community.Visibility, &community.AllowExternalDiscovery,
535 &moderationType, pq.Array(&contentWarnings),
536 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
537 &federatedFrom, &federatedID,
538 &community.CreatedAt, &community.UpdatedAt,
539 &recordURI, &recordCID,
540 &relevance,
541 )
542 if scanErr != nil {
543 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
544 }
545
546 // Map nullable fields
547 community.DisplayName = displayName.String
548 community.Description = description.String
549 community.AvatarCID = avatarCID.String
550 community.BannerCID = bannerCID.String
551 community.ModerationType = moderationType.String
552 community.ContentWarnings = contentWarnings
553 community.FederatedFrom = federatedFrom.String
554 community.FederatedID = federatedID.String
555 community.RecordURI = recordURI.String
556 community.RecordCID = recordCID.String
557 if descFacets != nil {
558 community.DescriptionFacets = descFacets
559 }
560
561 result = append(result, community)
562 }
563
564 if err = rows.Err(); err != nil {
565 return nil, 0, fmt.Errorf("error iterating search results: %w", err)
566 }
567
568 return result, totalCount, nil
569}
570
// Helper functions

// nullString converts a Go string into a sql.NullString, treating the empty
// string as SQL NULL (Valid == false) and any other value as a valid string.
func nullString(s string) sql.NullString {
	if s == "" {
		return sql.NullString{}
	}
	return sql.NullString{String: s, Valid: true}
}