A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/communities"
5 "context"
6 "database/sql"
7 "fmt"
8 "log"
9 "strings"
10
11 "github.com/lib/pq"
12)
13
14type postgresCommunityRepo struct {
15 db *sql.DB
16}
17
18// NewCommunityRepository creates a new PostgreSQL community repository
19func NewCommunityRepository(db *sql.DB) communities.Repository {
20 return &postgresCommunityRepo{db: db}
21}
22
23// Create inserts a new community into the communities table
24func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) {
25 query := `
26 INSERT INTO communities (
27 did, handle, name, display_name, description, description_facets,
28 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
29 pds_email, pds_password_encrypted,
30 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url,
31 visibility, allow_external_discovery, moderation_type, content_warnings,
32 member_count, subscriber_count, post_count,
33 federated_from, federated_id, created_at, updated_at,
34 record_uri, record_cid
35 ) VALUES (
36 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
37 $12,
38 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
39 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
40 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
41 $16,
42 $17, $18, $19, $20,
43 $21, $22, $23, $24, $25, $26, $27, $28, $29
44 )
45 RETURNING id, created_at, updated_at`
46
47 // Handle JSONB field - use sql.NullString with valid JSON or NULL
48 var descFacets interface{}
49 if len(community.DescriptionFacets) > 0 {
50 descFacets = community.DescriptionFacets
51 } else {
52 descFacets = nil
53 }
54
55 err := r.db.QueryRowContext(ctx, query,
56 community.DID,
57 community.Handle,
58 community.Name,
59 nullString(community.DisplayName),
60 nullString(community.Description),
61 descFacets,
62 nullString(community.AvatarCID),
63 nullString(community.BannerCID),
64 community.OwnerDID,
65 community.CreatedByDID,
66 community.HostedByDID,
67 // V2.0: PDS credentials for community account (encrypted at rest)
68 nullString(community.PDSEmail),
69 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt
70 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt
71 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt
72 nullString(community.PDSURL),
73 // V2.0: No key columns - PDS manages all keys
74 community.Visibility,
75 community.AllowExternalDiscovery,
76 nullString(community.ModerationType),
77 pq.Array(community.ContentWarnings),
78 community.MemberCount,
79 community.SubscriberCount,
80 community.PostCount,
81 nullString(community.FederatedFrom),
82 nullString(community.FederatedID),
83 community.CreatedAt,
84 community.UpdatedAt,
85 nullString(community.RecordURI),
86 nullString(community.RecordCID),
87 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt)
88 if err != nil {
89 // Check for unique constraint violations
90 if strings.Contains(err.Error(), "duplicate key") {
91 if strings.Contains(err.Error(), "communities_did_key") {
92 return nil, communities.ErrCommunityAlreadyExists
93 }
94 if strings.Contains(err.Error(), "communities_handle_key") {
95 return nil, communities.ErrHandleTaken
96 }
97 }
98 return nil, fmt.Errorf("failed to create community: %w", err)
99 }
100
101 return community, nil
102}
103
104// GetByDID retrieves a community by its DID
105// Note: PDS credentials are included (for internal service use only)
106// Handlers MUST use json:"-" tags to prevent credential exposure in APIs
107//
108// V2.0: Key columns not included - PDS manages all keys
109func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) {
110 community := &communities.Community{}
111 query := `
112 SELECT id, did, handle, name, display_name, description, description_facets,
113 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
114 pds_email,
115 CASE
116 WHEN pds_password_encrypted IS NOT NULL
117 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
118 ELSE NULL
119 END as pds_password,
120 CASE
121 WHEN pds_access_token_encrypted IS NOT NULL
122 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
123 ELSE NULL
124 END as pds_access_token,
125 CASE
126 WHEN pds_refresh_token_encrypted IS NOT NULL
127 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
128 ELSE NULL
129 END as pds_refresh_token,
130 pds_url,
131 visibility, allow_external_discovery, moderation_type, content_warnings,
132 member_count, subscriber_count, post_count,
133 federated_from, federated_id, created_at, updated_at,
134 record_uri, record_cid
135 FROM communities
136 WHERE did = $1`
137
138 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
139 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
140 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString
141 var descFacets []byte
142 var contentWarnings []string
143
144 err := r.db.QueryRowContext(ctx, query, did).Scan(
145 &community.ID, &community.DID, &community.Handle, &community.Name,
146 &displayName, &description, &descFacets,
147 &avatarCID, &bannerCID,
148 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
149 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt)
150 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL,
151 &community.Visibility, &community.AllowExternalDiscovery,
152 &moderationType, pq.Array(&contentWarnings),
153 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
154 &federatedFrom, &federatedID,
155 &community.CreatedAt, &community.UpdatedAt,
156 &recordURI, &recordCID,
157 )
158
159 if err == sql.ErrNoRows {
160 return nil, communities.ErrCommunityNotFound
161 }
162 if err != nil {
163 return nil, fmt.Errorf("failed to get community by DID: %w", err)
164 }
165
166 // Map nullable fields
167 community.DisplayName = displayName.String
168 community.Description = description.String
169 community.AvatarCID = avatarCID.String
170 community.BannerCID = bannerCID.String
171 community.PDSEmail = pdsEmail.String
172 community.PDSPassword = pdsPassword.String
173 community.PDSAccessToken = pdsAccessToken.String
174 community.PDSRefreshToken = pdsRefreshToken.String
175 community.PDSURL = pdsURL.String
176 // V2.0: No key fields - PDS manages all keys
177 community.RotationKeyPEM = "" // Empty - PDS-managed
178 community.SigningKeyPEM = "" // Empty - PDS-managed
179 community.ModerationType = moderationType.String
180 community.ContentWarnings = contentWarnings
181 community.FederatedFrom = federatedFrom.String
182 community.FederatedID = federatedID.String
183 community.RecordURI = recordURI.String
184 community.RecordCID = recordCID.String
185 if descFacets != nil {
186 community.DescriptionFacets = descFacets
187 }
188
189 return community, nil
190}
191
192// GetByHandle retrieves a community by its scoped handle
193func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) {
194 community := &communities.Community{}
195 query := `
196 SELECT id, did, handle, name, display_name, description, description_facets,
197 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
198 visibility, allow_external_discovery, moderation_type, content_warnings,
199 member_count, subscriber_count, post_count,
200 federated_from, federated_id, created_at, updated_at,
201 record_uri, record_cid
202 FROM communities
203 WHERE handle = $1`
204
205 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
206 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
207 var descFacets []byte
208 var contentWarnings []string
209
210 err := r.db.QueryRowContext(ctx, query, handle).Scan(
211 &community.ID, &community.DID, &community.Handle, &community.Name,
212 &displayName, &description, &descFacets,
213 &avatarCID, &bannerCID,
214 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
215 &community.Visibility, &community.AllowExternalDiscovery,
216 &moderationType, pq.Array(&contentWarnings),
217 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
218 &federatedFrom, &federatedID,
219 &community.CreatedAt, &community.UpdatedAt,
220 &recordURI, &recordCID,
221 )
222
223 if err == sql.ErrNoRows {
224 return nil, communities.ErrCommunityNotFound
225 }
226 if err != nil {
227 return nil, fmt.Errorf("failed to get community by handle: %w", err)
228 }
229
230 // Map nullable fields
231 community.DisplayName = displayName.String
232 community.Description = description.String
233 community.AvatarCID = avatarCID.String
234 community.BannerCID = bannerCID.String
235 community.ModerationType = moderationType.String
236 community.ContentWarnings = contentWarnings
237 community.FederatedFrom = federatedFrom.String
238 community.FederatedID = federatedID.String
239 community.RecordURI = recordURI.String
240 community.RecordCID = recordCID.String
241 if descFacets != nil {
242 community.DescriptionFacets = descFacets
243 }
244
245 return community, nil
246}
247
248// Update modifies an existing community's metadata
249func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) {
250 query := `
251 UPDATE communities
252 SET display_name = $2, description = $3, description_facets = $4,
253 avatar_cid = $5, banner_cid = $6,
254 visibility = $7, allow_external_discovery = $8,
255 moderation_type = $9, content_warnings = $10,
256 updated_at = NOW(),
257 record_uri = $11, record_cid = $12
258 WHERE did = $1
259 RETURNING updated_at`
260
261 // Handle JSONB field - use sql.NullString with valid JSON or NULL
262 var descFacets interface{}
263 if len(community.DescriptionFacets) > 0 {
264 descFacets = community.DescriptionFacets
265 } else {
266 descFacets = nil
267 }
268
269 err := r.db.QueryRowContext(ctx, query,
270 community.DID,
271 nullString(community.DisplayName),
272 nullString(community.Description),
273 descFacets,
274 nullString(community.AvatarCID),
275 nullString(community.BannerCID),
276 community.Visibility,
277 community.AllowExternalDiscovery,
278 nullString(community.ModerationType),
279 pq.Array(community.ContentWarnings),
280 nullString(community.RecordURI),
281 nullString(community.RecordCID),
282 ).Scan(&community.UpdatedAt)
283
284 if err == sql.ErrNoRows {
285 return nil, communities.ErrCommunityNotFound
286 }
287 if err != nil {
288 return nil, fmt.Errorf("failed to update community: %w", err)
289 }
290
291 return community, nil
292}
293
294// UpdateCredentials atomically updates community's PDS access and refresh tokens
295// CRITICAL: Both tokens must be updated together because refresh tokens are single-use
296// After a successful token refresh, the old refresh token is immediately revoked by the PDS
297func (r *postgresCommunityRepo) UpdateCredentials(ctx context.Context, did, accessToken, refreshToken string) error {
298 query := `
299 UPDATE communities
300 SET
301 pds_access_token_encrypted = pgp_sym_encrypt($2, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
302 pds_refresh_token_encrypted = pgp_sym_encrypt($3, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)),
303 updated_at = NOW()
304 WHERE did = $1
305 RETURNING did`
306
307 var returnedDID string
308 err := r.db.QueryRowContext(ctx, query, did, accessToken, refreshToken).Scan(&returnedDID)
309
310 if err == sql.ErrNoRows {
311 return communities.ErrCommunityNotFound
312 }
313 if err != nil {
314 return fmt.Errorf("failed to update credentials: %w", err)
315 }
316
317 return nil
318}
319
320// Delete removes a community from the database
321func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error {
322 query := `DELETE FROM communities WHERE did = $1`
323
324 result, err := r.db.ExecContext(ctx, query, did)
325 if err != nil {
326 return fmt.Errorf("failed to delete community: %w", err)
327 }
328
329 rowsAffected, err := result.RowsAffected()
330 if err != nil {
331 return fmt.Errorf("failed to check delete result: %w", err)
332 }
333
334 if rowsAffected == 0 {
335 return communities.ErrCommunityNotFound
336 }
337
338 return nil
339}
340
341// List retrieves communities with filtering and pagination
342func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) {
343 // Build query with filters
344 whereClauses := []string{}
345 args := []interface{}{}
346 argCount := 1
347
348 if req.Visibility != "" {
349 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
350 args = append(args, req.Visibility)
351 argCount++
352 }
353
354 if req.HostedBy != "" {
355 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount))
356 args = append(args, req.HostedBy)
357 argCount++
358 }
359
360 whereClause := ""
361 if len(whereClauses) > 0 {
362 whereClause = "WHERE " + strings.Join(whereClauses, " AND ")
363 }
364
365 // Get total count
366 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
367 var totalCount int
368 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
369 if err != nil {
370 return nil, 0, fmt.Errorf("failed to count communities: %w", err)
371 }
372
373 // Build sort clause
374 sortColumn := "created_at"
375 if req.SortBy != "" {
376 switch req.SortBy {
377 case "member_count", "subscriber_count", "post_count", "created_at":
378 sortColumn = req.SortBy
379 }
380 }
381
382 sortOrder := "DESC"
383 if strings.ToUpper(req.SortOrder) == "ASC" {
384 sortOrder = "ASC"
385 }
386
387 // Get communities with pagination
388 query := fmt.Sprintf(`
389 SELECT id, did, handle, name, display_name, description, description_facets,
390 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
391 visibility, allow_external_discovery, moderation_type, content_warnings,
392 member_count, subscriber_count, post_count,
393 federated_from, federated_id, created_at, updated_at,
394 record_uri, record_cid
395 FROM communities
396 %s
397 ORDER BY %s %s
398 LIMIT $%d OFFSET $%d`,
399 whereClause, sortColumn, sortOrder, argCount, argCount+1)
400
401 args = append(args, req.Limit, req.Offset)
402
403 rows, err := r.db.QueryContext(ctx, query, args...)
404 if err != nil {
405 return nil, 0, fmt.Errorf("failed to list communities: %w", err)
406 }
407 defer func() {
408 if closeErr := rows.Close(); closeErr != nil {
409 log.Printf("Failed to close rows: %v", closeErr)
410 }
411 }()
412
413 result := []*communities.Community{}
414 for rows.Next() {
415 community := &communities.Community{}
416 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
417 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
418 var descFacets []byte
419 var contentWarnings []string
420
421 scanErr := rows.Scan(
422 &community.ID, &community.DID, &community.Handle, &community.Name,
423 &displayName, &description, &descFacets,
424 &avatarCID, &bannerCID,
425 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
426 &community.Visibility, &community.AllowExternalDiscovery,
427 &moderationType, pq.Array(&contentWarnings),
428 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
429 &federatedFrom, &federatedID,
430 &community.CreatedAt, &community.UpdatedAt,
431 &recordURI, &recordCID,
432 )
433 if scanErr != nil {
434 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
435 }
436
437 // Map nullable fields
438 community.DisplayName = displayName.String
439 community.Description = description.String
440 community.AvatarCID = avatarCID.String
441 community.BannerCID = bannerCID.String
442 community.ModerationType = moderationType.String
443 community.ContentWarnings = contentWarnings
444 community.FederatedFrom = federatedFrom.String
445 community.FederatedID = federatedID.String
446 community.RecordURI = recordURI.String
447 community.RecordCID = recordCID.String
448 if descFacets != nil {
449 community.DescriptionFacets = descFacets
450 }
451
452 result = append(result, community)
453 }
454
455 if err = rows.Err(); err != nil {
456 return nil, 0, fmt.Errorf("error iterating communities: %w", err)
457 }
458
459 return result, totalCount, nil
460}
461
462// Search searches communities by name/description using fuzzy matching
463func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {
464 // Build query with fuzzy search and visibility filter
465 whereClauses := []string{
466 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')",
467 }
468 args := []interface{}{req.Query}
469 argCount := 2
470
471 if req.Visibility != "" {
472 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
473 args = append(args, req.Visibility)
474 argCount++
475 }
476
477 whereClause := "WHERE " + strings.Join(whereClauses, " AND ")
478
479 // Get total count
480 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
481 var totalCount int
482 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
483 if err != nil {
484 return nil, 0, fmt.Errorf("failed to count search results: %w", err)
485 }
486
487 // Search with relevance ranking using pg_trgm similarity
488 // Filter out results with very low relevance (< 0.2) to avoid noise
489 query := fmt.Sprintf(`
490 SELECT id, did, handle, name, display_name, description, description_facets,
491 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
492 visibility, allow_external_discovery, moderation_type, content_warnings,
493 member_count, subscriber_count, post_count,
494 federated_from, federated_id, created_at, updated_at,
495 record_uri, record_cid,
496 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance
497 FROM communities
498 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2
499 ORDER BY relevance DESC, member_count DESC
500 LIMIT $%d OFFSET $%d`,
501 whereClause, argCount, argCount+1)
502
503 args = append(args, req.Limit, req.Offset)
504
505 rows, err := r.db.QueryContext(ctx, query, args...)
506 if err != nil {
507 return nil, 0, fmt.Errorf("failed to search communities: %w", err)
508 }
509 defer func() {
510 if closeErr := rows.Close(); closeErr != nil {
511 log.Printf("Failed to close rows: %v", closeErr)
512 }
513 }()
514
515 result := []*communities.Community{}
516 for rows.Next() {
517 community := &communities.Community{}
518 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
519 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
520 var descFacets []byte
521 var contentWarnings []string
522 var relevance float64
523
524 scanErr := rows.Scan(
525 &community.ID, &community.DID, &community.Handle, &community.Name,
526 &displayName, &description, &descFacets,
527 &avatarCID, &bannerCID,
528 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
529 &community.Visibility, &community.AllowExternalDiscovery,
530 &moderationType, pq.Array(&contentWarnings),
531 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
532 &federatedFrom, &federatedID,
533 &community.CreatedAt, &community.UpdatedAt,
534 &recordURI, &recordCID,
535 &relevance,
536 )
537 if scanErr != nil {
538 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
539 }
540
541 // Map nullable fields
542 community.DisplayName = displayName.String
543 community.Description = description.String
544 community.AvatarCID = avatarCID.String
545 community.BannerCID = bannerCID.String
546 community.ModerationType = moderationType.String
547 community.ContentWarnings = contentWarnings
548 community.FederatedFrom = federatedFrom.String
549 community.FederatedID = federatedID.String
550 community.RecordURI = recordURI.String
551 community.RecordCID = recordCID.String
552 if descFacets != nil {
553 community.DescriptionFacets = descFacets
554 }
555
556 result = append(result, community)
557 }
558
559 if err = rows.Err(); err != nil {
560 return nil, 0, fmt.Errorf("error iterating search results: %w", err)
561 }
562
563 return result, totalCount, nil
564}
565
566// Helper functions
567func nullString(s string) sql.NullString {
568 return sql.NullString{String: s, Valid: s != ""}
569}