A community based topic aggregation platform built on atproto
1package postgres
2
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"

	"Coves/internal/core/communities"
	"github.com/lib/pq"
)
12
13type postgresCommunityRepo struct {
14 db *sql.DB
15}
16
17// NewCommunityRepository creates a new PostgreSQL community repository
18func NewCommunityRepository(db *sql.DB) communities.Repository {
19 return &postgresCommunityRepo{db: db}
20}
21
22// Create inserts a new community into the communities table
23func (r *postgresCommunityRepo) Create(ctx context.Context, community *communities.Community) (*communities.Community, error) {
24 query := `
25 INSERT INTO communities (
26 did, handle, name, display_name, description, description_facets,
27 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
28 pds_email, pds_password_hash,
29 pds_access_token_encrypted, pds_refresh_token_encrypted, pds_url,
30 visibility, allow_external_discovery, moderation_type, content_warnings,
31 member_count, subscriber_count, post_count,
32 federated_from, federated_id, created_at, updated_at,
33 record_uri, record_cid
34 ) VALUES (
35 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11,
36 $12, $13,
37 CASE WHEN $14 != '' THEN pgp_sym_encrypt($14, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
38 CASE WHEN $15 != '' THEN pgp_sym_encrypt($15, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)) ELSE NULL END,
39 $16,
40 $17, $18, $19, $20,
41 $21, $22, $23, $24, $25, $26, $27, $28, $29
42 )
43 RETURNING id, created_at, updated_at`
44
45 // Handle JSONB field - use sql.NullString with valid JSON or NULL
46 var descFacets interface{}
47 if community.DescriptionFacets != nil && len(community.DescriptionFacets) > 0 {
48 descFacets = community.DescriptionFacets
49 } else {
50 descFacets = nil
51 }
52
53 err := r.db.QueryRowContext(ctx, query,
54 community.DID,
55 community.Handle,
56 community.Name,
57 nullString(community.DisplayName),
58 nullString(community.Description),
59 descFacets,
60 nullString(community.AvatarCID),
61 nullString(community.BannerCID),
62 community.OwnerDID,
63 community.CreatedByDID,
64 community.HostedByDID,
65 // V2: PDS credentials for community account
66 nullString(community.PDSEmail),
67 nullString(community.PDSPasswordHash),
68 nullString(community.PDSAccessToken),
69 nullString(community.PDSRefreshToken),
70 nullString(community.PDSURL),
71 community.Visibility,
72 community.AllowExternalDiscovery,
73 nullString(community.ModerationType),
74 pq.Array(community.ContentWarnings),
75 community.MemberCount,
76 community.SubscriberCount,
77 community.PostCount,
78 nullString(community.FederatedFrom),
79 nullString(community.FederatedID),
80 community.CreatedAt,
81 community.UpdatedAt,
82 nullString(community.RecordURI),
83 nullString(community.RecordCID),
84 ).Scan(&community.ID, &community.CreatedAt, &community.UpdatedAt)
85
86 if err != nil {
87 // Check for unique constraint violations
88 if strings.Contains(err.Error(), "duplicate key") {
89 if strings.Contains(err.Error(), "communities_did_key") {
90 return nil, communities.ErrCommunityAlreadyExists
91 }
92 if strings.Contains(err.Error(), "communities_handle_key") {
93 return nil, communities.ErrHandleTaken
94 }
95 }
96 return nil, fmt.Errorf("failed to create community: %w", err)
97 }
98
99 return community, nil
100}
101
102// GetByDID retrieves a community by its DID
103// Note: PDS credentials are included (for internal service use only)
104// Handlers MUST use json:"-" tags to prevent credential exposure in APIs
105func (r *postgresCommunityRepo) GetByDID(ctx context.Context, did string) (*communities.Community, error) {
106 community := &communities.Community{}
107 query := `
108 SELECT id, did, handle, name, display_name, description, description_facets,
109 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
110 pds_email, pds_password_hash,
111 COALESCE(pgp_sym_decrypt(pds_access_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), '') as pds_access_token,
112 COALESCE(pgp_sym_decrypt(pds_refresh_token_encrypted, (SELECT encode(key_data, 'hex') FROM encryption_keys WHERE id = 1)), '') as pds_refresh_token,
113 pds_url,
114 visibility, allow_external_discovery, moderation_type, content_warnings,
115 member_count, subscriber_count, post_count,
116 federated_from, federated_id, created_at, updated_at,
117 record_uri, record_cid
118 FROM communities
119 WHERE did = $1`
120
121 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
122 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
123 var pdsEmail, pdsPasswordHash, pdsAccessToken, pdsRefreshToken, pdsURL sql.NullString
124 var descFacets []byte
125 var contentWarnings []string
126
127 err := r.db.QueryRowContext(ctx, query, did).Scan(
128 &community.ID, &community.DID, &community.Handle, &community.Name,
129 &displayName, &description, &descFacets,
130 &avatarCID, &bannerCID,
131 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
132 // V2: PDS credentials
133 &pdsEmail, &pdsPasswordHash, &pdsAccessToken, &pdsRefreshToken, &pdsURL,
134 &community.Visibility, &community.AllowExternalDiscovery,
135 &moderationType, pq.Array(&contentWarnings),
136 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
137 &federatedFrom, &federatedID,
138 &community.CreatedAt, &community.UpdatedAt,
139 &recordURI, &recordCID,
140 )
141
142 if err == sql.ErrNoRows {
143 return nil, communities.ErrCommunityNotFound
144 }
145 if err != nil {
146 return nil, fmt.Errorf("failed to get community by DID: %w", err)
147 }
148
149 // Map nullable fields
150 community.DisplayName = displayName.String
151 community.Description = description.String
152 community.AvatarCID = avatarCID.String
153 community.BannerCID = bannerCID.String
154 community.PDSEmail = pdsEmail.String
155 community.PDSPasswordHash = pdsPasswordHash.String
156 community.PDSAccessToken = pdsAccessToken.String
157 community.PDSRefreshToken = pdsRefreshToken.String
158 community.PDSURL = pdsURL.String
159 community.ModerationType = moderationType.String
160 community.ContentWarnings = contentWarnings
161 community.FederatedFrom = federatedFrom.String
162 community.FederatedID = federatedID.String
163 community.RecordURI = recordURI.String
164 community.RecordCID = recordCID.String
165 if descFacets != nil {
166 community.DescriptionFacets = descFacets
167 }
168
169 return community, nil
170}
171
172// GetByHandle retrieves a community by its scoped handle
173func (r *postgresCommunityRepo) GetByHandle(ctx context.Context, handle string) (*communities.Community, error) {
174 community := &communities.Community{}
175 query := `
176 SELECT id, did, handle, name, display_name, description, description_facets,
177 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
178 visibility, allow_external_discovery, moderation_type, content_warnings,
179 member_count, subscriber_count, post_count,
180 federated_from, federated_id, created_at, updated_at,
181 record_uri, record_cid
182 FROM communities
183 WHERE handle = $1`
184
185 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
186 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
187 var descFacets []byte
188 var contentWarnings []string
189
190 err := r.db.QueryRowContext(ctx, query, handle).Scan(
191 &community.ID, &community.DID, &community.Handle, &community.Name,
192 &displayName, &description, &descFacets,
193 &avatarCID, &bannerCID,
194 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
195 &community.Visibility, &community.AllowExternalDiscovery,
196 &moderationType, pq.Array(&contentWarnings),
197 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
198 &federatedFrom, &federatedID,
199 &community.CreatedAt, &community.UpdatedAt,
200 &recordURI, &recordCID,
201 )
202
203 if err == sql.ErrNoRows {
204 return nil, communities.ErrCommunityNotFound
205 }
206 if err != nil {
207 return nil, fmt.Errorf("failed to get community by handle: %w", err)
208 }
209
210 // Map nullable fields
211 community.DisplayName = displayName.String
212 community.Description = description.String
213 community.AvatarCID = avatarCID.String
214 community.BannerCID = bannerCID.String
215 community.ModerationType = moderationType.String
216 community.ContentWarnings = contentWarnings
217 community.FederatedFrom = federatedFrom.String
218 community.FederatedID = federatedID.String
219 community.RecordURI = recordURI.String
220 community.RecordCID = recordCID.String
221 if descFacets != nil {
222 community.DescriptionFacets = descFacets
223 }
224
225 return community, nil
226}
227
228// Update modifies an existing community's metadata
229func (r *postgresCommunityRepo) Update(ctx context.Context, community *communities.Community) (*communities.Community, error) {
230 query := `
231 UPDATE communities
232 SET display_name = $2, description = $3, description_facets = $4,
233 avatar_cid = $5, banner_cid = $6,
234 visibility = $7, allow_external_discovery = $8,
235 moderation_type = $9, content_warnings = $10,
236 updated_at = NOW(),
237 record_uri = $11, record_cid = $12
238 WHERE did = $1
239 RETURNING updated_at`
240
241 // Handle JSONB field - use sql.NullString with valid JSON or NULL
242 var descFacets interface{}
243 if community.DescriptionFacets != nil && len(community.DescriptionFacets) > 0 {
244 descFacets = community.DescriptionFacets
245 } else {
246 descFacets = nil
247 }
248
249 err := r.db.QueryRowContext(ctx, query,
250 community.DID,
251 nullString(community.DisplayName),
252 nullString(community.Description),
253 descFacets,
254 nullString(community.AvatarCID),
255 nullString(community.BannerCID),
256 community.Visibility,
257 community.AllowExternalDiscovery,
258 nullString(community.ModerationType),
259 pq.Array(community.ContentWarnings),
260 nullString(community.RecordURI),
261 nullString(community.RecordCID),
262 ).Scan(&community.UpdatedAt)
263
264 if err == sql.ErrNoRows {
265 return nil, communities.ErrCommunityNotFound
266 }
267 if err != nil {
268 return nil, fmt.Errorf("failed to update community: %w", err)
269 }
270
271 return community, nil
272}
273
274// Delete removes a community from the database
275func (r *postgresCommunityRepo) Delete(ctx context.Context, did string) error {
276 query := `DELETE FROM communities WHERE did = $1`
277
278 result, err := r.db.ExecContext(ctx, query, did)
279 if err != nil {
280 return fmt.Errorf("failed to delete community: %w", err)
281 }
282
283 rowsAffected, err := result.RowsAffected()
284 if err != nil {
285 return fmt.Errorf("failed to check delete result: %w", err)
286 }
287
288 if rowsAffected == 0 {
289 return communities.ErrCommunityNotFound
290 }
291
292 return nil
293}
294
295// List retrieves communities with filtering and pagination
296func (r *postgresCommunityRepo) List(ctx context.Context, req communities.ListCommunitiesRequest) ([]*communities.Community, int, error) {
297 // Build query with filters
298 whereClauses := []string{}
299 args := []interface{}{}
300 argCount := 1
301
302 if req.Visibility != "" {
303 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
304 args = append(args, req.Visibility)
305 argCount++
306 }
307
308 if req.HostedBy != "" {
309 whereClauses = append(whereClauses, fmt.Sprintf("hosted_by_did = $%d", argCount))
310 args = append(args, req.HostedBy)
311 argCount++
312 }
313
314 whereClause := ""
315 if len(whereClauses) > 0 {
316 whereClause = "WHERE " + strings.Join(whereClauses, " AND ")
317 }
318
319 // Get total count
320 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
321 var totalCount int
322 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
323 if err != nil {
324 return nil, 0, fmt.Errorf("failed to count communities: %w", err)
325 }
326
327 // Build sort clause
328 sortColumn := "created_at"
329 if req.SortBy != "" {
330 switch req.SortBy {
331 case "member_count", "subscriber_count", "post_count", "created_at":
332 sortColumn = req.SortBy
333 }
334 }
335
336 sortOrder := "DESC"
337 if strings.ToUpper(req.SortOrder) == "ASC" {
338 sortOrder = "ASC"
339 }
340
341 // Get communities with pagination
342 query := fmt.Sprintf(`
343 SELECT id, did, handle, name, display_name, description, description_facets,
344 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
345 visibility, allow_external_discovery, moderation_type, content_warnings,
346 member_count, subscriber_count, post_count,
347 federated_from, federated_id, created_at, updated_at,
348 record_uri, record_cid
349 FROM communities
350 %s
351 ORDER BY %s %s
352 LIMIT $%d OFFSET $%d`,
353 whereClause, sortColumn, sortOrder, argCount, argCount+1)
354
355 args = append(args, req.Limit, req.Offset)
356
357 rows, err := r.db.QueryContext(ctx, query, args...)
358 if err != nil {
359 return nil, 0, fmt.Errorf("failed to list communities: %w", err)
360 }
361 defer rows.Close()
362
363 result := []*communities.Community{}
364 for rows.Next() {
365 community := &communities.Community{}
366 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
367 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
368 var descFacets []byte
369 var contentWarnings []string
370
371 err := rows.Scan(
372 &community.ID, &community.DID, &community.Handle, &community.Name,
373 &displayName, &description, &descFacets,
374 &avatarCID, &bannerCID,
375 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
376 &community.Visibility, &community.AllowExternalDiscovery,
377 &moderationType, pq.Array(&contentWarnings),
378 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
379 &federatedFrom, &federatedID,
380 &community.CreatedAt, &community.UpdatedAt,
381 &recordURI, &recordCID,
382 )
383 if err != nil {
384 return nil, 0, fmt.Errorf("failed to scan community: %w", err)
385 }
386
387 // Map nullable fields
388 community.DisplayName = displayName.String
389 community.Description = description.String
390 community.AvatarCID = avatarCID.String
391 community.BannerCID = bannerCID.String
392 community.ModerationType = moderationType.String
393 community.ContentWarnings = contentWarnings
394 community.FederatedFrom = federatedFrom.String
395 community.FederatedID = federatedID.String
396 community.RecordURI = recordURI.String
397 community.RecordCID = recordCID.String
398 if descFacets != nil {
399 community.DescriptionFacets = descFacets
400 }
401
402 result = append(result, community)
403 }
404
405 if err = rows.Err(); err != nil {
406 return nil, 0, fmt.Errorf("error iterating communities: %w", err)
407 }
408
409 return result, totalCount, nil
410}
411
412// Search searches communities by name/description using fuzzy matching
413func (r *postgresCommunityRepo) Search(ctx context.Context, req communities.SearchCommunitiesRequest) ([]*communities.Community, int, error) {
414 // Build query with fuzzy search and visibility filter
415 whereClauses := []string{
416 "(name ILIKE '%' || $1 || '%' OR description ILIKE '%' || $1 || '%')",
417 }
418 args := []interface{}{req.Query}
419 argCount := 2
420
421 if req.Visibility != "" {
422 whereClauses = append(whereClauses, fmt.Sprintf("visibility = $%d", argCount))
423 args = append(args, req.Visibility)
424 argCount++
425 }
426
427 whereClause := "WHERE " + strings.Join(whereClauses, " AND ")
428
429 // Get total count
430 countQuery := fmt.Sprintf("SELECT COUNT(*) FROM communities %s", whereClause)
431 var totalCount int
432 err := r.db.QueryRowContext(ctx, countQuery, args...).Scan(&totalCount)
433 if err != nil {
434 return nil, 0, fmt.Errorf("failed to count search results: %w", err)
435 }
436
437 // Search with relevance ranking using pg_trgm similarity
438 // Filter out results with very low relevance (< 0.2) to avoid noise
439 query := fmt.Sprintf(`
440 SELECT id, did, handle, name, display_name, description, description_facets,
441 avatar_cid, banner_cid, owner_did, created_by_did, hosted_by_did,
442 visibility, allow_external_discovery, moderation_type, content_warnings,
443 member_count, subscriber_count, post_count,
444 federated_from, federated_id, created_at, updated_at,
445 record_uri, record_cid,
446 similarity(name, $1) + similarity(COALESCE(description, ''), $1) as relevance
447 FROM communities
448 %s AND (similarity(name, $1) + similarity(COALESCE(description, ''), $1)) > 0.2
449 ORDER BY relevance DESC, member_count DESC
450 LIMIT $%d OFFSET $%d`,
451 whereClause, argCount, argCount+1)
452
453 args = append(args, req.Limit, req.Offset)
454
455 rows, err := r.db.QueryContext(ctx, query, args...)
456 if err != nil {
457 return nil, 0, fmt.Errorf("failed to search communities: %w", err)
458 }
459 defer rows.Close()
460
461 result := []*communities.Community{}
462 for rows.Next() {
463 community := &communities.Community{}
464 var displayName, description, avatarCID, bannerCID, moderationType sql.NullString
465 var federatedFrom, federatedID, recordURI, recordCID sql.NullString
466 var descFacets []byte
467 var contentWarnings []string
468 var relevance float64
469
470 err := rows.Scan(
471 &community.ID, &community.DID, &community.Handle, &community.Name,
472 &displayName, &description, &descFacets,
473 &avatarCID, &bannerCID,
474 &community.OwnerDID, &community.CreatedByDID, &community.HostedByDID,
475 &community.Visibility, &community.AllowExternalDiscovery,
476 &moderationType, pq.Array(&contentWarnings),
477 &community.MemberCount, &community.SubscriberCount, &community.PostCount,
478 &federatedFrom, &federatedID,
479 &community.CreatedAt, &community.UpdatedAt,
480 &recordURI, &recordCID,
481 &relevance,
482 )
483 if err != nil {
484 return nil, 0, fmt.Errorf("failed to scan community: %w", err)
485 }
486
487 // Map nullable fields
488 community.DisplayName = displayName.String
489 community.Description = description.String
490 community.AvatarCID = avatarCID.String
491 community.BannerCID = bannerCID.String
492 community.ModerationType = moderationType.String
493 community.ContentWarnings = contentWarnings
494 community.FederatedFrom = federatedFrom.String
495 community.FederatedID = federatedID.String
496 community.RecordURI = recordURI.String
497 community.RecordCID = recordCID.String
498 if descFacets != nil {
499 community.DescriptionFacets = descFacets
500 }
501
502 result = append(result, community)
503 }
504
505 if err = rows.Err(); err != nil {
506 return nil, 0, fmt.Errorf("error iterating search results: %w", err)
507 }
508
509 return result, totalCount, nil
510}
511
// Helper functions

// nullString converts s into a sql.NullString for binding to a nullable
// column: the empty string becomes an invalid (NULL) value, anything else is
// passed through as a valid string.
func nullString(s string) sql.NullString {
	if s == "" {
		return sql.NullString{}
	}
	return sql.NullString{String: s, Valid: true}
}
516
// nullBytes normalises an empty byte slice to nil: it returns b unchanged when
// it has at least one byte, and nil for both nil and zero-length input.
func nullBytes(b []byte) []byte {
	if len(b) > 0 {
		return b
	}
	return nil
}