A community based topic aggregation platform built on atproto
1package postgres
2
3import (
4 "Coves/internal/core/communities"
5 "context"
6 "database/sql"
7 "fmt"
8 "log"
9 "strings"
10
11 "github.com/lib/pq"
12)
13
14type postgresCommunityRepo struct {
15 db *sql.DB
16}
17
18// NewCommunityRepository creates a new PostgreSQL community repository
19func NewCommunityRepository(db *sql.DB) communities.Repository {
20 return &postgresCommunityRepo{db: db}
21}
22
23// Create inserts a new community into the communities table
24func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) {
25 query := `
26 INSERT INTO communities (
27 did, handle, name, display_name, description, description_facets,
28 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
29 pds_email, pds_password_encrypted,
30 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url,
31 visibility, allow_external_discovery, moderation_type, content_warnings,
32 member_count, subscriber_count, post_count,
33 federated_from, federated_id, created_at, updated_at,
34 record_uri, record_cid
35 ) VALUES (
36 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
37 $12,
38 CASE WHEN $13 != '' THEN pgp_sym_encrypt($13, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
39 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
40 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
41 $16,
42 $17, $18, $19, $20,
43 $21, $22, $23, $24, $25, $26, $27, $28, $29
44 )
45 RETURNING id, created_at, updated_at`
46
47 // Handle JSONB field - use sql.NullString with valid JSON or NULL
48 var descFacets interface{}
49 if len(community.DescriptionFacets) > 0 {
50 descFacets = community.DescriptionFacets
51 } else {
52 descFacets = nil
53 }
54
55 err := r.db.QueryRowContext(ctx, query,
56 community.DID,
57 community.Handle,
58 community.Name,
59 nullString(community.DisplayName),
60 nullString(community.Description),
61 descFacets,
62 nullString(community.AvatarCID),
63 nullString(community.BannerCID),
64 community.OwnerDID,
65 community.CreatedByDID,
66 community.HostedByDID,
67 // V2.0: PDS credentials for community account (encrypted at rest)
68 nullString(community.PDSEmail),
69 nullString(community.PDSPassword), // Encrypted by pgp_sym_encrypt
70 nullString(community.PDSAccessToken), // Encrypted by pgp_sym_encrypt
71 nullString(community.PDSRefreshToken), // Encrypted by pgp_sym_encrypt
72 nullString(community.PDSURL),
73 // V2.0: No key columns - PDS manages all keys
74 community.Visibility,
75 community.AllowExternalDiscovery,
76 nullString(community.ModerationType),
77 pq.Array(community.ContentWarnings),
78 community.MemberCount,
79 community.SubscriberCount,
80 community.PostCount,
81 nullString(community.FederatedFrom),
82 nullString(community.FederatedID),
83 community.CreatedAt,
84 community.UpdatedAt,
85 nullString(community.RecordURI),
86 nullString(community.RecordCID),
87 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt)
88 if err != nil {
89 // Check for unique constraint violations
90 if strings.Contains(err.Error(), "duplicate key") {
91 if strings.Contains(err.Error(), "communities_did_key") {
92 return nil, communities.ErrCommunityAlreadyExists
93 }
94 if strings.Contains(err.Error(), "communities_handle_key") {
95 return nil, communities.ErrHandleTaken
96 }
97 }
98 return nil, fmt.Errorf("failed to create community: %w", err)
99 }
100
101 return community, nil
102}
103
104// GetByDID retrieves a community by its DID
105// Note: PDS credentials are included (for internal service use only)
106// Handlers MUST use json:"-" tags to prevent credential exposure in APIs
107//
108// V2.0: Key columns not included - PDS manages all keys
109func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) {
110 community := &communities.Community{}
111 query := `
112 SELECT id, did, handle, name, display_name, description, description_facets,
113 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
114 pds_email,
115 CASE
116 WHEN pds_password_encrypted IS NOT NULL
117 THEN pgp_sym_decrypt(pds_password_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
118 ELSE NULL
119 END as pds_password,
120 CASE
121 WHEN pds_access_token_encrypted IS NOT NULL
122 THEN pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
123 ELSE NULL
124 END as pds_access_token,
125 CASE
126 WHEN pds_refresh_token_encrypted IS NOT NULL
127 THEN pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1))
128 ELSE NULL
129 END as pds_refresh_token,
130 pds_url,
131 visibility, allow_external_discovery, moderation_type, content_warnings,
132 member_count, subscriber_count, post_count,
133 federated_from, federated_id, created_at, updated_at,
134 record_uri, record_cid
135 FROM communities
136 WHERE did = $1`
137
138 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
139 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
140 var pdsEmail, pdsPassword, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString
141 var descFacets []byte
142 var contentWarnings []string
143
144 err := r.db.QueryRowContext(ctx, query, did).Scan(
145 &community.ID, &community.DID, &community.Handle, &community.Name,
146 &displayName, &description, &descFacets,
147 &avatarCID, &bannerCID,
148 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
149 // V2.0: PDS credentials (decrypted from pgp_sym_encrypt)
150 &pdsEmail, &pdsPassword, &pdsAccessToken, &pdsRefreshToken, &pdsURL,
151 &community.Visibility, &community.AllowExternalDiscovery,
152 &moderationType, pq.Array(&contentWarnings),
153 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
154 &federatedFrom, &federatedID,
155 &community.CreatedAt, &community.UpdatedAt,
156 &recordURI, &recordCID,
157 )
158
159 if err == sql.ErrNoRows {
160 return nil, communities.ErrCommunityNotFound
161 }
162 if err != nil {
163 return nil, fmt.Errorf("failed to get community by DID: %w", err)
164 }
165
166 // Map nullable fields
167 community.DisplayName = displayName.String
168 community.Description = description.String
169 community.AvatarCID = avatarCID.String
170 community.BannerCID = bannerCID.String
171 community.PDSEmail = pdsEmail.String
172 community.PDSPassword = pdsPassword.String
173 community.PDSAccessToken = pdsAccessToken.String
174 community.PDSRefreshToken = pdsRefreshToken.String
175 community.PDSURL = pdsURL.String
176 // V2.0: No key fields - PDS manages all keys
177 community.RotationKeyPEM = "" // Empty - PDS-managed
178 community.SigningKeyPEM = "" // Empty - PDS-managed
179 community.ModerationType = moderationType.String
180 community.ContentWarnings = contentWarnings
181 community.FederatedFrom = federatedFrom.String
182 community.FederatedID = federatedID.String
183 community.RecordURI = recordURI.String
184 community.RecordCID = recordCID.String
185 if descFacets != nil {
186 community.DescriptionFacets = descFacets
187 }
188
189 return community, nil
190}
191
192// GetByHandle retrieves a community by its scoped handle
193func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) {
194 community := &communities.Community{}
195 query := `
196 SELECT id, did, handle, name, display_name, description, description_facets,
197 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
198 visibility, allow_external_discovery, moderation_type, content_warnings,
199 member_count, subscriber_count, post_count,
200 federated_from, federated_id, created_at, updated_at,
201 record_uri, record_cid
202 FROM communities
203 WHERE handle = $1`
204
205 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
206 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
207 var descFacets []byte
208 var contentWarnings []string
209
210 err := r.db.QueryRowContext(ctx, query, handle).Scan(
211 &community.ID, &community.DID, &community.Handle, &community.Name,
212 &displayName, &description, &descFacets,
213 &avatarCID, &bannerCID,
214 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
215 &community.Visibility, &community.AllowExternalDiscovery,
216 &moderationType, pq.Array(&contentWarnings),
217 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
218 &federatedFrom, &federatedID,
219 &community.CreatedAt, &community.UpdatedAt,
220 &recordURI, &recordCID,
221 )
222
223 if err == sql.ErrNoRows {
224 return nil, communities.ErrCommunityNotFound
225 }
226 if err != nil {
227 return nil, fmt.Errorf("failed to get community by handle: %w", err)
228 }
229
230 // Map nullable fields
231 community.DisplayName = displayName.String
232 community.Description = description.String
233 community.AvatarCID = avatarCID.String
234 community.BannerCID = bannerCID.String
235 community.ModerationType = moderationType.String
236 community.ContentWarnings = contentWarnings
237 community.FederatedFrom = federatedFrom.String
238 community.FederatedID = federatedID.String
239 community.RecordURI = recordURI.String
240 community.RecordCID = recordCID.String
241 if descFacets != nil {
242 community.DescriptionFacets = descFacets
243 }
244
245 return community, nil
246}
247
248// Update modifies an existing community's metadata
249func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) {
250 query := `
251 UPDATE communities
252 SET display_name = $2, description = $3, description_facets = $4,
253 avatar_cid = $5, banner_cid = $6,
254 visibility = $7, allow_external_discovery = $8,
255 moderation_type = $9, content_warnings = $10,
256 updated_at = NOW(),
257 record_uri = $11, record_cid = $12
258 WHERE did = $1
259 RETURNING updated_at`
260
261 // Handle JSONB field - use sql.NullString with valid JSON or NULL
262 var descFacets interface{}
263 if len(community.DescriptionFacets) > 0 {
264 descFacets = community.DescriptionFacets
265 } else {
266 descFacets = nil
267 }
268
269 err := r.db.QueryRowContext(ctx, query,
270 community.DID,
271 nullString(community.DisplayName),
272 nullString(community.Description),
273 descFacets,
274 nullString(community.AvatarCID),
275 nullString(community.BannerCID),
276 community.Visibility,
277 community.AllowExternalDiscovery,
278 nullString(community.ModerationType),
279 pq.Array(community.ContentWarnings),
280 nullString(community.RecordURI),
281 nullString(community.RecordCID),
282 ).Scan(&community.UpdatedAt)
283
284 if err == sql.ErrNoRows {
285 return nil, communities.ErrCommunityNotFound
286 }
287 if err != nil {
288 return nil, fmt.Errorf("failed to update community: %w", err)
289 }
290
291 return community, nil
292}
293
294// Delete removes a community from the database
295func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error {
296 query := `DELETE FROM communities WHERE did = $1`
297
298 result, err := r.db.ExecContext(ctx, query, did)
299 if err != nil {
300 return fmt.Errorf("failed to delete community: %w", err)
301 }
302
303 rowsAffected, err := result.RowsAffected()
304 if err != nil {
305 return fmt.Errorf("failed to check delete result: %w", err)
306 }
307
308 if rowsAffected == 0 {
309 return communities.ErrCommunityNotFound
310 }
311
312 return nil
313}
314
315// List retrieves communities with filtering and pagination
316func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) {
317 // Build query with filters
318 whereClauses := []string{}
319 args := []interface{}{}
320 argCount := 1
321
322 if req.Visibility != "" {
323 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
324 args = append(args, req.Visibility)
325 argCount++
326 }
327
328 if req.HostedBy != "" {
329 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount))
330 args = append(args, req.HostedBy)
331 argCount++
332 }
333
334 whereClause := ""
335 if len(whereClauses) > 0 {
336 whereClause = "WHERE " + strings.Join(whereClauses, " AND ")
337 }
338
339 // Get total count
340 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
341 var totalCount int
342 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
343 if err != nil {
344 return nil, 0, fmt.Errorf("failed to count communities: %w", err)
345 }
346
347 // Build sort clause
348 sortColumn := "created_at"
349 if req.SortBy != "" {
350 switch req.SortBy {
351 case "member_count", "subscriber_count", "post_count", "created_at":
352 sortColumn = req.SortBy
353 }
354 }
355
356 sortOrder := "DESC"
357 if strings.ToUpper(req.SortOrder) == "ASC" {
358 sortOrder = "ASC"
359 }
360
361 // Get communities with pagination
362 query := fmt.Sprintf(`
363 SELECT id, did, handle, name, display_name, description, description_facets,
364 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
365 visibility, allow_external_discovery, moderation_type, content_warnings,
366 member_count, subscriber_count, post_count,
367 federated_from, federated_id, created_at, updated_at,
368 record_uri, record_cid
369 FROM communities
370 %s
371 ORDER BY %s %s
372 LIMIT $%d OFFSET $%d`,
373 whereClause, sortColumn, sortOrder, argCount, argCount+1)
374
375 args = append(args, req.Limit, req.Offset)
376
377 rows, err := r.db.QueryContext(ctx, query, args...)
378 if err != nil {
379 return nil, 0, fmt.Errorf("failed to list communities: %w", err)
380 }
381 defer func() {
382 if closeErr := rows.Close(); closeErr != nil {
383 log.Printf("Failed to close rows: %v", closeErr)
384 }
385 }()
386
387 result := []*communities.Community{}
388 for rows.Next() {
389 community := &communities.Community{}
390 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
391 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
392 var descFacets []byte
393 var contentWarnings []string
394
395 scanErr := rows.Scan(
396 &community.ID, &community.DID, &community.Handle, &community.Name,
397 &displayName, &description, &descFacets,
398 &avatarCID, &bannerCID,
399 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
400 &community.Visibility, &community.AllowExternalDiscovery,
401 &moderationType, pq.Array(&contentWarnings),
402 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
403 &federatedFrom, &federatedID,
404 &community.CreatedAt, &community.UpdatedAt,
405 &recordURI, &recordCID,
406 )
407 if scanErr != nil {
408 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
409 }
410
411 // Map nullable fields
412 community.DisplayName = displayName.String
413 community.Description = description.String
414 community.AvatarCID = avatarCID.String
415 community.BannerCID = bannerCID.String
416 community.ModerationType = moderationType.String
417 community.ContentWarnings = contentWarnings
418 community.FederatedFrom = federatedFrom.String
419 community.FederatedID = federatedID.String
420 community.RecordURI = recordURI.String
421 community.RecordCID = recordCID.String
422 if descFacets != nil {
423 community.DescriptionFacets = descFacets
424 }
425
426 result = append(result, community)
427 }
428
429 if err = rows.Err(); err != nil {
430 return nil, 0, fmt.Errorf("error iterating communities: %w", err)
431 }
432
433 return result, totalCount, nil
434}
435
436// Search searches communities by name/description using fuzzy matching
437func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {
438 // Build query with fuzzy search and visibility filter
439 whereClauses := []string{
440 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')",
441 }
442 args := []interface{}{req.Query}
443 argCount := 2
444
445 if req.Visibility != "" {
446 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
447 args = append(args, req.Visibility)
448 argCount++
449 }
450
451 whereClause := "WHERE " + strings.Join(whereClauses, " AND ")
452
453 // Get total count
454 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
455 var totalCount int
456 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
457 if err != nil {
458 return nil, 0, fmt.Errorf("failed to count search results: %w", err)
459 }
460
461 // Search with relevance ranking using pg_trgm similarity
462 // Filter out results with very low relevance (< 0.2) to avoid noise
463 query := fmt.Sprintf(`
464 SELECT id, did, handle, name, display_name, description, description_facets,
465 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
466 visibility, allow_external_discovery, moderation_type, content_warnings,
467 member_count, subscriber_count, post_count,
468 federated_from, federated_id, created_at, updated_at,
469 record_uri, record_cid,
470 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance
471 FROM communities
472 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2
473 ORDER BY relevance DESC, member_count DESC
474 LIMIT $%d OFFSET $%d`,
475 whereClause, argCount, argCount+1)
476
477 args = append(args, req.Limit, req.Offset)
478
479 rows, err := r.db.QueryContext(ctx, query, args...)
480 if err != nil {
481 return nil, 0, fmt.Errorf("failed to search communities: %w", err)
482 }
483 defer func() {
484 if closeErr := rows.Close(); closeErr != nil {
485 log.Printf("Failed to close rows: %v", closeErr)
486 }
487 }()
488
489 result := []*communities.Community{}
490 for rows.Next() {
491 community := &communities.Community{}
492 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
493 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
494 var descFacets []byte
495 var contentWarnings []string
496 var relevance float64
497
498 scanErr := rows.Scan(
499 &community.ID, &community.DID, &community.Handle, &community.Name,
500 &displayName, &description, &descFacets,
501 &avatarCID, &bannerCID,
502 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
503 &community.Visibility, &community.AllowExternalDiscovery,
504 &moderationType, pq.Array(&contentWarnings),
505 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
506 &federatedFrom, &federatedID,
507 &community.CreatedAt, &community.UpdatedAt,
508 &recordURI, &recordCID,
509 &relevance,
510 )
511 if scanErr != nil {
512 return nil, 0, fmt.Errorf("failed to scan community: %w", scanErr)
513 }
514
515 // Map nullable fields
516 community.DisplayName = displayName.String
517 community.Description = description.String
518 community.AvatarCID = avatarCID.String
519 community.BannerCID = bannerCID.String
520 community.ModerationType = moderationType.String
521 community.ContentWarnings = contentWarnings
522 community.FederatedFrom = federatedFrom.String
523 community.FederatedID = federatedID.String
524 community.RecordURI = recordURI.String
525 community.RecordCID = recordCID.String
526 if descFacets != nil {
527 community.DescriptionFacets = descFacets
528 }
529
530 result = append(result, community)
531 }
532
533 if err = rows.Err(); err != nil {
534 return nil, 0, fmt.Errorf("error iterating search results: %w", err)
535 }
536
537 return result, totalCount, nil
538}
539
540// Helper functions
541func nullString(s string) sql.NullString {
542 return sql.NullString{String: s, Valid: s != ""}
543}